From 127d5bb235fe5489444e24dfff2a68b76b83c5c2 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 4 Aug 2021 11:31:20 +0800 Subject: [PATCH] dtmcli standalone --- common/utils.go | 23 +++++----- common/utils_test.go | 76 +------------------------------ dtmsvr/api.go | 2 +- dtmsvr/config.go | 7 ++- dtmsvr/cron.go | 6 +-- dtmsvr/dtmsvr_test.go | 36 +++++++-------- dtmsvr/main.go | 7 +-- dtmsvr/trans.go | 26 +++++------ dtmsvr/trans_msg.go | 7 +-- dtmsvr/trans_saga.go | 5 +- dtmsvr/trans_saga_barrier_test.go | 5 +- dtmsvr/trans_tcc.go | 7 +-- dtmsvr/trans_tcc_barrier_test.go | 14 +++--- dtmsvr/trans_xa.go | 9 ++-- dtmsvr/utils.go | 6 +-- examples/config.go | 7 ++- examples/data.go | 8 ++-- examples/main_base.go | 13 +++--- examples/main_msg.go | 5 +- examples/main_saga.go | 7 ++- examples/main_saga_barrier.go | 8 ++-- examples/main_tcc.go | 4 +- examples/main_tcc_barrier.go | 12 ++--- examples/main_xa.go | 4 +- examples/quick_start.go | 4 +- examples/types.go | 8 ++-- 26 files changed, 126 insertions(+), 190 deletions(-) diff --git a/common/utils.go b/common/utils.go index 8d70903..4b00014 100644 --- a/common/utils.go +++ b/common/utils.go @@ -8,6 +8,7 @@ import ( "github.com/gin-gonic/gin" "github.com/go-resty/resty/v2" + "github.com/yedf/dtm/dtmcli" yaml "gopkg.in/yaml.v2" ) @@ -19,19 +20,19 @@ func GetGinApp() *gin.Engine { body := "" if c.Request.Body != nil { rb, err := c.GetRawData() - E2P(err) + dtmcli.E2P(err) if len(rb) > 0 { body = string(rb) c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(rb)) } } began := time.Now() - Logf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) + dtmcli.Logf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) c.Next() - Logf("used %d ms %s %s query: %s body: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) + dtmcli.Logf("used %d ms %s %s query: %s body: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) }) - app.Any("/api/ping", func(c *gin.Context) { c.JSON(200, M{"msg": "pong"}) }) + app.Any("/api/ping", func(c *gin.Context) { c.JSON(200, dtmcli.M{"msg": "pong"}) }) return app } @@ -46,14 +47,14 @@ func WrapHandler(fn func(*gin.Context) (interface{}, error)) gin.HandlerFunc { b, err = json.Marshal(r) } if err != nil { - Logf("status: 500, code: 500 message: %s", err.Error()) - c.JSON(500, M{"code": 500, "message": err.Error()}) + dtmcli.Logf("status: 500, code: 500 message: %s", err.Error()) + c.JSON(500, dtmcli.M{"code": 500, "message": err.Error()}) } else { - Logf("status: 200, content: %s", string(b)) + dtmcli.Logf("status: 200, content: %s", string(b)) c.Status(200) c.Writer.Header().Add("Content-Type", "application/json") _, err = c.Writer.Write(b) - E2P(err) + dtmcli.E2P(err) } } } @@ -64,8 +65,8 @@ func InitConfig(dir string, config interface{}) { if err != nil { cont, err = ioutil.ReadFile(dir + "/conf.sample.yml") } - Logf("cont is: \n%s", string(cont)) - E2P(err) + dtmcli.Logf("cont is: \n%s", string(cont)) + dtmcli.E2P(err) err = yaml.Unmarshal(cont, config) - E2P(err) + dtmcli.E2P(err) } diff --git a/common/utils_test.go b/common/utils_test.go index f50e614..ff9b6ed 100644 --- a/common/utils_test.go +++ b/common/utils_test.go @@ -5,62 +5,13 @@ import ( "io" "net/http" "net/http/httptest" - "os" "strings" "testing" "github.com/gin-gonic/gin" - "github.com/stretchr/testify/assert" + "github.com/go-playground/assert/v2" ) -func TestEP(t *testing.T) { - skipped := true - err := func() (rerr error) { - defer P2E(&rerr) - E2P(errors.New("err1")) - skipped = false - return nil - }() - assert.Equal(t, true, skipped) - assert.Equal(t, "err1", err.Error()) - err = CatchP(func() { - PanicIf(true, errors.New("err2")) - }) - assert.Equal(t, "err2", err.Error()) - err = func() (rerr error) { - defer func() { - x := recover() - assert.Equal(t, 1, x) - }() - defer P2E(&rerr) - panic(1) - }() -} - -func TestTernary(t *testing.T) { - assert.Equal(t, "1", OrString("", "", "1")) - assert.Equal(t, "", OrString("", "", "")) - assert.Equal(t, "1", If(true, "1", "2")) - assert.Equal(t, "2", If(false, "1", "2")) -} - -func TestMarshal(t *testing.T) { - a := 0 - type e struct { - A int - } - e1 := e{A: 10} - m := map[string]int{} - assert.Equal(t, "1", MustMarshalString(1)) - assert.Equal(t, []byte("1"), MustMarshal(1)) - MustUnmarshal([]byte("2"), &a) - assert.Equal(t, 2, a) - MustUnmarshalString("3", &a) - assert.Equal(t, 3, a) - MustRemarshal(&e1, &m) - assert.Equal(t, 10, m["A"]) -} - func TestGin(t *testing.T) { app := GetGinApp() app.GET("/api/sample", WrapHandler(func(c *gin.Context) (interface{}, error) { @@ -79,28 +30,3 @@ func TestGin(t *testing.T) { assert.Equal(t, "1", getResultString("/api/sample", nil)) assert.Equal(t, "{\"code\":500,\"message\":\"err1\"}", getResultString("/api/error", strings.NewReader("{}"))) } - -func TestSome(t *testing.T) { - n := MustAtoi("123") - assert.Equal(t, 123, n) - - err := CatchP(func() { - MustAtoi("abc") - }) - assert.Error(t, err) - wd := MustGetwd() - assert.NotEqual(t, "", wd) - - dir1 := GetCurrentCodeDir() - assert.Equal(t, true, strings.HasSuffix(dir1, "common")) - - func1 := GetFuncName() - assert.Equal(t, true, strings.HasSuffix(func1, "TestSome")) - - os.Setenv("IS_DOCKER_COMPOSE", "1") - s := MayReplaceLocalhost("http://localhost") - assert.Equal(t, "http://host.docker.internal", s) - os.Setenv("IS_DOCKER_COMPOSE", "") - s2 := MayReplaceLocalhost("http://localhost") - assert.Equal(t, "http://localhost", s2) -} diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 7c9af86..11d06ca 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -76,7 +76,7 @@ func registerXaBranch(c *gin.Context) (interface{}, error) { } func registerTccBranch(c *gin.Context) (interface{}, error) { - data := common.MS{} + data := dtmcli.MS{} err := c.BindJSON(&data) e2p(err) branch := TransBranch{ diff --git a/dtmsvr/config.go b/dtmsvr/config.go index a078691..258ff5a 100644 --- a/dtmsvr/config.go +++ b/dtmsvr/config.go @@ -1,6 +1,9 @@ package dtmsvr -import "github.com/yedf/dtm/common" +import ( + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" +) type dtmsvrConfig struct { TransCronInterval int64 `yaml:"TransCronInterval"` // 单位秒 当事务等待这个时间之后,还没有变化,则进行一轮处理,包括prepared中的任务和committed的任务 @@ -14,6 +17,6 @@ var config = &dtmsvrConfig{ var dbName = "dtm" func init() { - common.InitConfig(common.GetProjectDir(), &config) + common.InitConfig(dtmcli.GetProjectDir(), &config) config.DB["database"] = "" } diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 1f6ec51..2ff1f1d 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -7,7 +7,7 @@ import ( "runtime/debug" "time" - "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" ) // CronTransOnce cron expired trans. use expireIn as expire time @@ -54,7 +54,7 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal { func handlePanic(perr *error) { if err := recover(); err != nil { - common.LogRedf("----panic %v handlered\n%s", err, string(debug.Stack())) + dtmcli.LogRedf("----panic %v handlered\n%s", err, string(debug.Stack())) if perr != nil { *perr = fmt.Errorf("dtm panic: %v", err) } @@ -64,6 +64,6 @@ func handlePanic(perr *error) { func sleepCronTime() { delta := math.Min(3, float64(config.TransCronInterval)) interval := time.Duration((float64(config.TransCronInterval) - rand.Float64()*delta) * float64(time.Second)) - common.Logf("sleeping for %v", interval) + dtmcli.Logf("sleeping for %v", interval) time.Sleep(interval) } diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index a4bd8d0..1cfa263 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -42,7 +42,7 @@ func resetXaData() { func TestMain(m *testing.M) { TransProcessedTestChan = make(chan string, 1) - common.InitConfig(common.GetProjectDir(), &config) + common.InitConfig(dtmcli.GetProjectDir(), &config) PopulateDB(false) examples.PopulateDB(false) // 启动组件 @@ -63,7 +63,7 @@ func TestCover(t *testing.T) { db := dbGet() db.NoMust() CronTransOnce(0) - err := common.CatchP(func() { + err := dtmcli.CatchP(func() { checkAffected(db.DB) }) assert.Error(t, err) @@ -73,16 +73,16 @@ func TestCover(t *testing.T) { } func TestType(t *testing.T) { - err := common.CatchP(func() { + err := dtmcli.CatchP(func() { dtmcli.MustGenGid("http://localhost:8080/api/no") }) assert.Error(t, err) - err = common.CatchP(func() { - resp, err := common.RestyClient.R().SetBody(common.M{ + err = dtmcli.CatchP(func() { + resp, err := dtmcli.RestyClient.R().SetBody(dtmcli.M{ "gid": "1", "trans_type": "msg", }).Get("http://localhost:8080/api/dtmsvr/abort") - common.CheckRestySuccess(resp, err) + dtmcli.CheckRestySuccess(resp, err) }) assert.Error(t, err) } @@ -111,7 +111,7 @@ func assertSucceed(t *testing.T, gid string) { } func genMsg(gid string) *dtmcli.Msg { - common.Logf("beginning a msg test ---------------- %s", gid) + dtmcli.Logf("beginning a msg test ---------------- %s", gid) msg := dtmcli.NewMsg(examples.DtmServer, gid) msg.QueryPrepared = examples.Busi + "/CanSubmit" req := examples.GenTransReq(30, false, false) @@ -121,7 +121,7 @@ func genMsg(gid string) *dtmcli.Msg { } func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { - common.Logf("beginning a saga test ---------------- %s", gid) + dtmcli.Logf("beginning a saga test ---------------- %s", gid) saga := dtmcli.NewSaga(examples.DtmServer, gid) req := examples.GenTransReq(30, outFailed, inFailed) saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req) @@ -130,22 +130,22 @@ func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { } func transQuery(t *testing.T, gid string) { - resp, err := common.RestyClient.R().SetQueryParam("gid", gid).Get(examples.DtmServer + "/query") + resp, err := dtmcli.RestyClient.R().SetQueryParam("gid", gid).Get(examples.DtmServer + "/query") e2p(err) m := M{} assert.Equal(t, resp.StatusCode(), 200) - common.MustUnmarshalString(resp.String(), &m) + dtmcli.MustUnmarshalString(resp.String(), &m) assert.NotEqual(t, nil, m["transaction"]) assert.Equal(t, 4, len(m["branches"].([]interface{}))) - resp, err = common.RestyClient.R().SetQueryParam("gid", "").Get(examples.DtmServer + "/query") + resp, err = dtmcli.RestyClient.R().SetQueryParam("gid", "").Get(examples.DtmServer + "/query") e2p(err) assert.Equal(t, resp.StatusCode(), 500) - resp, err = common.RestyClient.R().SetQueryParam("gid", "1").Get(examples.DtmServer + "/query") + resp, err = dtmcli.RestyClient.R().SetQueryParam("gid", "1").Get(examples.DtmServer + "/query") e2p(err) assert.Equal(t, resp.StatusCode(), 200) - common.MustUnmarshalString(resp.String(), &m) + dtmcli.MustUnmarshalString(resp.String(), &m) assert.Equal(t, nil, m["transaction"]) assert.Equal(t, 0, len(m["branches"].([]interface{}))) } @@ -161,7 +161,7 @@ func TestSqlDB(t *testing.T) { } db.Must().Exec("insert ignore into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, reason) values('saga', 'gid1', 'branch_id1', 'action', 'saga')") _, err := dtmcli.ThroughBarrierCall(db.ToSQLDB(), transInfo, func(db *sql.Tx) (interface{}, error) { - common.Logf("rollback gid2") + dtmcli.Logf("rollback gid2") return nil, fmt.Errorf("gid2 error") }) asserts.Error(err, fmt.Errorf("gid2 error")) @@ -169,17 +169,17 @@ func TestSqlDB(t *testing.T) { asserts.Equal(dbr.RowsAffected, int64(1)) dbr = db.Model(&BarrierModel{}).Where("gid=?", "gid2").Find(&[]BarrierModel{}) asserts.Equal(dbr.RowsAffected, int64(0)) - gid2Res := common.M{"result": "first"} + gid2Res := dtmcli.M{"result": "first"} _, err = dtmcli.ThroughBarrierCall(db.ToSQLDB(), transInfo, func(db *sql.Tx) (interface{}, error) { - common.Logf("submit gid2") + dtmcli.Logf("submit gid2") return gid2Res, nil }) asserts.Nil(err) dbr = db.Model(&BarrierModel{}).Where("gid=?", "gid2").Find(&[]BarrierModel{}) asserts.Equal(dbr.RowsAffected, int64(1)) newResult, err := dtmcli.ThroughBarrierCall(db.ToSQLDB(), transInfo, func(db *sql.Tx) (interface{}, error) { - common.Logf("submit gid2") - return common.MS{"result": "ignored"}, nil + dtmcli.Logf("submit gid2") + return dtmcli.MS{"result": "ignored"}, nil }) asserts.Equal(newResult, gid2Res) } diff --git a/dtmsvr/main.go b/dtmsvr/main.go index 2b8ccf0..a3b24c7 100644 --- a/dtmsvr/main.go +++ b/dtmsvr/main.go @@ -5,6 +5,7 @@ import ( "time" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/examples" ) @@ -12,16 +13,16 @@ var dtmsvrPort = 8080 // StartSvr StartSvr func StartSvr() { - common.Logf("start dtmsvr") + dtmcli.Logf("start dtmsvr") app := common.GetGinApp() addRoute(app) - common.Logf("dtmsvr listen at: %d", dtmsvrPort) + dtmcli.Logf("dtmsvr listen at: %d", dtmsvrPort) go app.Run(fmt.Sprintf(":%d", dtmsvrPort)) time.Sleep(100 * time.Millisecond) } // PopulateDB setup mysql data func PopulateDB(skipDrop bool) { - file := fmt.Sprintf("%s/dtmsvr.%s.sql", common.GetCurrentCodeDir(), config.DB["driver"]) + file := fmt.Sprintf("%s/dtmsvr.%s.sql", dtmcli.GetCurrentCodeDir(), config.DB["driver"]) examples.RunSQLScript(config.DB, file, skipDrop) } diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index ccc03e9..6ef4f42 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -110,7 +110,7 @@ func (t *TransGlobal) getProcessor() transProcessor { } // Process process global transaction once -func (t *TransGlobal) Process(db *common.DB, waitResult bool) common.M { +func (t *TransGlobal) Process(db *common.DB, waitResult bool) dtmcli.M { if !waitResult { go t.processInner(db) return dtmcli.ResultSuccess @@ -118,10 +118,10 @@ func (t *TransGlobal) Process(db *common.DB, waitResult bool) common.M { submitting := t.Status == "submitted" err := t.processInner(db) if err != nil { - return common.M{"dtm_result": "FAILURE", "message": err.Error()} + return dtmcli.M{"dtm_result": "FAILURE", "message": err.Error()} } if submitting && t.Status != "succeed" { - return common.M{"dtm_result": "FAILURE", "message": "trans failed by user"} + return dtmcli.M{"dtm_result": "FAILURE", "message": "trans failed by user"} } return dtmcli.ResultSuccess } @@ -130,12 +130,12 @@ func (t *TransGlobal) processInner(db *common.DB) (rerr error) { defer handlePanic(&rerr) defer func() { if TransProcessedTestChan != nil { - common.Logf("processed: %s", t.Gid) + dtmcli.Logf("processed: %s", t.Gid) TransProcessedTestChan <- t.Gid - common.Logf("notified: %s", t.Gid) + dtmcli.Logf("notified: %s", t.Gid) } }() - common.Logf("processing: %s status: %s", t.Gid, t.Status) + dtmcli.Logf("processing: %s status: %s", t.Gid, t.Status) if t.Status == "prepared" && t.TransType != "msg" { t.changeStatus(db, "aborting") } @@ -145,8 +145,8 @@ func (t *TransGlobal) processInner(db *common.DB) (rerr error) { return } -func (t *TransGlobal) getBranchParams(branch *TransBranch) common.MS { - return common.MS{ +func (t *TransGlobal) getBranchParams(branch *TransBranch) dtmcli.MS { + return dtmcli.MS{ "gid": t.Gid, "trans_type": t.TransType, "branch_id": branch.BranchID, @@ -175,7 +175,7 @@ func (t *TransGlobal) saveNew(db *common.DB) { if dbr.RowsAffected > 0 { // 如果这个是新事务,保存所有的分支 branches := t.getProcessor().GenBranches() if len(branches) > 0 { - writeTransLog(t.Gid, "save branches", t.Status, "", common.MustMarshalString(branches)) + writeTransLog(t.Gid, "save branches", t.Status, "", dtmcli.MustMarshalString(branches)) db.Must().Clauses(clause.OnConflict{ DoNothing: true, }).Create(&branches) @@ -193,13 +193,13 @@ func TransFromContext(c *gin.Context) *TransGlobal { data := M{} b, err := c.GetRawData() e2p(err) - common.MustUnmarshal(b, &data) - common.Logf("creating trans in prepare") + dtmcli.MustUnmarshal(b, &data) + dtmcli.Logf("creating trans in prepare") if data["steps"] != nil { - data["data"] = common.MustMarshalString(data["steps"]) + data["data"] = dtmcli.MustMarshalString(data["steps"]) } m := TransGlobal{} - common.MustRemarshal(data, &m) + dtmcli.MustRemarshal(data, &m) return &m } diff --git a/dtmsvr/trans_msg.go b/dtmsvr/trans_msg.go index 4dca1bb..98c8823 100644 --- a/dtmsvr/trans_msg.go +++ b/dtmsvr/trans_msg.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" ) type transMsgProcessor struct { @@ -18,7 +19,7 @@ func init() { func (t *transMsgProcessor) GenBranches() []TransBranch { branches := []TransBranch{} steps := []M{} - common.MustUnmarshalString(t.Data, &steps) + dtmcli.MustUnmarshalString(t.Data, &steps) for _, step := range steps { branches = append(branches, TransBranch{ Gid: t.Gid, @@ -33,7 +34,7 @@ func (t *transMsgProcessor) GenBranches() []TransBranch { } func (t *transMsgProcessor) ExecBranch(db *common.DB, branch *TransBranch) { - resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParams(t.getBranchParams(branch)).Post(branch.URL) + resp, err := dtmcli.RestyClient.R().SetBody(branch.Data).SetQueryParams(t.getBranchParams(branch)).Post(branch.URL) e2p(err) body := resp.String() if strings.Contains(body, "SUCCESS") { @@ -48,7 +49,7 @@ func (t *TransGlobal) mayQueryPrepared(db *common.DB) { if t.Status != "prepared" { return } - resp, err := common.RestyClient.R().SetQueryParam("gid", t.Gid).Get(t.QueryPrepared) + resp, err := dtmcli.RestyClient.R().SetQueryParam("gid", t.Gid).Get(t.QueryPrepared) e2p(err) body := resp.String() if strings.Contains(body, "SUCCESS") { diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_saga.go index 1d8c4eb..8b90d37 100644 --- a/dtmsvr/trans_saga.go +++ b/dtmsvr/trans_saga.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" ) type transSagaProcessor struct { @@ -18,7 +19,7 @@ func init() { func (t *transSagaProcessor) GenBranches() []TransBranch { branches := []TransBranch{} steps := []M{} - common.MustUnmarshalString(t.Data, &steps) + dtmcli.MustUnmarshalString(t.Data, &steps) for i, step := range steps { branch := fmt.Sprintf("%02d", i+1) for _, branchType := range []string{"compensate", "action"} { @@ -36,7 +37,7 @@ func (t *transSagaProcessor) GenBranches() []TransBranch { } func (t *transSagaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { - resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParams(t.getBranchParams(branch)).Post(branch.URL) + resp, err := dtmcli.RestyClient.R().SetBody(branch.Data).SetQueryParams(t.getBranchParams(branch)).Post(branch.URL) e2p(err) body := resp.String() if strings.Contains(body, "SUCCESS") { diff --git a/dtmsvr/trans_saga_barrier_test.go b/dtmsvr/trans_saga_barrier_test.go index 52c189d..d45a9d2 100644 --- a/dtmsvr/trans_saga_barrier_test.go +++ b/dtmsvr/trans_saga_barrier_test.go @@ -4,7 +4,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/examples" ) @@ -20,7 +19,7 @@ func sagaBarrierNormal(t *testing.T) { saga := dtmcli.NewSaga(DtmServer, "sagaBarrierNormal"). Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req). Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) - common.Logf("busi trans submit") + dtmcli.Logf("busi trans submit") err := saga.Submit() e2p(err) WaitTransProcessed(saga.Gid) @@ -31,7 +30,7 @@ func sagaBarrierRollback(t *testing.T) { saga := dtmcli.NewSaga(DtmServer, "sagaBarrierRollback"). Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", &examples.TransReq{Amount: 30}). Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}) - common.Logf("busi trans submit") + dtmcli.Logf("busi trans submit") err := saga.Submit() e2p(err) WaitTransProcessed(saga.Gid) diff --git a/dtmsvr/trans_tcc.go b/dtmsvr/trans_tcc.go index 8a53fb6..32f305e 100644 --- a/dtmsvr/trans_tcc.go +++ b/dtmsvr/trans_tcc.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" ) type transTccProcessor struct { @@ -20,7 +21,7 @@ func (t *transTccProcessor) GenBranches() []TransBranch { } func (t *transTccProcessor) ExecBranch(db *common.DB, branch *TransBranch) { - resp, err := common.RestyClient.R().SetBody(branch.Data).SetHeader("Content-type", "application/json").SetQueryParams(t.getBranchParams(branch)).Post(branch.URL) + resp, err := dtmcli.RestyClient.R().SetBody(branch.Data).SetHeader("Content-type", "application/json").SetQueryParams(t.getBranchParams(branch)).Post(branch.URL) e2p(err) body := resp.String() if strings.Contains(body, "SUCCESS") { @@ -35,11 +36,11 @@ func (t *transTccProcessor) ProcessOnce(db *common.DB, branches []TransBranch) { if t.Status == "succeed" || t.Status == "failed" { return } - branchType := common.If(t.Status == "submitted", "confirm", "cancel").(string) + branchType := dtmcli.If(t.Status == "submitted", "confirm", "cancel").(string) for current := len(branches) - 1; current >= 0; current-- { if branches[current].BranchType == branchType && branches[current].Status == "prepared" { t.ExecBranch(db, &branches[current]) } } - t.changeStatus(db, common.If(t.Status == "submitted", "succeed", "failed").(string)) + t.changeStatus(db, dtmcli.If(t.Status == "submitted", "succeed", "failed").(string)) } diff --git a/dtmsvr/trans_tcc_barrier_test.go b/dtmsvr/trans_tcc_barrier_test.go index b0e4620..dae0858 100644 --- a/dtmsvr/trans_tcc_barrier_test.go +++ b/dtmsvr/trans_tcc_barrier_test.go @@ -61,7 +61,7 @@ func tccBarrierDisorder(t *testing.T) { res, err := examples.TccBarrierTransOutCancel(c) if !sleeped { sleeped = true - common.Logf("sleep before cancel return") + dtmcli.Logf("sleep before cancel return") <-timeoutChan finishedChan <- "1" } @@ -73,18 +73,18 @@ func tccBarrierDisorder(t *testing.T) { "branch_id": branchID, "trans_type": "tcc", "status": "prepared", - "data": string(common.MustMarshal(body)), + "data": string(dtmcli.MustMarshal(body)), "try": tryURL, "confirm": confirmURL, "cancel": cancelURL, }, "registerTccBranch") assert.Nil(t, err) go func() { - common.Logf("sleeping to wait for tcc try timeout") + dtmcli.Logf("sleeping to wait for tcc try timeout") <-timeoutChan - r, _ := common.RestyClient.R(). + r, _ := dtmcli.RestyClient.R(). SetBody(body). - SetQueryParams(common.MS{ + SetQueryParams(dtmcli.MS{ "dtm": tcc.Dtm, "gid": tcc.Gid, "branch_id": branchID, @@ -95,10 +95,10 @@ func tccBarrierDisorder(t *testing.T) { assert.True(t, strings.Contains(r.String(), "FAILURE")) finishedChan <- "1" }() - common.Logf("cron to timeout and then call cancel") + dtmcli.Logf("cron to timeout and then call cancel") go CronTransOnce(60 * time.Second) time.Sleep(100 * time.Millisecond) - common.Logf("cron to timeout and then call cancelled twice") + dtmcli.Logf("cron to timeout and then call cancelled twice") CronTransOnce(60 * time.Second) timeoutChan <- "wake" timeoutChan <- "wake" diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_xa.go index 819418c..ae0eea3 100644 --- a/dtmsvr/trans_xa.go +++ b/dtmsvr/trans_xa.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" ) type transXaProcessor struct { @@ -19,9 +20,9 @@ func (t *transXaProcessor) GenBranches() []TransBranch { return []TransBranch{} } func (t *transXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { - resp, err := common.RestyClient.R().SetQueryParams(common.MS{ + resp, err := dtmcli.RestyClient.R().SetQueryParams(dtmcli.MS{ "branch_id": branch.BranchID, - "action": common.If(t.Status == "prepared" || t.Status == "aborting", "rollback", "commit").(string), + "action": dtmcli.If(t.Status == "prepared" || t.Status == "aborting", "rollback", "commit").(string), "gid": branch.Gid, }).Post(branch.URL) e2p(err) @@ -38,11 +39,11 @@ func (t *transXaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) { if t.Status == "succeed" { return } - currentType := common.If(t.Status == "submitted", "commit", "rollback").(string) + currentType := dtmcli.If(t.Status == "submitted", "commit", "rollback").(string) for _, branch := range branches { if branch.BranchType == currentType && branch.Status != "succeed" { t.ExecBranch(db, &branch) } } - t.changeStatus(db, common.If(t.Status == "submitted", "succeed", "failed").(string)) + t.changeStatus(db, dtmcli.If(t.Status == "submitted", "succeed", "failed").(string)) } diff --git a/dtmsvr/utils.go b/dtmsvr/utils.go index 94838d0..782fa18 100644 --- a/dtmsvr/utils.go +++ b/dtmsvr/utils.go @@ -14,8 +14,8 @@ import ( // M a short name type M = map[string]interface{} -var p2e = common.P2E -var e2p = common.E2P +var p2e = dtmcli.P2E +var e2p = dtmcli.E2P func dbGet() *common.DB { return common.DbGet(config.DB) @@ -69,7 +69,7 @@ func getOneHexIP() string { ns := strings.Split(ip, ".") r := []byte{} for _, n := range ns { - r = append(r, byte(common.MustAtoi(n))) + r = append(r, byte(dtmcli.MustAtoi(n))) } return hex.EncodeToString(r) } diff --git a/examples/config.go b/examples/config.go index cba0051..6368939 100644 --- a/examples/config.go +++ b/examples/config.go @@ -1,6 +1,9 @@ package examples -import "github.com/yedf/dtm/common" +import ( + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" +) type exampleConfig struct { DB map[string]string `yaml:"DB"` @@ -9,5 +12,5 @@ type exampleConfig struct { var config = exampleConfig{} func init() { - common.InitConfig(common.GetProjectDir(), &config) + common.InitConfig(dtmcli.GetProjectDir(), &config) } diff --git a/examples/data.go b/examples/data.go index 4d1b6bb..468a479 100644 --- a/examples/data.go +++ b/examples/data.go @@ -5,12 +5,12 @@ import ( "io/ioutil" "strings" - "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" ) // RunSQLScript 1 func RunSQLScript(conf map[string]string, script string, skipDrop bool) { - con := common.SdbAlone(conf) + con := dtmcli.SdbAlone(conf) defer func() { con.Close() }() content, err := ioutil.ReadFile(script) e2p(err) @@ -20,13 +20,13 @@ func RunSQLScript(conf map[string]string, script string, skipDrop bool) { if s == "" || skipDrop && strings.Contains(s, "drop") { continue } - _, err = common.SdbExec(con, s) + _, err = dtmcli.SdbExec(con, s) e2p(err) } } // PopulateDB populate example mysql data func PopulateDB(skipDrop bool) { - file := fmt.Sprintf("%s/examples.%s.sql", common.GetCurrentCodeDir(), config.DB["driver"]) + file := fmt.Sprintf("%s/examples.%s.sql", dtmcli.GetCurrentCodeDir(), config.DB["driver"]) RunSQLScript(config.DB, file, skipDrop) } diff --git a/examples/main_base.go b/examples/main_base.go index 310e5bc..1d63caf 100644 --- a/examples/main_base.go +++ b/examples/main_base.go @@ -6,6 +6,7 @@ import ( "github.com/gin-gonic/gin" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" ) const ( @@ -20,10 +21,10 @@ var Busi string = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiAPI) // BaseAppStartup base app startup func BaseAppStartup() *gin.Engine { - common.Logf("examples starting") + dtmcli.Logf("examples starting") app := common.GetGinApp() BaseAddRoute(app) - common.Logf("Starting busi at: %d", BusiPort) + dtmcli.Logf("Starting busi at: %d", BusiPort) go app.Run(fmt.Sprintf(":%d", BusiPort)) time.Sleep(100 * time.Millisecond) return app @@ -61,8 +62,8 @@ var MainSwitch mainSwitchType func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi string) (interface{}, error) { info := infoFromContext(c) - res := common.OrString(result1, result2, "SUCCESS") - common.Logf("%s %s result: %s", busi, info.String(), res) + res := dtmcli.OrString(result1, result2, "SUCCESS") + dtmcli.Logf("%s %s result: %s", busi, info.String(), res) return M{"dtm_result": res}, nil } @@ -88,7 +89,7 @@ func BaseAddRoute(app *gin.Engine) { return handleGeneralBusiness(c, MainSwitch.TransOutRevertResult.Fetch(), "", "TransOutRevert") })) app.GET(BusiAPI+"/CanSubmit", common.WrapHandler(func(c *gin.Context) (interface{}, error) { - common.Logf("%s CanSubmit", c.Query("gid")) - return common.OrString(MainSwitch.CanSubmitResult.Fetch(), "SUCCESS"), nil + dtmcli.Logf("%s CanSubmit", c.Query("gid")) + return dtmcli.OrString(MainSwitch.CanSubmitResult.Fetch(), "SUCCESS"), nil })) } diff --git a/examples/main_msg.go b/examples/main_msg.go index 19bb057..c57ad54 100644 --- a/examples/main_msg.go +++ b/examples/main_msg.go @@ -2,7 +2,6 @@ package examples import ( "github.com/gin-gonic/gin" - "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" ) @@ -12,14 +11,14 @@ func MsgSetup(app *gin.Engine) { // MsgFireRequest 1 func MsgFireRequest() string { - common.Logf("a busi transaction begin") + dtmcli.Logf("a busi transaction begin") req := &TransReq{Amount: 30} msg := dtmcli.NewMsg(DtmServer, dtmcli.MustGenGid(DtmServer)). Add(Busi+"/TransOut", req). Add(Busi+"/TransIn", req) err := msg.Prepare(Busi + "/TransQuery") e2p(err) - common.Logf("busi trans submit") + dtmcli.Logf("busi trans submit") err = msg.Submit() e2p(err) return msg.Gid diff --git a/examples/main_saga.go b/examples/main_saga.go index c66faa5..76f79f5 100644 --- a/examples/main_saga.go +++ b/examples/main_saga.go @@ -2,7 +2,6 @@ package examples import ( "github.com/gin-gonic/gin" - "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" ) @@ -12,7 +11,7 @@ func SagaSetup(app *gin.Engine) { // SagaFireRequest 1 func SagaFireRequest() string { - common.Logf("a saga busi transaction begin") + dtmcli.Logf("a saga busi transaction begin") req := &TransReq{ Amount: 30, TransInResult: "SUCCESS", @@ -21,9 +20,9 @@ func SagaFireRequest() string { saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). Add(Busi+"/TransIn", Busi+"/TransInRevert", req) - common.Logf("saga busi trans submit") + dtmcli.Logf("saga busi trans submit") err := saga.Submit() - common.Logf("result gid is: %s", saga.Gid) + dtmcli.Logf("result gid is: %s", saga.Gid) e2p(err) return saga.Gid } diff --git a/examples/main_saga_barrier.go b/examples/main_saga_barrier.go index 5d2755c..b35ccf3 100644 --- a/examples/main_saga_barrier.go +++ b/examples/main_saga_barrier.go @@ -10,12 +10,12 @@ import ( // SagaBarrierFireRequest 1 func SagaBarrierFireRequest() string { - common.Logf("a busi transaction begin") + dtmcli.Logf("a busi transaction begin") req := &TransReq{Amount: 30} saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)). Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req). Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) - common.Logf("busi trans submit") + dtmcli.Logf("busi trans submit") err := saga.Submit() e2p(err) return saga.Gid @@ -27,11 +27,11 @@ func SagaBarrierAddRoute(app *gin.Engine) { app.POST(BusiAPI+"/SagaBTransInCompensate", common.WrapHandler(sagaBarrierTransInCompensate)) app.POST(BusiAPI+"/SagaBTransOut", common.WrapHandler(sagaBarrierTransOut)) app.POST(BusiAPI+"/SagaBTransOutCompensate", common.WrapHandler(sagaBarrierTransOutCompensate)) - common.Logf("examples listening at %d", BusiPort) + dtmcli.Logf("examples listening at %d", BusiPort) } func sagaBarrierAdjustBalance(sdb *sql.Tx, uid int, amount int) (interface{}, error) { - _, err := common.StxExec(sdb, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid) + _, err := dtmcli.StxExec(sdb, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid) return dtmcli.ResultSuccess, err } diff --git a/examples/main_tcc.go b/examples/main_tcc.go index 44279b0..3f7ef09 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -12,7 +12,7 @@ func TccSetup(app *gin.Engine) { app.POST(BusiAPI+"/TransInTccParent", common.WrapHandler(func(c *gin.Context) (interface{}, error) { tcc, err := dtmcli.TccFromReq(c) e2p(err) - common.Logf("TransInTccParent ") + dtmcli.Logf("TransInTccParent ") return tcc.CallBranch(&TransReq{Amount: reqFrom(c).Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") })) } @@ -33,7 +33,7 @@ func TccFireRequestNested() string { // TccFireRequest 1 func TccFireRequest() string { - common.Logf("tcc simple transaction begin") + dtmcli.Logf("tcc simple transaction begin") gid := dtmcli.MustGenGid(DtmServer) err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") diff --git a/examples/main_tcc_barrier.go b/examples/main_tcc_barrier.go index f1464ff..4c52a09 100644 --- a/examples/main_tcc_barrier.go +++ b/examples/main_tcc_barrier.go @@ -12,7 +12,7 @@ import ( // TccBarrierFireRequest 1 func TccBarrierFireRequest() string { - common.Logf("tcc transaction begin") + dtmcli.Logf("tcc transaction begin") gid := dtmcli.MustGenGid(DtmServer) err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") @@ -33,24 +33,24 @@ func TccBarrierAddRoute(app *gin.Engine) { app.POST(BusiAPI+"/TccBTransOutTry", common.WrapHandler(tccBarrierTransOutTry)) app.POST(BusiAPI+"/TccBTransOutConfirm", common.WrapHandler(tccBarrierTransOutConfirm)) app.POST(BusiAPI+"/TccBTransOutCancel", common.WrapHandler(TccBarrierTransOutCancel)) - common.Logf("examples listening at %d", BusiPort) + dtmcli.Logf("examples listening at %d", BusiPort) } const transInUID = 1 const transOutUID = 2 func adjustTrading(sdb *sql.Tx, uid int, amount int) (interface{}, error) { - affected, err := common.StxExec(sdb, "update dtm_busi.user_account_trading set trading_balance=trading_balance + ? where user_id=? and trading_balance + ? + (select balance from dtm_busi.user_account where id=?) >= 0", amount, uid, amount, uid) + affected, err := dtmcli.StxExec(sdb, "update dtm_busi.user_account_trading set trading_balance=trading_balance + ? where user_id=? and trading_balance + ? + (select balance from dtm_busi.user_account where id=?) >= 0", amount, uid, amount, uid) if err == nil && affected == 0 { return nil, fmt.Errorf("update error, maybe balance not enough") } - return common.MS{"dtm_server": "SUCCESS"}, nil + return dtmcli.MS{"dtm_server": "SUCCESS"}, nil } func adjustBalance(sdb *sql.Tx, uid int, amount int) (interface{}, error) { - affected, err := common.StxExec(sdb, "update dtm_busi.user_account_trading set trading_balance = trading_balance + ? where user_id=?;", -amount, uid) + affected, err := dtmcli.StxExec(sdb, "update dtm_busi.user_account_trading set trading_balance = trading_balance + ? where user_id=?;", -amount, uid) if err == nil && affected == 1 { - affected, err = common.StxExec(sdb, "update dtm_busi.user_account set balance=balance+? where user_id=?", amount, uid) + affected, err = dtmcli.StxExec(sdb, "update dtm_busi.user_account set balance=balance+? where user_id=?", amount, uid) } if err == nil && affected == 0 { return nil, fmt.Errorf("update 0 rows") diff --git a/examples/main_xa.go b/examples/main_xa.go index b3e4a75..c7890da 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -44,7 +44,7 @@ func xaTransIn(c *gin.Context) (interface{}, error) { if reqFrom(c).TransInResult == "FAILURE" { return dtmcli.ResultFailure, nil } - _, err := common.SdbExec(db, "update dtm_busi.user_account set balance=balance+? where user_id=?", reqFrom(c).Amount, 2) + _, err := dtmcli.SdbExec(db, "update dtm_busi.user_account set balance=balance+? where user_id=?", reqFrom(c).Amount, 2) return dtmcli.ResultSuccess, err }) } @@ -54,7 +54,7 @@ func xaTransOut(c *gin.Context) (interface{}, error) { if reqFrom(c).TransOutResult == "FAILURE" { return dtmcli.ResultFailure, nil } - _, err := common.SdbExec(db, "update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, 1) + _, err := dtmcli.SdbExec(db, "update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, 1) return dtmcli.ResultSuccess, err }) } diff --git a/examples/quick_start.go b/examples/quick_start.go index b42ef2c..40a4787 100644 --- a/examples/quick_start.go +++ b/examples/quick_start.go @@ -21,7 +21,7 @@ var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiAPI) func QsStartSvr() { app := common.GetGinApp() qsAddRoute(app) - common.Logf("quick qs examples listening at %d", qsBusiPort) + dtmcli.Logf("quick qs examples listening at %d", qsBusiPort) go app.Run(fmt.Sprintf(":%d", qsBusiPort)) time.Sleep(100 * time.Millisecond) } @@ -42,7 +42,7 @@ func QsFireRequest() string { } func qsAdjustBalance(uid int, amount int) (interface{}, error) { - _, err := common.SdbExec(sdbGet(), "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid) + _, err := dtmcli.SdbExec(sdbGet(), "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid) return dtmcli.ResultSuccess, err } diff --git a/examples/types.go b/examples/types.go index f5c8286..44192b0 100644 --- a/examples/types.go +++ b/examples/types.go @@ -9,7 +9,7 @@ import ( "github.com/yedf/dtm/dtmcli" ) -var e2p = common.E2P +var e2p = dtmcli.E2P // M alias type M = map[string]interface{} @@ -32,8 +32,8 @@ func (t *TransReq) String() string { func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq { return &TransReq{ Amount: amount, - TransOutResult: common.If(outFailed, "FAILURE", "SUCCESS").(string), - TransInResult: common.If(inFailed, "FAILURE", "SUCCESS").(string), + TransOutResult: dtmcli.If(outFailed, "FAILURE", "SUCCESS").(string), + TransInResult: dtmcli.If(inFailed, "FAILURE", "SUCCESS").(string), } } @@ -64,5 +64,5 @@ func dbGet() *common.DB { } func sdbGet() *sql.DB { - return common.SdbGet(config.DB) + return dtmcli.SdbGet(config.DB) }