status change to succeed and failed

This commit is contained in:
yedongfu 2021-05-28 19:32:19 +08:00
parent 2332d19f24
commit 60c5c95972
10 changed files with 139 additions and 327 deletions

View File

@ -7,7 +7,7 @@ CREATE TABLE `saga` (
`id` int(11) NOT NULL AUTO_INCREMENT, `id` int(11) NOT NULL AUTO_INCREMENT,
`gid` varchar(45) NOT NULL COMMENT '事务全局id', `gid` varchar(45) NOT NULL COMMENT '事务全局id',
`steps` json NOT NULL COMMENT 'saga的所有步骤', `steps` json NOT NULL COMMENT 'saga的所有步骤',
`status` varchar(45) NOT NULL COMMENT '全局事务的状态 prepared | processing | finished | rollbacked', `status` varchar(45) NOT NULL COMMENT '全局事务的状态 prepared | processing | succeed | failed',
`trans_query` varchar(128) NOT NULL COMMENT '事务未决状态的查询api', `trans_query` varchar(128) NOT NULL COMMENT '事务未决状态的查询api',
`finish_time` datetime DEFAULT NULL, `finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL, `rollback_time` datetime DEFAULT NULL,
@ -27,7 +27,7 @@ CREATE TABLE `saga_step` (
`step` int(11) NOT NULL COMMENT '处于saga中的第几步', `step` int(11) NOT NULL COMMENT '处于saga中的第几步',
`url` varchar(128) NOT NULL COMMENT '动作关联的url', `url` varchar(128) NOT NULL COMMENT '动作关联的url',
`type` varchar(45) NOT NULL COMMENT 'saga的所有步骤', `type` varchar(45) NOT NULL COMMENT 'saga的所有步骤',
`status` varchar(45) NOT NULL COMMENT '步骤的状态 prepared | finished | rollbacked', `status` varchar(45) NOT NULL COMMENT '步骤的状态 prepared | succeed | failed',
`finish_time` datetime DEFAULT NULL, `finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL, `rollback_time` datetime DEFAULT NULL,
`create_time` datetime DEFAULT NULL, `create_time` datetime DEFAULT NULL,

View File

@ -36,7 +36,7 @@ func TestDtmSvr(t *testing.T) {
e2p(dbGet().Exec("truncate trans_branch").Error) e2p(dbGet().Exec("truncate trans_branch").Error)
e2p(dbGet().Exec("truncate trans_log").Error) e2p(dbGet().Exec("truncate trans_log").Error)
examples.ResetXaData() examples.ResetXaData()
// tccNormal(t)
sagaCommittedPending(t) sagaCommittedPending(t)
sagaPreparePending(t) sagaPreparePending(t)
xaRollback(t) xaRollback(t)
@ -95,7 +95,7 @@ func xaNormal(t *testing.T) {
}) })
e2p(err) e2p(err)
WaitTransProcessed(gid) WaitTransProcessed(gid)
assert.Equal(t, []string{"finished", "finished"}, getBranchesStatus(gid)) assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(gid))
} }
func xaRollback(t *testing.T) { func xaRollback(t *testing.T) {
@ -119,68 +119,89 @@ func xaRollback(t *testing.T) {
logrus.Errorf("global transaction failed, so rollback") logrus.Errorf("global transaction failed, so rollback")
} }
WaitTransProcessed(gid) WaitTransProcessed(gid)
assert.Equal(t, []string{"rollbacked"}, getBranchesStatus(gid)) assert.Equal(t, []string{"failed"}, getBranchesStatus(gid))
} }
func tccNormal(t *testing.T) {
tcc := genTcc("gid-normal-tcc", false, false)
tcc.Prepare(tcc.QueryPrepared)
assert.Equal(t, "prepared", getSagaModel(tcc.Gid).Status)
tcc.Commit()
assert.Equal(t, "committed", getSagaModel(tcc.Gid).Status)
WaitTransProcessed(tcc.Gid)
assert.Equal(t, []string{"prepared", "succeed", "succeed", "prepared", "succeed", "succeed"}, getBranchesStatus(tcc.Gid))
}
func sagaNormal(t *testing.T) { func sagaNormal(t *testing.T) {
saga := genSaga("gid-noramlSaga", false, false) saga := genSaga("gid-noramlSaga", false, false)
saga.Prepare() saga.Prepare(saga.QueryPrepared)
assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status)
saga.Commit() saga.Commit()
assert.Equal(t, "committed", getSagaModel(saga.Gid).Status) assert.Equal(t, "committed", getSagaModel(saga.Gid).Status)
WaitTransProcessed(saga.Gid) WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{"prepared", "finished", "prepared", "finished"}, getBranchesStatus(saga.Gid)) assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
} }
func sagaRollback(t *testing.T) { func sagaRollback(t *testing.T) {
saga := genSaga("gid-rollbackSaga2", false, true) saga := genSaga("gid-rollbackSaga2", false, true)
saga.Commit() saga.Commit()
WaitTransProcessed(saga.Gid) WaitTransProcessed(saga.Gid)
saga.Prepare() saga.Prepare(saga.QueryPrepared)
assert.Equal(t, "rollbacked", getSagaModel(saga.Gid).Status) assert.Equal(t, "failed", getSagaModel(saga.Gid).Status)
assert.Equal(t, []string{"rollbacked", "finished", "rollbacked", "rollbacked"}, getBranchesStatus(saga.Gid)) assert.Equal(t, []string{"failed", "succeed", "failed", "failed"}, getBranchesStatus(saga.Gid))
} }
func sagaPrepareCancel(t *testing.T) { func sagaPrepareCancel(t *testing.T) {
saga := genSaga("gid1-prepareCancel", false, true) saga := genSaga("gid1-prepareCancel", false, true)
saga.Prepare() saga.Prepare(saga.QueryPrepared)
examples.TransQueryResult = "FAIL" examples.SagaTransQueryResult = "FAIL"
config.PreparedExpire = -10 config.PreparedExpire = -10
CronTransOnce(-10*time.Second, "prepared") CronTransOnce(-10*time.Second, "prepared")
examples.TransQueryResult = "" examples.SagaTransQueryResult = ""
config.PreparedExpire = 60 config.PreparedExpire = 60
assert.Equal(t, "canceled", getSagaModel(saga.Gid).Status) assert.Equal(t, "canceled", getSagaModel(saga.Gid).Status)
} }
func sagaPreparePending(t *testing.T) { func sagaPreparePending(t *testing.T) {
saga := genSaga("gid1-preparePending", false, false) saga := genSaga("gid1-preparePending", false, false)
saga.Prepare() saga.Prepare(saga.QueryPrepared)
examples.TransQueryResult = "PENDING" examples.SagaTransQueryResult = "PENDING"
CronTransOnce(-10*time.Second, "prepared") CronTransOnce(-10*time.Second, "prepared")
examples.TransQueryResult = "" examples.SagaTransQueryResult = ""
assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status)
CronTransOnce(-10*time.Second, "prepared") CronTransOnce(-10*time.Second, "prepared")
assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) assert.Equal(t, "succeed", getSagaModel(saga.Gid).Status)
} }
func sagaCommittedPending(t *testing.T) { func sagaCommittedPending(t *testing.T) {
saga := genSaga("gid-committedPending", false, false) saga := genSaga("gid-committedPending", false, false)
saga.Prepare() saga.Prepare(saga.QueryPrepared)
examples.TransInResult = "PENDING" examples.SagaTransInResult = "PENDING"
saga.Commit() saga.Commit()
WaitTransProcessed(saga.Gid) WaitTransProcessed(saga.Gid)
examples.TransInResult = "" examples.SagaTransInResult = ""
assert.Equal(t, []string{"prepared", "finished", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) assert.Equal(t, []string{"prepared", "succeed", "prepared", "prepared"}, getBranchesStatus(saga.Gid))
CronTransOnce(-10*time.Second, "committed") CronTransOnce(-10*time.Second, "committed")
assert.Equal(t, []string{"prepared", "finished", "prepared", "finished"}, getBranchesStatus(saga.Gid)) assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) assert.Equal(t, "succeed", getSagaModel(saga.Gid).Status)
} }
func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga { func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga {
logrus.Printf("beginning a saga test ---------------- %s", gid) logrus.Printf("beginning a saga test ---------------- %s", gid)
saga := dtm.SagaNew(examples.DtmServer, gid, examples.SagaBusi+"/TransQuery") saga := dtm.SagaNew(examples.DtmServer, gid)
saga.QueryPrepared = examples.SagaBusi + "/TransQuery"
req := examples.GenTransReq(30, outFailed, inFailed) req := examples.GenTransReq(30, outFailed, inFailed)
saga.Add(examples.SagaBusi+"/TransOut", examples.SagaBusi+"/TransOutCompensate", &req) saga.Add(examples.SagaBusi+"/TransOut", examples.SagaBusi+"/TransOutCompensate", &req)
saga.Add(examples.SagaBusi+"/TransIn", examples.SagaBusi+"/TransInCompensate", &req) saga.Add(examples.SagaBusi+"/TransIn", examples.SagaBusi+"/TransInCompensate", &req)
return saga return saga
} }
func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc {
logrus.Printf("beginning a saga test ---------------- %s", gid)
tcc := dtm.TccNew(examples.DtmServer, gid)
tcc.QueryPrepared = examples.TccBusi + "/TransQuery"
req := examples.GenTransReq(30, outFailed, inFailed)
tcc.Add(examples.TccBusi+"/TransOutTry", examples.TccBusi+"/TransOutConfirm", examples.TccBusi+"/TransOutCancel", &req)
tcc.Add(examples.TccBusi+"/TransInTry", examples.TccBusi+"/TransInConfirm", examples.TccBusi+"/TransInCancel", &req)
return tcc
}

