change to submit and abort

This commit is contained in:
yedongfu 2021-07-03 12:00:52 +08:00
parent b45838187d
commit a151239863
18 changed files with 52 additions and 52 deletions

View File

@ -44,7 +44,7 @@ saga := dtm.SagaNew(DtmServer).
Add(startBusi+"/TransOut", startBusi+"/TransOutCompensate", req). Add(startBusi+"/TransOut", startBusi+"/TransOutCompensate", req).
Add(startBusi+"/TransIn", startBusi+"/TransInCompensate", req) Add(startBusi+"/TransIn", startBusi+"/TransInCompensate", req)
// 提交saga事务 // 提交saga事务
err := saga.Commit() err := saga.Submit()
``` ```
### 完整示例 ### 完整示例
参考[examples/quick_start.go](./examples/quick_start.go) 参考[examples/quick_start.go](./examples/quick_start.go)

View File

@ -12,9 +12,9 @@ import (
func AddRoute(engine *gin.Engine) { func AddRoute(engine *gin.Engine) {
engine.POST("/api/dtmsvr/prepare", common.WrapHandler(Prepare)) engine.POST("/api/dtmsvr/prepare", common.WrapHandler(Prepare))
engine.POST("/api/dtmsvr/commit", common.WrapHandler(Commit)) engine.POST("/api/dtmsvr/submit", common.WrapHandler(Submit))
engine.POST("/api/dtmsvr/branch", common.WrapHandler(Branch)) engine.POST("/api/dtmsvr/branch", common.WrapHandler(Branch))
engine.POST("/api/dtmsvr/rollback", common.WrapHandler(Rollback)) engine.POST("/api/dtmsvr/abort", common.WrapHandler(Abort))
engine.GET("/api/dtmsvr/query", common.WrapHandler(Query)) engine.GET("/api/dtmsvr/query", common.WrapHandler(Query))
} }
@ -25,16 +25,16 @@ func Prepare(c *gin.Context) (interface{}, error) {
return M{"message": "SUCCESS", "gid": m.Gid}, nil return M{"message": "SUCCESS", "gid": m.Gid}, nil
} }
func Commit(c *gin.Context) (interface{}, error) { func Submit(c *gin.Context) (interface{}, error) {
db := dbGet() db := dbGet()
m := TransFromContext(c) m := TransFromContext(c)
m.Status = "committed" m.Status = "submitted"
m.SaveNew(db) m.SaveNew(db)
go m.Process(db) go m.Process(db)
return M{"message": "SUCCESS", "gid": m.Gid}, nil return M{"message": "SUCCESS", "gid": m.Gid}, nil
} }
func Rollback(c *gin.Context) (interface{}, error) { func Abort(c *gin.Context) (interface{}, error) {
db := dbGet() db := dbGet()
m := TransFromContext(c) m := TransFromContext(c)
m = TransFromDb(db, m.Gid) m = TransFromDb(db, m.Gid)

View File

@ -8,7 +8,7 @@ CREATE TABLE if not EXISTS `trans_global` (
`gid` varchar(128) NOT NULL COMMENT '事务全局id', `gid` varchar(128) NOT NULL COMMENT '事务全局id',
`trans_type` varchar(45) not null COMMENT '事务类型: saga | xa', `trans_type` varchar(45) not null COMMENT '事务类型: saga | xa',
`data` TEXT COMMENT '事务携带的数据', `data` TEXT COMMENT '事务携带的数据',
`status` varchar(45) NOT NULL COMMENT '全局事务的状态 prepared | committed | finished | rollbacked', `status` varchar(45) NOT NULL COMMENT '全局事务的状态 prepared | submitted | finished | rollbacked',
`query_prepared` varchar(128) NOT NULL COMMENT 'prepared状态事务的查询api', `query_prepared` varchar(128) NOT NULL COMMENT 'prepared状态事务的查询api',
`create_time` datetime DEFAULT NULL, `create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL,
@ -34,7 +34,7 @@ CREATE TABLE IF NOT EXISTS `trans_branch` (
`data` TEXT COMMENT '请求所携带的数据', `data` TEXT COMMENT '请求所携带的数据',
`branch` VARCHAR(128) NOT NULL COMMENT '事务分支名称', `branch` VARCHAR(128) NOT NULL COMMENT '事务分支名称',
`branch_type` varchar(45) NOT NULL COMMENT '事务分支类型 saga_action | saga_compensate | xa', `branch_type` varchar(45) NOT NULL COMMENT '事务分支类型 saga_action | saga_compensate | xa',
`status` varchar(45) NOT NULL COMMENT '步骤的状态 committed | finished | rollbacked', `status` varchar(45) NOT NULL COMMENT '步骤的状态 submitted | finished | rollbacked',
`finish_time` datetime DEFAULT NULL, `finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL, `rollback_time` datetime DEFAULT NULL,
`create_time` datetime DEFAULT NULL, `create_time` datetime DEFAULT NULL,

View File

@ -62,7 +62,7 @@ func TestCover(t *testing.T) {
db := dbGet() db := dbGet()
db.NoMust() db.NoMust()
CronTransOnce(0, "prepared") CronTransOnce(0, "prepared")
CronTransOnce(0, "committed") CronTransOnce(0, "submitted")
defer handlePanic() defer handlePanic()
checkAffected(db.DB) checkAffected(db.DB)
} }
@ -134,31 +134,31 @@ func xaRollback(t *testing.T) {
func tccNormal(t *testing.T) { func tccNormal(t *testing.T) {
tcc := genTcc("gid-tcc-normal", false, false) tcc := genTcc("gid-tcc-normal", false, false)
tcc.Commit() tcc.Submit()
assert.Equal(t, "committed", getTransStatus(tcc.Gid)) assert.Equal(t, "submitted", getTransStatus(tcc.Gid))
WaitTransProcessed(tcc.Gid) WaitTransProcessed(tcc.Gid)
assert.Equal(t, []string{"prepared", "succeed", "succeed", "prepared", "succeed", "succeed"}, getBranchesStatus(tcc.Gid)) assert.Equal(t, []string{"prepared", "succeed", "succeed", "prepared", "succeed", "succeed"}, getBranchesStatus(tcc.Gid))
} }
func tccRollback(t *testing.T) { func tccRollback(t *testing.T) {
tcc := genTcc("gid-tcc-rollback", false, true) tcc := genTcc("gid-tcc-rollback", false, true)
tcc.Commit() tcc.Submit()
WaitTransProcessed(tcc.Gid) WaitTransProcessed(tcc.Gid)
assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid)) assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid))
} }
func tccRollbackPending(t *testing.T) { func tccRollbackPending(t *testing.T) {
tcc := genTcc("gid-tcc-rollback-pending", false, true) tcc := genTcc("gid-tcc-rollback-pending", false, true)
examples.MainSwitch.TransInRevertResult.SetOnce("PENDING") examples.MainSwitch.TransInRevertResult.SetOnce("PENDING")
tcc.Commit() tcc.Submit()
WaitTransProcessed(tcc.Gid) WaitTransProcessed(tcc.Gid)
// assert.Equal(t, "committed", getTransStatus(tcc.Gid)) // assert.Equal(t, "submitted", getTransStatus(tcc.Gid))
CronTransOnce(60*time.Second, "committed") CronTransOnce(60*time.Second, "submitted")
assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid)) assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid))
} }
func msgNormal(t *testing.T) { func msgNormal(t *testing.T) {
msg := genMsg("gid-normal-msg") msg := genMsg("gid-normal-msg")
msg.Commit() msg.Submit()
assert.Equal(t, "committed", getTransStatus(msg.Gid)) assert.Equal(t, "submitted", getTransStatus(msg.Gid))
WaitTransProcessed(msg.Gid) WaitTransProcessed(msg.Gid)
assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid))
assert.Equal(t, "succeed", getTransStatus(msg.Gid)) assert.Equal(t, "succeed", getTransStatus(msg.Gid))
@ -173,16 +173,16 @@ func msgPending(t *testing.T) {
assert.Equal(t, "prepared", getTransStatus(msg.Gid)) assert.Equal(t, "prepared", getTransStatus(msg.Gid))
examples.MainSwitch.TransInResult.SetOnce("PENDING") examples.MainSwitch.TransInResult.SetOnce("PENDING")
CronTransOnce(60*time.Second, "prepared") CronTransOnce(60*time.Second, "prepared")
assert.Equal(t, "committed", getTransStatus(msg.Gid)) assert.Equal(t, "submitted", getTransStatus(msg.Gid))
CronTransOnce(60*time.Second, "committed") CronTransOnce(60*time.Second, "submitted")
assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid))
assert.Equal(t, "succeed", getTransStatus(msg.Gid)) assert.Equal(t, "succeed", getTransStatus(msg.Gid))
} }
func sagaNormal(t *testing.T) { func sagaNormal(t *testing.T) {
saga := genSaga("gid-noramlSaga", false, false) saga := genSaga("gid-noramlSaga", false, false)
saga.Commit() saga.Submit()
assert.Equal(t, "committed", getTransStatus(saga.Gid)) assert.Equal(t, "submitted", getTransStatus(saga.Gid))
WaitTransProcessed(saga.Gid) WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
transQuery(t, saga.Gid) transQuery(t, saga.Gid)
@ -190,7 +190,7 @@ func sagaNormal(t *testing.T) {
func sagaRollback(t *testing.T) { func sagaRollback(t *testing.T) {
saga := genSaga("gid-rollbackSaga2", false, true) saga := genSaga("gid-rollbackSaga2", false, true)
saga.Commit() saga.Submit()
WaitTransProcessed(saga.Gid) WaitTransProcessed(saga.Gid)
assert.Equal(t, "failed", getTransStatus(saga.Gid)) assert.Equal(t, "failed", getTransStatus(saga.Gid))
assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid)) assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid))
@ -199,10 +199,10 @@ func sagaRollback(t *testing.T) {
func sagaCommittedPending(t *testing.T) { func sagaCommittedPending(t *testing.T) {
saga := genSaga("gid-committedPending", false, false) saga := genSaga("gid-committedPending", false, false)
examples.MainSwitch.TransInResult.SetOnce("PENDING") examples.MainSwitch.TransInResult.SetOnce("PENDING")
saga.Commit() saga.Submit()
WaitTransProcessed(saga.Gid) WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid))
CronTransOnce(60*time.Second, "committed") CronTransOnce(60*time.Second, "submitted")
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
assert.Equal(t, "succeed", getTransStatus(saga.Gid)) assert.Equal(t, "succeed", getTransStatus(saga.Gid))
} }
@ -273,7 +273,7 @@ func TestSqlDB(t *testing.T) {
dbr = db.Model(&dtm.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtm.BarrierModel{}) dbr = db.Model(&dtm.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtm.BarrierModel{})
asserts.Equal(dbr.RowsAffected, int64(0)) asserts.Equal(dbr.RowsAffected, int64(0))
_, err = dtm.ThroughBarrierCall(db.ToSqlDB(), "saga", "gid2", "branch_id2", "compensate", func(db *sql.DB) (interface{}, error) { _, err = dtm.ThroughBarrierCall(db.ToSqlDB(), "saga", "gid2", "branch_id2", "compensate", func(db *sql.DB) (interface{}, error) {
logrus.Printf("commit gid2") logrus.Printf("submit gid2")
return nil, nil return nil, nil
}) })
asserts.Nil(err) asserts.Nil(err)

View File

@ -143,7 +143,7 @@ func (t *TransGlobal) SaveNew(db *common.DB) {
DoNothing: true, DoNothing: true,
}).Create(&branches) }).Create(&branches)
} }
} else if dbr.RowsAffected == 0 && t.Status == "committed" { // 如果数据库已经存放了prepared的事务则修改状态 } else if dbr.RowsAffected == 0 && t.Status == "submitted" { // 如果数据库已经存放了prepared的事务则修改状态
dbr = db.Must().Model(t).Where("gid=? and status=?", t.Gid, "prepared").Select(append(updates, "status")).Updates(t) dbr = db.Must().Model(t).Where("gid=? and status=?", t.Gid, "prepared").Select(append(updates, "status")).Updates(t)
} }
return nil return nil

