From a151239863d2ec7ccb465b77f940c1c8b3e0dc4f Mon Sep 17 00:00:00 2001 From: yedongfu Date: Sat, 3 Jul 2021 12:00:52 +0800 Subject: [PATCH] change to submit and abort --- README.md | 2 +- dtmsvr/api.go | 10 +++++----- dtmsvr/dtmsvr.sql | 4 ++-- dtmsvr/dtmsvr_test.go | 34 +++++++++++++++++----------------- dtmsvr/trans.go | 2 +- dtmsvr/trans_msg.go | 4 ++-- dtmsvr/trans_saga.go | 2 +- dtmsvr/trans_tcc.go | 2 +- dtmsvr/trans_xa.go | 4 ++-- examples/main_msg.go | 4 ++-- examples/main_saga.go | 4 ++-- examples/main_saga_barrier.go | 4 ++-- examples/main_tcc.go | 4 ++-- examples/quick_start.go | 2 +- message.go | 6 +++--- saga.go | 6 +++--- tcc.go | 6 +++--- xa.go | 4 ++-- 18 files changed, 52 insertions(+), 52 deletions(-) diff --git a/README.md b/README.md index 4e8de13..1f5b415 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ saga := dtm.SagaNew(DtmServer). Add(startBusi+"/TransOut", startBusi+"/TransOutCompensate", req). Add(startBusi+"/TransIn", startBusi+"/TransInCompensate", req) // 提交saga事务 -err := saga.Commit() +err := saga.Submit() ``` ### 完整示例 参考[examples/quick_start.go](./examples/quick_start.go) diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 8124713..e21f630 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -12,9 +12,9 @@ import ( func AddRoute(engine *gin.Engine) { engine.POST("/api/dtmsvr/prepare", common.WrapHandler(Prepare)) - engine.POST("/api/dtmsvr/commit", common.WrapHandler(Commit)) + engine.POST("/api/dtmsvr/submit", common.WrapHandler(Submit)) engine.POST("/api/dtmsvr/branch", common.WrapHandler(Branch)) - engine.POST("/api/dtmsvr/rollback", common.WrapHandler(Rollback)) + engine.POST("/api/dtmsvr/abort", common.WrapHandler(Abort)) engine.GET("/api/dtmsvr/query", common.WrapHandler(Query)) } @@ -25,16 +25,16 @@ func Prepare(c *gin.Context) (interface{}, error) { return M{"message": "SUCCESS", "gid": m.Gid}, nil } -func Commit(c *gin.Context) (interface{}, error) { +func Submit(c *gin.Context) (interface{}, error) { db := dbGet() m := TransFromContext(c) - m.Status = "committed" + m.Status = "submitted" m.SaveNew(db) go m.Process(db) return M{"message": "SUCCESS", "gid": m.Gid}, nil } -func Rollback(c *gin.Context) (interface{}, error) { +func Abort(c *gin.Context) (interface{}, error) { db := dbGet() m := TransFromContext(c) m = TransFromDb(db, m.Gid) diff --git a/dtmsvr/dtmsvr.sql b/dtmsvr/dtmsvr.sql index ff8d091..d14deb9 100644 --- a/dtmsvr/dtmsvr.sql +++ b/dtmsvr/dtmsvr.sql @@ -8,7 +8,7 @@ CREATE TABLE if not EXISTS `trans_global` ( `gid` varchar(128) NOT NULL COMMENT '事务全局id', `trans_type` varchar(45) not null COMMENT '事务类型: saga | xa', `data` TEXT COMMENT '事务携带的数据', - `status` varchar(45) NOT NULL COMMENT '全局事务的状态 prepared | committed | finished | rollbacked', + `status` varchar(45) NOT NULL COMMENT '全局事务的状态 prepared | submitted | finished | rollbacked', `query_prepared` varchar(128) NOT NULL COMMENT 'prepared状态事务的查询api', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, @@ -34,7 +34,7 @@ CREATE TABLE IF NOT EXISTS `trans_branch` ( `data` TEXT COMMENT '请求所携带的数据', `branch` VARCHAR(128) NOT NULL COMMENT '事务分支名称', `branch_type` varchar(45) NOT NULL COMMENT '事务分支类型 saga_action | saga_compensate | xa', - `status` varchar(45) NOT NULL COMMENT '步骤的状态 committed | finished | rollbacked', + `status` varchar(45) NOT NULL COMMENT '步骤的状态 submitted | finished | rollbacked', `finish_time` datetime DEFAULT NULL, `rollback_time` datetime DEFAULT NULL, `create_time` datetime DEFAULT NULL, diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index d21c766..a4b9399 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -62,7 +62,7 @@ func TestCover(t *testing.T) { db := dbGet() db.NoMust() CronTransOnce(0, "prepared") - CronTransOnce(0, "committed") + CronTransOnce(0, "submitted") defer handlePanic() checkAffected(db.DB) } @@ -134,31 +134,31 @@ func xaRollback(t *testing.T) { func tccNormal(t *testing.T) { tcc := genTcc("gid-tcc-normal", false, false) - tcc.Commit() - assert.Equal(t, "committed", getTransStatus(tcc.Gid)) + tcc.Submit() + assert.Equal(t, "submitted", getTransStatus(tcc.Gid)) WaitTransProcessed(tcc.Gid) assert.Equal(t, []string{"prepared", "succeed", "succeed", "prepared", "succeed", "succeed"}, getBranchesStatus(tcc.Gid)) } func tccRollback(t *testing.T) { tcc := genTcc("gid-tcc-rollback", false, true) - tcc.Commit() + tcc.Submit() WaitTransProcessed(tcc.Gid) assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid)) } func tccRollbackPending(t *testing.T) { tcc := genTcc("gid-tcc-rollback-pending", false, true) examples.MainSwitch.TransInRevertResult.SetOnce("PENDING") - tcc.Commit() + tcc.Submit() WaitTransProcessed(tcc.Gid) - // assert.Equal(t, "committed", getTransStatus(tcc.Gid)) - CronTransOnce(60*time.Second, "committed") + // assert.Equal(t, "submitted", getTransStatus(tcc.Gid)) + CronTransOnce(60*time.Second, "submitted") assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid)) } func msgNormal(t *testing.T) { msg := genMsg("gid-normal-msg") - msg.Commit() - assert.Equal(t, "committed", getTransStatus(msg.Gid)) + msg.Submit() + assert.Equal(t, "submitted", getTransStatus(msg.Gid)) WaitTransProcessed(msg.Gid) assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) assert.Equal(t, "succeed", getTransStatus(msg.Gid)) @@ -173,16 +173,16 @@ func msgPending(t *testing.T) { assert.Equal(t, "prepared", getTransStatus(msg.Gid)) examples.MainSwitch.TransInResult.SetOnce("PENDING") CronTransOnce(60*time.Second, "prepared") - assert.Equal(t, "committed", getTransStatus(msg.Gid)) - CronTransOnce(60*time.Second, "committed") + assert.Equal(t, "submitted", getTransStatus(msg.Gid)) + CronTransOnce(60*time.Second, "submitted") assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) assert.Equal(t, "succeed", getTransStatus(msg.Gid)) } func sagaNormal(t *testing.T) { saga := genSaga("gid-noramlSaga", false, false) - saga.Commit() - assert.Equal(t, "committed", getTransStatus(saga.Gid)) + saga.Submit() + assert.Equal(t, "submitted", getTransStatus(saga.Gid)) WaitTransProcessed(saga.Gid) assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) transQuery(t, saga.Gid) @@ -190,7 +190,7 @@ func sagaNormal(t *testing.T) { func sagaRollback(t *testing.T) { saga := genSaga("gid-rollbackSaga2", false, true) - saga.Commit() + saga.Submit() WaitTransProcessed(saga.Gid) assert.Equal(t, "failed", getTransStatus(saga.Gid)) assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid)) @@ -199,10 +199,10 @@ func sagaRollback(t *testing.T) { func sagaCommittedPending(t *testing.T) { saga := genSaga("gid-committedPending", false, false) examples.MainSwitch.TransInResult.SetOnce("PENDING") - saga.Commit() + saga.Submit() WaitTransProcessed(saga.Gid) assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) - CronTransOnce(60*time.Second, "committed") + CronTransOnce(60*time.Second, "submitted") assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) assert.Equal(t, "succeed", getTransStatus(saga.Gid)) } @@ -273,7 +273,7 @@ func TestSqlDB(t *testing.T) { dbr = db.Model(&dtm.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtm.BarrierModel{}) asserts.Equal(dbr.RowsAffected, int64(0)) _, err = dtm.ThroughBarrierCall(db.ToSqlDB(), "saga", "gid2", "branch_id2", "compensate", func(db *sql.DB) (interface{}, error) { - logrus.Printf("commit gid2") + logrus.Printf("submit gid2") return nil, nil }) asserts.Nil(err) diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 9b3c2bf..cce5a48 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -143,7 +143,7 @@ func (t *TransGlobal) SaveNew(db *common.DB) { DoNothing: true, }).Create(&branches) } - } else if dbr.RowsAffected == 0 && t.Status == "committed" { // 如果数据库已经存放了prepared的事务,则修改状态 + } else if dbr.RowsAffected == 0 && t.Status == "submitted" { // 如果数据库已经存放了prepared的事务,则修改状态 dbr = db.Must().Model(t).Where("gid=? and status=?", t.Gid, "prepared").Select(append(updates, "status")).Updates(t) } return nil diff --git a/dtmsvr/trans_msg.go b/dtmsvr/trans_msg.go index e3c31e7..b2ad62a 100644 --- a/dtmsvr/trans_msg.go +++ b/dtmsvr/trans_msg.go @@ -63,13 +63,13 @@ func (t *TransGlobal) mayQueryPrepared(db *common.DB) { t.touch(db, t.NextCronInterval*2) } } else if strings.Contains(body, "SUCCESS") { - t.changeStatus(db, "committed") + t.changeStatus(db, "submitted") } } func (t *TransMsgProcessor) ProcessOnce(db *common.DB, branches []TransBranch) { t.mayQueryPrepared(db) - if t.Status != "committed" { + if t.Status != "submitted" { return } current := 0 // 当前正在处理的步骤 diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_saga.go index 66052e3..e8de14b 100644 --- a/dtmsvr/trans_saga.go +++ b/dtmsvr/trans_saga.go @@ -51,7 +51,7 @@ func (t *TransSagaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { } func (t *TransSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) { - if t.Status != "committed" { + if t.Status != "submitted" { return } current := 0 // 当前正在处理的步骤 diff --git a/dtmsvr/trans_tcc.go b/dtmsvr/trans_tcc.go index 7c3a7f3..5445885 100644 --- a/dtmsvr/trans_tcc.go +++ b/dtmsvr/trans_tcc.go @@ -51,7 +51,7 @@ func (t *TransTccProcessor) ExecBranch(db *common.DB, branch *TransBranch) { } func (t *TransTccProcessor) ProcessOnce(db *common.DB, branches []TransBranch) { - if t.Status != "committed" { + if t.Status != "submitted" { return } current := 0 // 当前正在处理的步骤 diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_xa.go index 99c7a5f..b0ccbfc 100644 --- a/dtmsvr/trans_xa.go +++ b/dtmsvr/trans_xa.go @@ -38,11 +38,11 @@ func (t *TransXaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) { if t.Status == "succeed" { return } - currentType := common.If(t.Status == "committed", "commit", "rollback").(string) + currentType := common.If(t.Status == "submitted", "commit", "rollback").(string) for _, branch := range branches { if branch.BranchType == currentType && branch.Status != "succeed" { t.ExecBranch(db, &branch) } } - t.changeStatus(db, common.If(t.Status == "committed", "succeed", "failed").(string)) + t.changeStatus(db, common.If(t.Status == "submitted", "succeed", "failed").(string)) } diff --git a/examples/main_msg.go b/examples/main_msg.go index 1dd350c..4795647 100644 --- a/examples/main_msg.go +++ b/examples/main_msg.go @@ -31,7 +31,7 @@ func MsgFireRequest() { Add(Busi+"/TransIn", req) err := msg.Prepare(Busi + "/TransQuery") e2p(err) - logrus.Printf("busi trans commit") - err = msg.Commit() + logrus.Printf("busi trans submit") + err = msg.Submit() e2p(err) } diff --git a/examples/main_saga.go b/examples/main_saga.go index 3d1569d..12ea8b6 100644 --- a/examples/main_saga.go +++ b/examples/main_saga.go @@ -31,8 +31,8 @@ func SagaFireRequest() { saga := dtm.SagaNew(DtmServer). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). Add(Busi+"/TransIn", Busi+"/TransInRevert", req) - logrus.Printf("saga busi trans commit") - err := saga.Commit() + logrus.Printf("saga busi trans submit") + err := saga.Submit() logrus.Printf("result gid is: %s", saga.Gid) e2p(err) } diff --git a/examples/main_saga_barrier.go b/examples/main_saga_barrier.go index 4653e80..80f353a 100644 --- a/examples/main_saga_barrier.go +++ b/examples/main_saga_barrier.go @@ -40,8 +40,8 @@ func SagaBarrierFireRequest() { saga := dtm.SagaNew(DtmServer). Add(SagaBarrierBusi+"/TransOut", SagaBarrierBusi+"/TransOutCompensate", req). Add(SagaBarrierBusi+"/TransIn", SagaBarrierBusi+"/TransInCompensate", req) - logrus.Printf("busi trans commit") - err := saga.Commit() + logrus.Printf("busi trans submit") + err := saga.Submit() e2p(err) } diff --git a/examples/main_tcc.go b/examples/main_tcc.go index d329259..32ff9d9 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -31,7 +31,7 @@ func TccFireRequest() { tcc := dtm.TccNew(DtmServer). Add(Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert", req). Add(Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransOutRevert", req) - logrus.Printf("tcc trans commit") - err := tcc.Commit() + logrus.Printf("tcc trans submit") + err := tcc.Submit() e2p(err) } diff --git a/examples/quick_start.go b/examples/quick_start.go index aa5c587..5c39a4c 100644 --- a/examples/quick_start.go +++ b/examples/quick_start.go @@ -34,7 +34,7 @@ func qsFireRequest() { saga := dtm.SagaNew(DtmServer). Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req). Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req) - err := saga.Commit() + err := saga.Submit() e2p(err) } diff --git a/message.go b/message.go index 55f1d93..63f14bd 100644 --- a/message.go +++ b/message.go @@ -42,14 +42,14 @@ func (s *Msg) Add(action string, postData interface{}) *Msg { return s } -func (s *Msg) Commit() error { +func (s *Msg) Submit() error { logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData) - resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/commit", s.Server)) + resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server)) if err != nil { return err } if resp.StatusCode() != 200 { - return fmt.Errorf("commit failed: %v", resp.Body()) + return fmt.Errorf("submit failed: %v", resp.Body()) } s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() return nil diff --git a/saga.go b/saga.go index 4b6d738..02c90b9 100644 --- a/saga.go +++ b/saga.go @@ -43,14 +43,14 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga return s } -func (s *Saga) Commit() error { +func (s *Saga) Submit() error { logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) - resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/commit", s.Server)) + resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server)) if err != nil { return err } if resp.StatusCode() != 200 { - return fmt.Errorf("commit failed: %v", resp.Body()) + return fmt.Errorf("submit failed: %v", resp.Body()) } s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() return nil diff --git a/tcc.go b/tcc.go index bf1e942..b872ef7 100644 --- a/tcc.go +++ b/tcc.go @@ -45,14 +45,14 @@ func (s *Tcc) Add(try string, confirm string, cancel string, data interface{}) * return s } -func (s *Tcc) Commit() error { +func (s *Tcc) Submit() error { logrus.Printf("committing %s body: %v", s.Gid, &s.TccData) - resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/commit", s.Server)) + resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/submit", s.Server)) if err != nil { return err } if resp.StatusCode() != 200 { - return fmt.Errorf("commit failed: %v", resp.Body()) + return fmt.Errorf("submit failed: %v", resp.Body()) } s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() return nil diff --git a/xa.go b/xa.go index 8d67b97..9b29168 100644 --- a/xa.go +++ b/xa.go @@ -83,7 +83,7 @@ func (xa *XaClient) XaGlobalTransaction(gid string, transFunc XaGlobalFunc) (rer defer func() { x := recover() if x != nil { - _, _ = common.RestyClient.R().SetBody(data).Post(xa.Server + "/rollback") + _, _ = common.RestyClient.R().SetBody(data).Post(xa.Server + "/abort") rerr = x.(error) } }() @@ -94,7 +94,7 @@ func (xa *XaClient) XaGlobalTransaction(gid string, transFunc XaGlobalFunc) (rer } err = transFunc() e2p(err) - resp, err = common.RestyClient.R().SetBody(data).Post(xa.Server + "/commit") + resp, err = common.RestyClient.R().SetBody(data).Post(xa.Server + "/submit") e2p(err) if !strings.Contains(resp.String(), "SUCCESS") { panic(fmt.Errorf("unexpected result: %s", resp.String()))