View File

@ -65,7 +65,7 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch
current := 0 // 当前正在处理的步骤 current := 0 // 当前正在处理的步骤
for ; current < len(branches); current++ { for ; current < len(branches); current++ {
step := branches[current] step := branches[current]
if step.BranchType == "compensate" && step.Status == "prepared" || step.BranchType == "action" && step.Status == "finished" { if step.BranchType == "compensate" && step.Status == "prepared" || step.BranchType == "action" && step.Status == "succeed" {
continue continue
} }
if step.BranchType == "action" && step.Status == "prepared" { if step.BranchType == "action" && step.Status == "prepared" {
@ -75,9 +75,9 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch
t.touch(db.Must()) t.touch(db.Must())
if strings.Contains(body, "SUCCESS") { if strings.Contains(body, "SUCCESS") {
step.changeStatus(db.Must(), "finished") step.changeStatus(db.Must(), "succeed")
} else if strings.Contains(body, "FAIL") { } else if strings.Contains(body, "FAIL") {
step.changeStatus(db.Must(), "rollbacked") step.changeStatus(db.Must(), "failed")
break break
} else { } else {
panic(fmt.Errorf("unknown response: %s, will be retried", body)) panic(fmt.Errorf("unknown response: %s, will be retried", body))
@ -85,7 +85,7 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch
} }
} }
if current == len(branches) { // saga 事务完成 if current == len(branches) { // saga 事务完成
t.changeStatus(db.Must(), "finished") t.changeStatus(db.Must(), "succeed")
return return
} }
for current = current - 1; current >= 0; current-- { for current = current - 1; current >= 0; current-- {
@ -97,7 +97,7 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch
e2p(err) e2p(err)
body := resp.String() body := resp.String()
if strings.Contains(body, "SUCCESS") { if strings.Contains(body, "SUCCESS") {
step.changeStatus(db.Must(), "rollbacked") step.changeStatus(db.Must(), "failed")
} else { } else {
panic(fmt.Errorf("expect compensate return SUCCESS")) panic(fmt.Errorf("expect compensate return SUCCESS"))
} }
@ -105,7 +105,7 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch
if current != -1 { if current != -1 {
panic(fmt.Errorf("saga current not -1")) panic(fmt.Errorf("saga current not -1"))
} }
t.changeStatus(db.Must(), "rollbacked") t.changeStatus(db.Must(), "failed")
} }
type TransTccProcessor struct { type TransTccProcessor struct {
@ -117,7 +117,7 @@ func (t *TransTccProcessor) GenBranches() []TransBranch {
steps := []M{} steps := []M{}
common.MustUnmarshalString(t.Data, &steps) common.MustUnmarshalString(t.Data, &steps)
for _, step := range steps { for _, step := range steps {
for _, branchType := range []string{"rollback", "commit", "prepare"} { for _, branchType := range []string{"cancel", "confirm", "try"} {
nsteps = append(nsteps, TransBranch{ nsteps = append(nsteps, TransBranch{
Gid: t.Gid, Gid: t.Gid,
Branch: fmt.Sprintf("%d", len(nsteps)+1), Branch: fmt.Sprintf("%d", len(nsteps)+1),
@ -131,36 +131,48 @@ func (t *TransTccProcessor) GenBranches() []TransBranch {
return nsteps return nsteps
} }
func (t *TransTccProcessor) ExecBranch(db *common.MyDb, branche *TransBranch) string { func (t *TransTccProcessor) ExecBranch(db *common.MyDb, branch *TransBranch) string {
return "" resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParam("gid", branch.Gid).Post(branch.Url)
e2p(err)
body := resp.String()
t.touch(db)
if strings.Contains(body, "SUCCESS") {
status := common.If(branch.BranchType == "cancel", "failed", "succeed").(string)
branch.changeStatus(db, status)
return "SUCCESS"
}
if branch.BranchType == "try" && strings.Contains(body, "FAIL") {
branch.changeStatus(db, "failed")
return "FAIL"
}
panic(fmt.Errorf("unknown response: %s, will be retried", body))
} }
func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) { func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) {
current := 0 // 当前正在处理的步骤 current := 0 // 当前正在处理的步骤
// 先处理一轮正常try状态
for ; current < len(branches); current++ { for ; current < len(branches); current++ {
step := branches[current] step := &branches[current]
if step.BranchType == "prepare" && step.Status == "finished" || step.BranchType != "commit" && step.Status == "prepared" { if step.BranchType != "try" || step.Status == "succeed" {
continue continue
} }
if step.BranchType == "prepare" && step.Status == "prepared" { if step.BranchType == "try" && step.Status == "prepared" {
resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) result := t.ExecBranch(db, step)
e2p(err) if result == "FAIL" {
body := resp.String()
t.touch(db)
if strings.Contains(body, "SUCCESS") {
step.changeStatus(db, "finished")
} else if strings.Contains(body, "FAIL") {
step.changeStatus(db, "rollbacked")
break break
} else {
panic(fmt.Errorf("unknown response: %s, will be retried", body))
} }
} }
} }
////////////////////////////////////////////////// // 如果try全部成功则处理confirm分支否则处理cancel分支
if current == len(branches) { // tcc 事务完成 currentType := common.If(current == len(branches), "confirm", "cancel")
t.changeStatus(db, "finished") for current--; current >= 0; current-- {
branch := &branches[current]
if branch.BranchType != currentType || branch.Status != "prepared" {
continue
}
t.ExecBranch(db, branch)
} }
t.changeStatus(db, common.If(currentType == "confirm", "succeed", "failed").(string))
} }
type TransXaProcessor struct { type TransXaProcessor struct {
@ -181,32 +193,32 @@ func (t *TransXaProcessor) ExecBranch(db *common.MyDb, branch *TransBranch) stri
if !strings.Contains(body, "SUCCESS") { if !strings.Contains(body, "SUCCESS") {
panic(fmt.Errorf("bad response: %s", body)) panic(fmt.Errorf("bad response: %s", body))
} }
branch.changeStatus(db, common.If(t.Status == "prepared", "rollbacked", "finished").(string)) branch.changeStatus(db, common.If(t.Status == "prepared", "failed", "succeed").(string))
return "SUCCESS" return "SUCCESS"
} }
func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) { func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) {
if t.Status == "finished" { if t.Status == "succeed" {
return return
} }
if t.Status == "committed" { if t.Status == "committed" {
for _, branch := range branches { for _, branch := range branches {
if branch.Status == "finished" { if branch.Status == "succeed" {
continue continue
} }
_ = t.ExecBranch(db, &branch) _ = t.ExecBranch(db, &branch)
t.touch(db) // 更新update_time避免被定时任务再次 t.touch(db) // 更新update_time避免被定时任务再次
} }
t.changeStatus(db, "finished") t.changeStatus(db, "succeed")
} else if t.Status == "prepared" { // 未commit直接处理的情况为回滚场景 } else if t.Status == "prepared" { // 未commit直接处理的情况为回滚场景
for _, branch := range branches { for _, branch := range branches {
if branch.Status == "rollbacked" { if branch.Status == "failed" {
continue continue
} }
_ = t.ExecBranch(db, &branch) _ = t.ExecBranch(db, &branch)
t.touch(db) t.touch(db)
} }
t.changeStatus(db, "rollbacked") t.changeStatus(db, "failed")
} else { } else {
e2p(fmt.Errorf("bad trans status: %s", t.Status)) e2p(fmt.Errorf("bad trans status: %s", t.Status))
} }

View File

@ -42,9 +42,9 @@ func (t *TransGlobal) changeStatus(db *common.MyDb, status string) *gorm.DB {
updates := M{ updates := M{
"status": status, "status": status,
} }
if status == "finished" { if status == "succeed" {
updates["finish_time"] = time.Now() updates["finish_time"] = time.Now()
} else if status == "rollbacked" { } else if status == "failed" {
updates["rollback_time"] = time.Now() updates["rollback_time"] = time.Now()
} }
dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(updates) dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(updates)

View File

@ -1,114 +0,0 @@
package examples
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
)
// 事务参与者的服务地址
const SagaBusiPort = 8081
const SagaBusiApi = "/api/busi_saga"
var SagaBusi = fmt.Sprintf("http://localhost:%d%s", SagaBusiPort, SagaBusiApi)
func SagaMain() {
go SagaStartSvr()
sagaFireRequest()
time.Sleep(1000 * time.Second)
}
func SagaStartSvr() {
logrus.Printf("saga examples starting")
app := common.GetGinApp()
AddRoute(app)
app.Run(":8081")
}
func sagaFireRequest() {
gid := common.GenGid()
logrus.Printf("busi transaction begin: %s", gid)
req := &TransReq{
Amount: 30,
TransInResult: "SUCCESS",
TransOutResult: "SUCCESS",
}
saga := dtm.SagaNew(DtmServer, gid, SagaBusi+"/TransQuery")
saga.Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req)
saga.Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req)
err := saga.Prepare()
e2p(err)
logrus.Printf("busi trans commit")
err = saga.Commit()
e2p(err)
}
// api
func AddRoute(app *gin.Engine) {
app.POST(SagaBusiApi+"/TransIn", common.WrapHandler(TransIn))
app.POST(SagaBusiApi+"/TransInCompensate", common.WrapHandler(TransInCompensate))
app.POST(SagaBusiApi+"/TransOut", common.WrapHandler(TransOut))
app.POST(SagaBusiApi+"/TransOutCompensate", common.WrapHandler(TransOutCompensate))
app.GET(SagaBusiApi+"/TransQuery", common.WrapHandler(TransQuery))
logrus.Printf("examples listening at %d", SagaBusiPort)
}
type M = map[string]interface{}
var TransInResult = ""
var TransOutResult = ""
var TransInCompensateResult = ""
var TransOutCompensateResult = ""
var TransQueryResult = ""
func transReqFromContext(c *gin.Context) *TransReq {
req := TransReq{}
err := c.BindJSON(&req)
e2p(err)
return &req
}
func TransIn(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TransInResult, req.TransInResult, "SUCCESS")
logrus.Printf("%s TransIn: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func TransInCompensate(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TransInCompensateResult, "SUCCESS")
logrus.Printf("%s TransInCompensate: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func TransOut(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TransOutResult, req.TransOutResult, "SUCCESS")
logrus.Printf("%s TransOut: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func TransOutCompensate(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TransOutCompensateResult, "SUCCESS")
logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func TransQuery(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
logrus.Printf("%s TransQuery", gid)
res := common.OrString(TransQueryResult, "SUCCESS")
return M{"result": res}, nil
}

View File

@ -1,13 +1,34 @@
package examples package examples
import "github.com/yedf/dtm/common" import (
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/common"
)
var e2p = common.E2P var e2p = common.E2P
type UserAccount struct { type M = map[string]interface{}
common.ModelBase
UserId int // 指定dtm服务地址
Balance string const DtmServer = "http://localhost:8080/api/dtmsvr"
type TransReq struct {
Amount int `json:"amount"`
TransInResult string `json:"transInResult"`
TransOutResult string `json:"transOutResult"`
} }
func (u *UserAccount) TableName() string { return "user_account" } func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq {
return &TransReq{
Amount: amount,
TransOutResult: common.If(outFailed, "FAIL", "SUCCESS").(string),
TransInResult: common.If(inFailed, "FAIL", "SUCCESS").(string),
}
}
func transReqFromContext(c *gin.Context) *TransReq {
req := TransReq{}
err := c.BindJSON(&req)
e2p(err)
return &req
}

View File

@ -1,20 +0,0 @@
package examples
import "github.com/yedf/dtm/common"
// 指定dtm服务地址
const DtmServer = "http://localhost:8080/api/dtmsvr"
type TransReq struct {
Amount int `json:"amount"`
TransInResult string `json:"transInResult"`
TransOutResult string `json:"transOutResult"`
}
func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq {
return &TransReq{
Amount: amount,
TransOutResult: common.If(outFailed, "FAIL", "SUCCESS").(string),
TransInResult: common.If(inFailed, "FAIL", "SUCCESS").(string),
}
}

View File

@ -1,108 +0,0 @@
package examples
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
"gorm.io/gorm"
)
// 事务参与者的服务地址
const XaBusiPort = 8082
const XaBusiApi = "/api/busi_xa"
var XaBusi = fmt.Sprintf("http://localhost:%d%s", XaBusiPort, XaBusiApi)
var XaClient *dtm.XaClient = nil
func XaMain() {
go XaStartSvr()
time.Sleep(100 * time.Millisecond)
XaFireRequest()
time.Sleep(1000 * time.Second)
}
func XaStartSvr() {
common.InitApp(&Config)
logrus.Printf("xa examples starting")
app := common.GetGinApp()
XaClient = dtm.XaClientNew(DtmServer, Config.Mysql, app, XaBusi+"/xa")
XaAddRoute(app)
app.Run(fmt.Sprintf(":%d", XaBusiPort))
}
func XaFireRequest() {
gid := common.GenGid()
err := XaClient.XaGlobalTransaction(gid, func() (rerr error) {
defer common.P2E(&rerr)
req := GenTransReq(30, false, false)
resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
"gid": gid,
"user_id": "1",
}).Post(XaBusi + "/TransOut")
common.CheckRestySuccess(resp, err)
resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
"gid": gid,
"user_id": "2",
}).Post(XaBusi + "/TransOut")
common.CheckRestySuccess(resp, err)
return nil
})
e2p(err)
}
// api
func XaAddRoute(app *gin.Engine) {
app.POST(XaBusiApi+"/TransIn", common.WrapHandler(XaTransIn))
app.POST(XaBusiApi+"/TransOut", common.WrapHandler(XaTransOut))
}
func XaTransIn(c *gin.Context) (interface{}, error) {
err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.MyDb) (rerr error) {
req := transReqFromContext(c)
if req.TransInResult != "SUCCESS" {
return fmt.Errorf("tranIn failed")
}
dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")).
Update("balance", gorm.Expr("balance - ?", req.Amount))
return dbr.Error
})
e2p(err)
return M{"result": "SUCCESS"}, nil
}
func XaTransOut(c *gin.Context) (interface{}, error) {
err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.MyDb) (rerr error) {
req := transReqFromContext(c)
if req.TransOutResult != "SUCCESS" {
return fmt.Errorf("tranOut failed")
}
dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")).
Update("balance", gorm.Expr("balance + ?", req.Amount))
return dbr.Error
})
e2p(err)
return M{"result": "SUCCESS"}, nil
}
func ResetXaData() {
db := dbGet()
db.Must().Exec("truncate user_account")
db.Must().Exec("insert into user_account (user_id, balance) values (1, 10000), (2, 10000)")
type XaRow struct {
Data string
}
xas := []XaRow{}
db.Must().Raw("xa recover").Scan(&xas)
for _, xa := range xas {
db.Must().Exec(fmt.Sprintf("xa rollback '%s'", xa.Data))
}
}
func dbGet() *common.MyDb {
return common.DbGet(Config.Mysql)
}