View File

@ -63,13 +63,13 @@ func (t *TransGlobal) mayQueryPrepared(db *common.DB) {
t.touch(db, t.NextCronInterval*2) t.touch(db, t.NextCronInterval*2)
} }
} else if strings.Contains(body, "SUCCESS") { } else if strings.Contains(body, "SUCCESS") {
t.changeStatus(db, "committed") t.changeStatus(db, "submitted")
} }
} }
func (t *TransMsgProcessor) ProcessOnce(db *common.DB, branches []TransBranch) { func (t *TransMsgProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {
t.mayQueryPrepared(db) t.mayQueryPrepared(db)
if t.Status != "committed" { if t.Status != "submitted" {
return return
} }
current := 0 // 当前正在处理的步骤 current := 0 // 当前正在处理的步骤

View File

@ -51,7 +51,7 @@ func (t *TransSagaProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
} }
func (t *TransSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) { func (t *TransSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {
if t.Status != "committed" { if t.Status != "submitted" {
return return
} }
current := 0 // 当前正在处理的步骤 current := 0 // 当前正在处理的步骤

View File

@ -51,7 +51,7 @@ func (t *TransTccProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
} }
func (t *TransTccProcessor) ProcessOnce(db *common.DB, branches []TransBranch) { func (t *TransTccProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {
if t.Status != "committed" { if t.Status != "submitted" {
return return
} }
current := 0 // 当前正在处理的步骤 current := 0 // 当前正在处理的步骤

View File

@ -38,11 +38,11 @@ func (t *TransXaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {
if t.Status == "succeed" { if t.Status == "succeed" {
return return
} }
currentType := common.If(t.Status == "committed", "commit", "rollback").(string) currentType := common.If(t.Status == "submitted", "commit", "rollback").(string)
for _, branch := range branches { for _, branch := range branches {
if branch.BranchType == currentType && branch.Status != "succeed" { if branch.BranchType == currentType && branch.Status != "succeed" {
t.ExecBranch(db, &branch) t.ExecBranch(db, &branch)
} }
} }
t.changeStatus(db, common.If(t.Status == "committed", "succeed", "failed").(string)) t.changeStatus(db, common.If(t.Status == "submitted", "succeed", "failed").(string))
} }

View File

@ -31,7 +31,7 @@ func MsgFireRequest() {
Add(Busi+"/TransIn", req) Add(Busi+"/TransIn", req)
err := msg.Prepare(Busi + "/TransQuery") err := msg.Prepare(Busi + "/TransQuery")
e2p(err) e2p(err)
logrus.Printf("busi trans commit") logrus.Printf("busi trans submit")
err = msg.Commit() err = msg.Submit()
e2p(err) e2p(err)
} }

View File

@ -31,8 +31,8 @@ func SagaFireRequest() {
saga := dtm.SagaNew(DtmServer). saga := dtm.SagaNew(DtmServer).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInRevert", req) Add(Busi+"/TransIn", Busi+"/TransInRevert", req)
logrus.Printf("saga busi trans commit") logrus.Printf("saga busi trans submit")
err := saga.Commit() err := saga.Submit()
logrus.Printf("result gid is: %s", saga.Gid) logrus.Printf("result gid is: %s", saga.Gid)
e2p(err) e2p(err)
} }

View File

@ -40,8 +40,8 @@ func SagaBarrierFireRequest() {
saga := dtm.SagaNew(DtmServer). saga := dtm.SagaNew(DtmServer).
Add(SagaBarrierBusi+"/TransOut", SagaBarrierBusi+"/TransOutCompensate", req). Add(SagaBarrierBusi+"/TransOut", SagaBarrierBusi+"/TransOutCompensate", req).
Add(SagaBarrierBusi+"/TransIn", SagaBarrierBusi+"/TransInCompensate", req) Add(SagaBarrierBusi+"/TransIn", SagaBarrierBusi+"/TransInCompensate", req)
logrus.Printf("busi trans commit") logrus.Printf("busi trans submit")
err := saga.Commit() err := saga.Submit()
e2p(err) e2p(err)
} }

View File

@ -31,7 +31,7 @@ func TccFireRequest() {
tcc := dtm.TccNew(DtmServer). tcc := dtm.TccNew(DtmServer).
Add(Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert", req). Add(Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransOutRevert", req) Add(Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransOutRevert", req)
logrus.Printf("tcc trans commit") logrus.Printf("tcc trans submit")
err := tcc.Commit() err := tcc.Submit()
e2p(err) e2p(err)
} }

View File

@ -34,7 +34,7 @@ func qsFireRequest() {
saga := dtm.SagaNew(DtmServer). saga := dtm.SagaNew(DtmServer).
Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req). Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req).
Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req) Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req)
err := saga.Commit() err := saga.Submit()
e2p(err) e2p(err)
} }

