From 60c5c95972db56699052ab2ccc6cff4a2d835c0c Mon Sep 17 00:00:00 2001 From: yedongfu Date: Fri, 28 May 2021 19:32:19 +0800 Subject: [PATCH] status change to succeed and failed --- dtmsvr/dtmsvr.sql | 4 +- dtmsvr/dtmsvr_test.go | 65 ++++++++++++++++-------- dtmsvr/trans.go | 74 +++++++++++++++------------ dtmsvr/types.go | 4 +- examples/saga_main.go | 114 ------------------------------------------ examples/types.go | 33 +++++++++--- examples/utils.go | 20 -------- examples/xa_main.go | 108 --------------------------------------- saga.go | 10 ++-- tcc.go | 34 ++++++------- 10 files changed, 139 insertions(+), 327 deletions(-) delete mode 100644 examples/saga_main.go delete mode 100644 examples/utils.go delete mode 100644 examples/xa_main.go diff --git a/dtmsvr/dtmsvr.sql b/dtmsvr/dtmsvr.sql index e448100..3576951 100644 --- a/dtmsvr/dtmsvr.sql +++ b/dtmsvr/dtmsvr.sql @@ -7,7 +7,7 @@ CREATE TABLE `saga` ( `id` int(11) NOT NULL AUTO_INCREMENT, `gid` varchar(45) NOT NULL COMMENT '事务全局id', `steps` json NOT NULL COMMENT 'saga的所有步骤', - `status` varchar(45) NOT NULL COMMENT '全局事务的状态 prepared | processing | finished | rollbacked', + `status` varchar(45) NOT NULL COMMENT '全局事务的状态 prepared | processing | succeed | failed', `trans_query` varchar(128) NOT NULL COMMENT '事务未决状态的查询api', `finish_time` datetime DEFAULT NULL, `rollback_time` datetime DEFAULT NULL, @@ -27,7 +27,7 @@ CREATE TABLE `saga_step` ( `step` int(11) NOT NULL COMMENT '处于saga中的第几步', `url` varchar(128) NOT NULL COMMENT '动作关联的url', `type` varchar(45) NOT NULL COMMENT 'saga的所有步骤', - `status` varchar(45) NOT NULL COMMENT '步骤的状态 prepared | finished | rollbacked', + `status` varchar(45) NOT NULL COMMENT '步骤的状态 prepared | succeed | failed', `finish_time` datetime DEFAULT NULL, `rollback_time` datetime DEFAULT NULL, `create_time` datetime DEFAULT NULL, diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index 1d6d07b..3a1d02b 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -36,7 +36,7 @@ func TestDtmSvr(t *testing.T) { e2p(dbGet().Exec("truncate trans_branch").Error) e2p(dbGet().Exec("truncate trans_log").Error) examples.ResetXaData() - + // tccNormal(t) sagaCommittedPending(t) sagaPreparePending(t) xaRollback(t) @@ -95,7 +95,7 @@ func xaNormal(t *testing.T) { }) e2p(err) WaitTransProcessed(gid) - assert.Equal(t, []string{"finished", "finished"}, getBranchesStatus(gid)) + assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(gid)) } func xaRollback(t *testing.T) { @@ -119,68 +119,89 @@ func xaRollback(t *testing.T) { logrus.Errorf("global transaction failed, so rollback") } WaitTransProcessed(gid) - assert.Equal(t, []string{"rollbacked"}, getBranchesStatus(gid)) + assert.Equal(t, []string{"failed"}, getBranchesStatus(gid)) } +func tccNormal(t *testing.T) { + tcc := genTcc("gid-normal-tcc", false, false) + tcc.Prepare(tcc.QueryPrepared) + assert.Equal(t, "prepared", getSagaModel(tcc.Gid).Status) + tcc.Commit() + assert.Equal(t, "committed", getSagaModel(tcc.Gid).Status) + WaitTransProcessed(tcc.Gid) + assert.Equal(t, []string{"prepared", "succeed", "succeed", "prepared", "succeed", "succeed"}, getBranchesStatus(tcc.Gid)) + +} func sagaNormal(t *testing.T) { saga := genSaga("gid-noramlSaga", false, false) - saga.Prepare() + saga.Prepare(saga.QueryPrepared) assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) saga.Commit() assert.Equal(t, "committed", getSagaModel(saga.Gid).Status) WaitTransProcessed(saga.Gid) - assert.Equal(t, []string{"prepared", "finished", "prepared", "finished"}, getBranchesStatus(saga.Gid)) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) } func sagaRollback(t *testing.T) { saga := genSaga("gid-rollbackSaga2", false, true) saga.Commit() WaitTransProcessed(saga.Gid) - saga.Prepare() - assert.Equal(t, "rollbacked", getSagaModel(saga.Gid).Status) - assert.Equal(t, []string{"rollbacked", "finished", "rollbacked", "rollbacked"}, getBranchesStatus(saga.Gid)) + saga.Prepare(saga.QueryPrepared) + assert.Equal(t, "failed", getSagaModel(saga.Gid).Status) + assert.Equal(t, []string{"failed", "succeed", "failed", "failed"}, getBranchesStatus(saga.Gid)) } func sagaPrepareCancel(t *testing.T) { saga := genSaga("gid1-prepareCancel", false, true) - saga.Prepare() - examples.TransQueryResult = "FAIL" + saga.Prepare(saga.QueryPrepared) + examples.SagaTransQueryResult = "FAIL" config.PreparedExpire = -10 CronTransOnce(-10*time.Second, "prepared") - examples.TransQueryResult = "" + examples.SagaTransQueryResult = "" config.PreparedExpire = 60 assert.Equal(t, "canceled", getSagaModel(saga.Gid).Status) } func sagaPreparePending(t *testing.T) { saga := genSaga("gid1-preparePending", false, false) - saga.Prepare() - examples.TransQueryResult = "PENDING" + saga.Prepare(saga.QueryPrepared) + examples.SagaTransQueryResult = "PENDING" CronTransOnce(-10*time.Second, "prepared") - examples.TransQueryResult = "" + examples.SagaTransQueryResult = "" assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) CronTransOnce(-10*time.Second, "prepared") - assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) + assert.Equal(t, "succeed", getSagaModel(saga.Gid).Status) } func sagaCommittedPending(t *testing.T) { saga := genSaga("gid-committedPending", false, false) - saga.Prepare() - examples.TransInResult = "PENDING" + saga.Prepare(saga.QueryPrepared) + examples.SagaTransInResult = "PENDING" saga.Commit() WaitTransProcessed(saga.Gid) - examples.TransInResult = "" - assert.Equal(t, []string{"prepared", "finished", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) + examples.SagaTransInResult = "" + assert.Equal(t, []string{"prepared", "succeed", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) CronTransOnce(-10*time.Second, "committed") - assert.Equal(t, []string{"prepared", "finished", "prepared", "finished"}, getBranchesStatus(saga.Gid)) - assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) + assert.Equal(t, "succeed", getSagaModel(saga.Gid).Status) } func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga { logrus.Printf("beginning a saga test ---------------- %s", gid) - saga := dtm.SagaNew(examples.DtmServer, gid, examples.SagaBusi+"/TransQuery") + saga := dtm.SagaNew(examples.DtmServer, gid) + saga.QueryPrepared = examples.SagaBusi + "/TransQuery" req := examples.GenTransReq(30, outFailed, inFailed) saga.Add(examples.SagaBusi+"/TransOut", examples.SagaBusi+"/TransOutCompensate", &req) saga.Add(examples.SagaBusi+"/TransIn", examples.SagaBusi+"/TransInCompensate", &req) return saga } + +func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc { + logrus.Printf("beginning a saga test ---------------- %s", gid) + tcc := dtm.TccNew(examples.DtmServer, gid) + tcc.QueryPrepared = examples.TccBusi + "/TransQuery" + req := examples.GenTransReq(30, outFailed, inFailed) + tcc.Add(examples.TccBusi+"/TransOutTry", examples.TccBusi+"/TransOutConfirm", examples.TccBusi+"/TransOutCancel", &req) + tcc.Add(examples.TccBusi+"/TransInTry", examples.TccBusi+"/TransInConfirm", examples.TccBusi+"/TransInCancel", &req) + return tcc +} diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 73a7656..7bc9ca9 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -65,7 +65,7 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch current := 0 // 当前正在处理的步骤 for ; current < len(branches); current++ { step := branches[current] - if step.BranchType == "compensate" && step.Status == "prepared" || step.BranchType == "action" && step.Status == "finished" { + if step.BranchType == "compensate" && step.Status == "prepared" || step.BranchType == "action" && step.Status == "succeed" { continue } if step.BranchType == "action" && step.Status == "prepared" { @@ -75,9 +75,9 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch t.touch(db.Must()) if strings.Contains(body, "SUCCESS") { - step.changeStatus(db.Must(), "finished") + step.changeStatus(db.Must(), "succeed") } else if strings.Contains(body, "FAIL") { - step.changeStatus(db.Must(), "rollbacked") + step.changeStatus(db.Must(), "failed") break } else { panic(fmt.Errorf("unknown response: %s, will be retried", body)) @@ -85,7 +85,7 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch } } if current == len(branches) { // saga 事务完成 - t.changeStatus(db.Must(), "finished") + t.changeStatus(db.Must(), "succeed") return } for current = current - 1; current >= 0; current-- { @@ -97,7 +97,7 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch e2p(err) body := resp.String() if strings.Contains(body, "SUCCESS") { - step.changeStatus(db.Must(), "rollbacked") + step.changeStatus(db.Must(), "failed") } else { panic(fmt.Errorf("expect compensate return SUCCESS")) } @@ -105,7 +105,7 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch if current != -1 { panic(fmt.Errorf("saga current not -1")) } - t.changeStatus(db.Must(), "rollbacked") + t.changeStatus(db.Must(), "failed") } type TransTccProcessor struct { @@ -117,7 +117,7 @@ func (t *TransTccProcessor) GenBranches() []TransBranch { steps := []M{} common.MustUnmarshalString(t.Data, &steps) for _, step := range steps { - for _, branchType := range []string{"rollback", "commit", "prepare"} { + for _, branchType := range []string{"cancel", "confirm", "try"} { nsteps = append(nsteps, TransBranch{ Gid: t.Gid, Branch: fmt.Sprintf("%d", len(nsteps)+1), @@ -131,36 +131,48 @@ func (t *TransTccProcessor) GenBranches() []TransBranch { return nsteps } -func (t *TransTccProcessor) ExecBranch(db *common.MyDb, branche *TransBranch) string { - return "" +func (t *TransTccProcessor) ExecBranch(db *common.MyDb, branch *TransBranch) string { + resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParam("gid", branch.Gid).Post(branch.Url) + e2p(err) + body := resp.String() + t.touch(db) + if strings.Contains(body, "SUCCESS") { + status := common.If(branch.BranchType == "cancel", "failed", "succeed").(string) + branch.changeStatus(db, status) + return "SUCCESS" + } + if branch.BranchType == "try" && strings.Contains(body, "FAIL") { + branch.changeStatus(db, "failed") + return "FAIL" + } + panic(fmt.Errorf("unknown response: %s, will be retried", body)) } func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) { current := 0 // 当前正在处理的步骤 + // 先处理一轮正常try状态 for ; current < len(branches); current++ { - step := branches[current] - if step.BranchType == "prepare" && step.Status == "finished" || step.BranchType != "commit" && step.Status == "prepared" { + step := &branches[current] + if step.BranchType != "try" || step.Status == "succeed" { continue } - if step.BranchType == "prepare" && step.Status == "prepared" { - resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) - e2p(err) - body := resp.String() - t.touch(db) - if strings.Contains(body, "SUCCESS") { - step.changeStatus(db, "finished") - } else if strings.Contains(body, "FAIL") { - step.changeStatus(db, "rollbacked") + if step.BranchType == "try" && step.Status == "prepared" { + result := t.ExecBranch(db, step) + if result == "FAIL" { break - } else { - panic(fmt.Errorf("unknown response: %s, will be retried", body)) } } } - ////////////////////////////////////////////////// - if current == len(branches) { // tcc 事务完成 - t.changeStatus(db, "finished") + // 如果try全部成功,则处理confirm分支,否则处理cancel分支 + currentType := common.If(current == len(branches), "confirm", "cancel") + for current--; current >= 0; current-- { + branch := &branches[current] + if branch.BranchType != currentType || branch.Status != "prepared" { + continue + } + t.ExecBranch(db, branch) } + t.changeStatus(db, common.If(currentType == "confirm", "succeed", "failed").(string)) } type TransXaProcessor struct { @@ -181,32 +193,32 @@ func (t *TransXaProcessor) ExecBranch(db *common.MyDb, branch *TransBranch) stri if !strings.Contains(body, "SUCCESS") { panic(fmt.Errorf("bad response: %s", body)) } - branch.changeStatus(db, common.If(t.Status == "prepared", "rollbacked", "finished").(string)) + branch.changeStatus(db, common.If(t.Status == "prepared", "failed", "succeed").(string)) return "SUCCESS" } func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) { - if t.Status == "finished" { + if t.Status == "succeed" { return } if t.Status == "committed" { for _, branch := range branches { - if branch.Status == "finished" { + if branch.Status == "succeed" { continue } _ = t.ExecBranch(db, &branch) t.touch(db) // 更新update_time,避免被定时任务再次 } - t.changeStatus(db, "finished") + t.changeStatus(db, "succeed") } else if t.Status == "prepared" { // 未commit直接处理的情况为回滚场景 for _, branch := range branches { - if branch.Status == "rollbacked" { + if branch.Status == "failed" { continue } _ = t.ExecBranch(db, &branch) t.touch(db) } - t.changeStatus(db, "rollbacked") + t.changeStatus(db, "failed") } else { e2p(fmt.Errorf("bad trans status: %s", t.Status)) } diff --git a/dtmsvr/types.go b/dtmsvr/types.go index af23051..16f797e 100644 --- a/dtmsvr/types.go +++ b/dtmsvr/types.go @@ -42,9 +42,9 @@ func (t *TransGlobal) changeStatus(db *common.MyDb, status string) *gorm.DB { updates := M{ "status": status, } - if status == "finished" { + if status == "succeed" { updates["finish_time"] = time.Now() - } else if status == "rollbacked" { + } else if status == "failed" { updates["rollback_time"] = time.Now() } dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(updates) diff --git a/examples/saga_main.go b/examples/saga_main.go deleted file mode 100644 index fb1e03d..0000000 --- a/examples/saga_main.go +++ /dev/null @@ -1,114 +0,0 @@ -package examples - -import ( - "fmt" - "time" - - "github.com/gin-gonic/gin" - "github.com/sirupsen/logrus" - "github.com/yedf/dtm" - "github.com/yedf/dtm/common" -) - -// 事务参与者的服务地址 -const SagaBusiPort = 8081 -const SagaBusiApi = "/api/busi_saga" - -var SagaBusi = fmt.Sprintf("http://localhost:%d%s", SagaBusiPort, SagaBusiApi) - -func SagaMain() { - go SagaStartSvr() - sagaFireRequest() - time.Sleep(1000 * time.Second) -} - -func SagaStartSvr() { - logrus.Printf("saga examples starting") - app := common.GetGinApp() - AddRoute(app) - app.Run(":8081") -} - -func sagaFireRequest() { - gid := common.GenGid() - logrus.Printf("busi transaction begin: %s", gid) - req := &TransReq{ - Amount: 30, - TransInResult: "SUCCESS", - TransOutResult: "SUCCESS", - } - saga := dtm.SagaNew(DtmServer, gid, SagaBusi+"/TransQuery") - - saga.Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req) - saga.Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req) - err := saga.Prepare() - e2p(err) - logrus.Printf("busi trans commit") - err = saga.Commit() - e2p(err) -} - -// api - -func AddRoute(app *gin.Engine) { - app.POST(SagaBusiApi+"/TransIn", common.WrapHandler(TransIn)) - app.POST(SagaBusiApi+"/TransInCompensate", common.WrapHandler(TransInCompensate)) - app.POST(SagaBusiApi+"/TransOut", common.WrapHandler(TransOut)) - app.POST(SagaBusiApi+"/TransOutCompensate", common.WrapHandler(TransOutCompensate)) - app.GET(SagaBusiApi+"/TransQuery", common.WrapHandler(TransQuery)) - logrus.Printf("examples listening at %d", SagaBusiPort) -} - -type M = map[string]interface{} - -var TransInResult = "" -var TransOutResult = "" -var TransInCompensateResult = "" -var TransOutCompensateResult = "" -var TransQueryResult = "" - -func transReqFromContext(c *gin.Context) *TransReq { - req := TransReq{} - err := c.BindJSON(&req) - e2p(err) - return &req -} - -func TransIn(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(TransInResult, req.TransInResult, "SUCCESS") - logrus.Printf("%s TransIn: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func TransInCompensate(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(TransInCompensateResult, "SUCCESS") - logrus.Printf("%s TransInCompensate: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func TransOut(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(TransOutResult, req.TransOutResult, "SUCCESS") - logrus.Printf("%s TransOut: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func TransOutCompensate(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(TransOutCompensateResult, "SUCCESS") - logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func TransQuery(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - logrus.Printf("%s TransQuery", gid) - res := common.OrString(TransQueryResult, "SUCCESS") - return M{"result": res}, nil -} diff --git a/examples/types.go b/examples/types.go index 3e07f9c..82e9efa 100644 --- a/examples/types.go +++ b/examples/types.go @@ -1,13 +1,34 @@ package examples -import "github.com/yedf/dtm/common" +import ( + "github.com/gin-gonic/gin" + "github.com/yedf/dtm/common" +) var e2p = common.E2P -type UserAccount struct { - common.ModelBase - UserId int - Balance string +type M = map[string]interface{} + +// 指定dtm服务地址 +const DtmServer = "http://localhost:8080/api/dtmsvr" + +type TransReq struct { + Amount int `json:"amount"` + TransInResult string `json:"transInResult"` + TransOutResult string `json:"transOutResult"` } -func (u *UserAccount) TableName() string { return "user_account" } +func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq { + return &TransReq{ + Amount: amount, + TransOutResult: common.If(outFailed, "FAIL", "SUCCESS").(string), + TransInResult: common.If(inFailed, "FAIL", "SUCCESS").(string), + } +} + +func transReqFromContext(c *gin.Context) *TransReq { + req := TransReq{} + err := c.BindJSON(&req) + e2p(err) + return &req +} diff --git a/examples/utils.go b/examples/utils.go deleted file mode 100644 index 4efc547..0000000 --- a/examples/utils.go +++ /dev/null @@ -1,20 +0,0 @@ -package examples - -import "github.com/yedf/dtm/common" - -// 指定dtm服务地址 -const DtmServer = "http://localhost:8080/api/dtmsvr" - -type TransReq struct { - Amount int `json:"amount"` - TransInResult string `json:"transInResult"` - TransOutResult string `json:"transOutResult"` -} - -func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq { - return &TransReq{ - Amount: amount, - TransOutResult: common.If(outFailed, "FAIL", "SUCCESS").(string), - TransInResult: common.If(inFailed, "FAIL", "SUCCESS").(string), - } -} diff --git a/examples/xa_main.go b/examples/xa_main.go deleted file mode 100644 index ebb4ec9..0000000 --- a/examples/xa_main.go +++ /dev/null @@ -1,108 +0,0 @@ -package examples - -import ( - "fmt" - "time" - - "github.com/gin-gonic/gin" - "github.com/sirupsen/logrus" - "github.com/yedf/dtm" - "github.com/yedf/dtm/common" - "gorm.io/gorm" -) - -// 事务参与者的服务地址 -const XaBusiPort = 8082 -const XaBusiApi = "/api/busi_xa" - -var XaBusi = fmt.Sprintf("http://localhost:%d%s", XaBusiPort, XaBusiApi) - -var XaClient *dtm.XaClient = nil - -func XaMain() { - go XaStartSvr() - time.Sleep(100 * time.Millisecond) - XaFireRequest() - time.Sleep(1000 * time.Second) -} - -func XaStartSvr() { - common.InitApp(&Config) - logrus.Printf("xa examples starting") - app := common.GetGinApp() - XaClient = dtm.XaClientNew(DtmServer, Config.Mysql, app, XaBusi+"/xa") - XaAddRoute(app) - app.Run(fmt.Sprintf(":%d", XaBusiPort)) -} - -func XaFireRequest() { - gid := common.GenGid() - err := XaClient.XaGlobalTransaction(gid, func() (rerr error) { - defer common.P2E(&rerr) - req := GenTransReq(30, false, false) - resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ - "gid": gid, - "user_id": "1", - }).Post(XaBusi + "/TransOut") - common.CheckRestySuccess(resp, err) - resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ - "gid": gid, - "user_id": "2", - }).Post(XaBusi + "/TransOut") - common.CheckRestySuccess(resp, err) - return nil - }) - e2p(err) -} - -// api -func XaAddRoute(app *gin.Engine) { - app.POST(XaBusiApi+"/TransIn", common.WrapHandler(XaTransIn)) - app.POST(XaBusiApi+"/TransOut", common.WrapHandler(XaTransOut)) -} - -func XaTransIn(c *gin.Context) (interface{}, error) { - err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.MyDb) (rerr error) { - req := transReqFromContext(c) - if req.TransInResult != "SUCCESS" { - return fmt.Errorf("tranIn failed") - } - dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")). - Update("balance", gorm.Expr("balance - ?", req.Amount)) - return dbr.Error - }) - e2p(err) - return M{"result": "SUCCESS"}, nil -} - -func XaTransOut(c *gin.Context) (interface{}, error) { - err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.MyDb) (rerr error) { - req := transReqFromContext(c) - if req.TransOutResult != "SUCCESS" { - return fmt.Errorf("tranOut failed") - } - dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")). - Update("balance", gorm.Expr("balance + ?", req.Amount)) - return dbr.Error - }) - e2p(err) - return M{"result": "SUCCESS"}, nil -} - -func ResetXaData() { - db := dbGet() - db.Must().Exec("truncate user_account") - db.Must().Exec("insert into user_account (user_id, balance) values (1, 10000), (2, 10000)") - type XaRow struct { - Data string - } - xas := []XaRow{} - db.Must().Raw("xa recover").Scan(&xas) - for _, xa := range xas { - db.Must().Exec(fmt.Sprintf("xa rollback '%s'", xa.Data)) - } -} - -func dbGet() *common.MyDb { - return common.DbGet(Config.Mysql) -} diff --git a/saga.go b/saga.go index 4b41fb0..a4f91df 100644 --- a/saga.go +++ b/saga.go @@ -25,12 +25,11 @@ type SagaStep struct { Data string `json:"data"` } -func SagaNew(server string, gid string, queryPrepared string) *Saga { +func SagaNew(server string, gid string) *Saga { return &Saga{ SagaData: SagaData{ - Gid: gid, - TransType: "saga", - QueryPrepared: queryPrepared, + Gid: gid, + TransType: "saga", }, Server: server, } @@ -50,7 +49,8 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) error return nil } -func (s *Saga) Prepare() error { +func (s *Saga) Prepare(queryPrepared string) error { + s.QueryPrepared = queryPrepared logrus.Printf("preparing %s body: %v", s.Gid, &s.SagaData) resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/prepare", s.Server)) if err != nil { diff --git a/tcc.go b/tcc.go index 94ed8a1..69dc68b 100644 --- a/tcc.go +++ b/tcc.go @@ -20,39 +20,39 @@ type TccData struct { QueryPrepared string `json:"query_prepared"` } type TccStep struct { - Prepare string `json:"prepare"` - Commit string `json:"commit"` - Rollback string `json:"rollback"` - Data string `json:"data"` + Try string `json:"try"` + Confirm string `json:"confirm"` + Cancel string `json:"cancel"` + Data string `json:"data"` } -func TccNew(server string, gid string, queryPrepared string) *Saga { - return &Saga{ - SagaData: SagaData{ - Gid: gid, - TransType: "tcc", - QueryPrepared: queryPrepared, +func TccNew(server string, gid string) *Tcc { + return &Tcc{ + TccData: TccData{ + Gid: gid, + TransType: "tcc", }, Server: server, } } -func (s *Tcc) Add(prepare string, commit string, rollback string, data interface{}) error { - logrus.Printf("tcc %s Add %s %s %s %v", s.Gid, prepare, commit, rollback, data) +func (s *Tcc) Add(try string, confirm string, cancel string, data interface{}) error { + logrus.Printf("tcc %s Add %s %s %s %v", s.Gid, try, confirm, cancel, data) d, err := json.Marshal(data) if err != nil { return err } step := TccStep{ - Prepare: prepare, - Commit: commit, - Rollback: rollback, - Data: string(d), + Try: try, + Confirm: confirm, + Cancel: cancel, + Data: string(d), } s.Steps = append(s.Steps, step) return nil } -func (s *Tcc) Prepare() error { +func (s *Tcc) Prepare(queryPrepared string) error { + s.QueryPrepared = queryPrepared logrus.Printf("preparing %s body: %v", s.Gid, &s.TccData) resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/prepare", s.Server)) if err != nil {