10
saga.go
View File

@ -25,12 +25,11 @@ type SagaStep struct {
Data string `json:"data"` Data string `json:"data"`
} }
func SagaNew(server string, gid string, queryPrepared string) *Saga { func SagaNew(server string, gid string) *Saga {
return &Saga{ return &Saga{
SagaData: SagaData{ SagaData: SagaData{
Gid: gid, Gid: gid,
TransType: "saga", TransType: "saga",
QueryPrepared: queryPrepared,
}, },
Server: server, Server: server,
} }
@ -50,7 +49,8 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) error
return nil return nil
} }
func (s *Saga) Prepare() error { func (s *Saga) Prepare(queryPrepared string) error {
s.QueryPrepared = queryPrepared
logrus.Printf("preparing %s body: %v", s.Gid, &s.SagaData) logrus.Printf("preparing %s body: %v", s.Gid, &s.SagaData)
resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/prepare", s.Server)) resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/prepare", s.Server))
if err != nil { if err != nil {

34
tcc.go
View File

@ -20,39 +20,39 @@ type TccData struct {
QueryPrepared string `json:"query_prepared"` QueryPrepared string `json:"query_prepared"`
} }
type TccStep struct { type TccStep struct {
Prepare string `json:"prepare"` Try string `json:"try"`
Commit string `json:"commit"` Confirm string `json:"confirm"`
Rollback string `json:"rollback"` Cancel string `json:"cancel"`
Data string `json:"data"` Data string `json:"data"`
} }
func TccNew(server string, gid string, queryPrepared string) *Saga { func TccNew(server string, gid string) *Tcc {
return &Saga{ return &Tcc{
SagaData: SagaData{ TccData: TccData{
Gid: gid, Gid: gid,
TransType: "tcc", TransType: "tcc",
QueryPrepared: queryPrepared,
}, },
Server: server, Server: server,
} }
} }
func (s *Tcc) Add(prepare string, commit string, rollback string, data interface{}) error { func (s *Tcc) Add(try string, confirm string, cancel string, data interface{}) error {
logrus.Printf("tcc %s Add %s %s %s %v", s.Gid, prepare, commit, rollback, data) logrus.Printf("tcc %s Add %s %s %s %v", s.Gid, try, confirm, cancel, data)
d, err := json.Marshal(data) d, err := json.Marshal(data)
if err != nil { if err != nil {
return err return err
} }
step := TccStep{ step := TccStep{
Prepare: prepare, Try: try,
Commit: commit, Confirm: confirm,
Rollback: rollback, Cancel: cancel,
Data: string(d), Data: string(d),
} }
s.Steps = append(s.Steps, step) s.Steps = append(s.Steps, step)
return nil return nil
} }
func (s *Tcc) Prepare() error { func (s *Tcc) Prepare(queryPrepared string) error {
s.QueryPrepared = queryPrepared
logrus.Printf("preparing %s body: %v", s.Gid, &s.TccData) logrus.Printf("preparing %s body: %v", s.Gid, &s.TccData)
resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/prepare", s.Server)) resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/prepare", s.Server))
if err != nil { if err != nil {