View File

@ -42,14 +42,14 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
return s return s
} }
func (s *Msg) Commit() error { func (s *Msg) Submit() error {
logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData) logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData)
resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/commit", s.Server)) resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server))
if err != nil { if err != nil {
return err return err
} }
if resp.StatusCode() != 200 { if resp.StatusCode() != 200 {
return fmt.Errorf("commit failed: %v", resp.Body()) return fmt.Errorf("submit failed: %v", resp.Body())
} }
s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() s.Gid = jsonitor.Get(resp.Body(), "gid").ToString()
return nil return nil

View File

@ -43,14 +43,14 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga
return s return s
} }
func (s *Saga) Commit() error { func (s *Saga) Submit() error {
logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData)
resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/commit", s.Server)) resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server))
if err != nil { if err != nil {
return err return err
} }
if resp.StatusCode() != 200 { if resp.StatusCode() != 200 {
return fmt.Errorf("commit failed: %v", resp.Body()) return fmt.Errorf("submit failed: %v", resp.Body())
} }
s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() s.Gid = jsonitor.Get(resp.Body(), "gid").ToString()
return nil return nil

6
tcc.go
View File

@ -45,14 +45,14 @@ func (s *Tcc) Add(try string, confirm string, cancel string, data interface{}) *
return s return s
} }
func (s *Tcc) Commit() error { func (s *Tcc) Submit() error {
logrus.Printf("committing %s body: %v", s.Gid, &s.TccData) logrus.Printf("committing %s body: %v", s.Gid, &s.TccData)
resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/commit", s.Server)) resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/submit", s.Server))
if err != nil { if err != nil {
return err return err
} }
if resp.StatusCode() != 200 { if resp.StatusCode() != 200 {
return fmt.Errorf("commit failed: %v", resp.Body()) return fmt.Errorf("submit failed: %v", resp.Body())
} }
s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() s.Gid = jsonitor.Get(resp.Body(), "gid").ToString()
return nil return nil

4
xa.go
View File

@ -83,7 +83,7 @@ func (xa *XaClient) XaGlobalTransaction(gid string, transFunc XaGlobalFunc) (rer
defer func() { defer func() {
x := recover() x := recover()
if x != nil { if x != nil {
_, _ = common.RestyClient.R().SetBody(data).Post(xa.Server + "/rollback") _, _ = common.RestyClient.R().SetBody(data).Post(xa.Server + "/abort")
rerr = x.(error) rerr = x.(error)
} }
}() }()
@ -94,7 +94,7 @@ func (xa *XaClient) XaGlobalTransaction(gid string, transFunc XaGlobalFunc) (rer
} }
err = transFunc() err = transFunc()
e2p(err) e2p(err)
resp, err = common.RestyClient.R().SetBody(data).Post(xa.Server + "/commit") resp, err = common.RestyClient.R().SetBody(data).Post(xa.Server + "/submit")
e2p(err) e2p(err)
if !strings.Contains(resp.String(), "SUCCESS") { if !strings.Contains(resp.String(), "SUCCESS") {
panic(fmt.Errorf("unexpected result: %s", resp.String())) panic(fmt.Errorf("unexpected result: %s", resp.String()))