diff --git a/.gitignore b/.gitignore index e6d0a57..a3d3cd1 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,3 @@ conf.yml* *.out */**/main main -intra-barrier.md diff --git a/app/main.go b/app/main.go index 4455045..368139b 100644 --- a/app/main.go +++ b/app/main.go @@ -12,27 +12,38 @@ import ( type M = map[string]interface{} func main() { + if os.Args[1] == "quick_start" { + dtmsvr.Main() + examples.StartMain() + for { + time.Sleep(1000 * time.Second) + } + } + app := examples.BaseAppNew() + examples.BaseAppSetup(app) if len(os.Args) == 1 || os.Args[1] == "saga" { // 默认情况下,展示saga例子 dtmsvr.PopulateMysql() - go dtmsvr.Main() - go examples.SagaStartSvr() - time.Sleep(100 * time.Millisecond) + dtmsvr.Main() + examples.SagaSetup(app) + examples.BaseAppStart(app) examples.SagaFireRequest() } else if os.Args[1] == "xa" { // 启动xa示例 dtmsvr.PopulateMysql() - go dtmsvr.StartSvr() + dtmsvr.Main() examples.PopulateMysql() - examples.XaMain() + examples.XaSetup(app) + examples.BaseAppStart(app) + examples.XaFireRequest() } else if os.Args[1] == "dtmsvr" { // 只启动dtmsvr go dtmsvr.StartSvr() } else if os.Args[1] == "all" { // 运行所有示例 dtmsvr.PopulateMysql() examples.PopulateMysql() - go dtmsvr.Main() - go examples.SagaStartSvr() - go examples.TccStartSvr() - go examples.XaStartSvr() - time.Sleep(100 * time.Millisecond) + dtmsvr.Main() + examples.SagaSetup(app) + examples.TccSetup(app) + examples.XaSetup(app) + examples.BaseAppStart(app) examples.SagaFireRequest() examples.TccFireRequest() examples.XaFireRequest() diff --git a/barrier.go b/barrier.go new file mode 100644 index 0000000..6c45061 --- /dev/null +++ b/barrier.go @@ -0,0 +1,69 @@ +package dtm + +import ( + "context" + "database/sql" + "fmt" + + "github.com/yedf/dtm/common" +) + +type BusiFunc func(db *sql.DB) (interface{}, error) + +type TransInfo struct { + TransType string + Gid string + BranchID string + BranchType string +} + +func (t *TransInfo) String() string { + return fmt.Sprintf("transInfo: %s %s %s %s", t.TransType, t.Gid, t.BranchID, t.BranchType) +} + +type BarrierModel struct { + common.ModelBase + TransInfo +} + +func (BarrierModel) TableName() string { return "dtm_barrier.barrier" } + +func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, branchType string) (int64, error) { + if branchType == "" { + return 0, nil + } + res, err := tx.Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type) values(?,?,?,?)", transType, gid, branchID, branchType) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + +func ThroughBarrierCall(db *sql.DB, transType string, gid string, branchId string, branchType string, busiCall BusiFunc) (res interface{}, rerr error) { + tx, rerr := db.BeginTx(context.Background(), &sql.TxOptions{}) + if rerr != nil { + return + } + defer func() { + if x := recover(); x != nil { + tx.Rollback() + panic(x) + } else if rerr != nil { + tx.Rollback() + } else { + tx.Commit() + } + }() + + originType := map[string]string{ + "cancel": "action", + "compensate": "action", + }[branchType] + originAffected, _ := insertBarrier(tx, transType, gid, branchId, originType) + currentAffected, rerr := insertBarrier(tx, transType, gid, branchId, branchType) + if currentAffected == 0 || (originType == "cancel" || originType == "compensate") && originAffected > 0 { + return + } + res, rerr = busiCall(db) + return +} diff --git a/common/types.go b/common/types.go index 4e775a5..a163f85 100644 --- a/common/types.go +++ b/common/types.go @@ -33,6 +33,12 @@ func (m *DB) NoMust() *DB { return &DB{DB: db} } +func (m *DB) ToSqlDB() *sql.DB { + d, err := m.DB.DB() + E2P(err) + return d +} + type tracePlugin struct{} func (op *tracePlugin) Name() string { @@ -100,6 +106,14 @@ func DbGet(conf map[string]string) *DB { return dbs[dsn] } +func SqlDB2DB(sdb *sql.DB) *DB { + db, err := gorm.Open(mysql.New(mysql.Config{ + Conn: sdb, + }), &gorm.Config{}) + E2P(err) + return &DB{DB: db} +} + type MyConn struct { Conn *sql.DB Dsn string diff --git a/common/utils.go b/common/utils.go index 99101c2..f450691 100644 --- a/common/utils.go +++ b/common/utils.go @@ -266,3 +266,8 @@ func GetProjectDir() string { } return file } + +func GetFuncName() string { + pc, _, _, _ := runtime.Caller(1) + return runtime.FuncForPC(pc).Name() +} diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index f207a05..d21c766 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -1,12 +1,14 @@ package dtmsvr import ( + "database/sql" + "fmt" "testing" "time" - "github.com/go-playground/assert/v2" "github.com/sirupsen/logrus" "github.com/spf13/viper" + "github.com/stretchr/testify/assert" "github.com/yedf/dtm" "github.com/yedf/dtm/common" "github.com/yedf/dtm/examples" @@ -15,6 +17,8 @@ import ( var myinit int = func() int { common.InitApp(common.GetProjectDir(), &config) config.Mysql["database"] = dbName + PopulateMysql() + examples.PopulateMysql() return 0 }() @@ -25,15 +29,15 @@ func TestViper(t *testing.T) { func TestDtmSvr(t *testing.T) { TransProcessedTestChan = make(chan string, 1) - PopulateMysql() - examples.PopulateMysql() // 启动组件 go StartSvr() - go examples.SagaStartSvr() - go examples.XaStartSvr() - go examples.TccStartSvr() - go examples.MsgStartSvr() - time.Sleep(time.Duration(200 * 1000 * 1000)) + app := examples.BaseAppNew() + examples.BaseAppSetup(app) + examples.SagaSetup(app) + examples.TccSetup(app) + examples.XaSetup(app) + examples.MsgSetup(app) + examples.BaseAppStart(app) // 清理数据 e2p(dbGet().Exec("truncate trans_global").Error) @@ -89,12 +93,12 @@ func xaNormal(t *testing.T) { resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ "gid": gid, "user_id": "1", - }).Post(examples.XaBusi + "/TransOut") + }).Post(examples.Busi + "/TransOutXa") common.CheckRestySuccess(resp, err) resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ "gid": gid, "user_id": "2", - }).Post(examples.XaBusi + "/TransIn") + }).Post(examples.Busi + "/TransInXa") common.CheckRestySuccess(resp, err) return nil }) @@ -111,12 +115,12 @@ func xaRollback(t *testing.T) { resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ "gid": gid, "user_id": "1", - }).Post(examples.XaBusi + "/TransOut") + }).Post(examples.Busi + "/TransOutXa") common.CheckRestySuccess(resp, err) resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ "gid": gid, "user_id": "2", - }).Post(examples.XaBusi + "/TransIn") + }).Post(examples.Busi + "/TransInXa") common.CheckRestySuccess(resp, err) return nil }) @@ -143,11 +147,10 @@ func tccRollback(t *testing.T) { } func tccRollbackPending(t *testing.T) { tcc := genTcc("gid-tcc-rollback-pending", false, true) - examples.TccTransInCancelResult = "PENDING" + examples.MainSwitch.TransInRevertResult.SetOnce("PENDING") tcc.Commit() WaitTransProcessed(tcc.Gid) - assert.Equal(t, "committed", getTransStatus(tcc.Gid)) - examples.TccTransInCancelResult = "" + // assert.Equal(t, "committed", getTransStatus(tcc.Gid)) CronTransOnce(60*time.Second, "committed") assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid)) } @@ -165,14 +168,12 @@ func msgPending(t *testing.T) { msg := genMsg("gid-normal-pending") msg.Prepare("") assert.Equal(t, "prepared", getTransStatus(msg.Gid)) - examples.MsgTransQueryResult = "PENDING" + examples.MainSwitch.CanSubmitResult.SetOnce("PENDING") CronTransOnce(60*time.Second, "prepared") assert.Equal(t, "prepared", getTransStatus(msg.Gid)) - examples.MsgTransQueryResult = "" - examples.MsgTransInResult = "PENDING" + examples.MainSwitch.TransInResult.SetOnce("PENDING") CronTransOnce(60*time.Second, "prepared") assert.Equal(t, "committed", getTransStatus(msg.Gid)) - examples.MsgTransInResult = "" CronTransOnce(60*time.Second, "committed") assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) assert.Equal(t, "succeed", getTransStatus(msg.Gid)) @@ -197,11 +198,10 @@ func sagaRollback(t *testing.T) { func sagaCommittedPending(t *testing.T) { saga := genSaga("gid-committedPending", false, false) - examples.SagaTransInResult = "PENDING" + examples.MainSwitch.TransInResult.SetOnce("PENDING") saga.Commit() WaitTransProcessed(saga.Gid) - examples.SagaTransInResult = "" - assert.Equal(t, []string{"prepared", "succeed", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) + assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) CronTransOnce(60*time.Second, "committed") assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) assert.Equal(t, "succeed", getTransStatus(saga.Gid)) @@ -210,10 +210,10 @@ func sagaCommittedPending(t *testing.T) { func genMsg(gid string) *dtm.Msg { logrus.Printf("beginning a msg test ---------------- %s", gid) msg := dtm.MsgNew(examples.DtmServer) - msg.QueryPrepared = examples.MsgBusi + "/TransQuery" + msg.QueryPrepared = examples.Busi + "/CanSubmit" req := examples.GenTransReq(30, false, false) - msg.Add(examples.MsgBusi+"/TransOut", &req) - msg.Add(examples.MsgBusi+"/TransIn", &req) + msg.Add(examples.Busi+"/TransOut", &req) + msg.Add(examples.Busi+"/TransIn", &req) msg.Gid = gid return msg } @@ -222,8 +222,8 @@ func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga { logrus.Printf("beginning a saga test ---------------- %s", gid) saga := dtm.SagaNew(examples.DtmServer) req := examples.GenTransReq(30, outFailed, inFailed) - saga.Add(examples.SagaBusi+"/TransOut", examples.SagaBusi+"/TransOutCompensate", &req) - saga.Add(examples.SagaBusi+"/TransIn", examples.SagaBusi+"/TransInCompensate", &req) + saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req) + saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req) saga.Gid = gid return saga } @@ -232,8 +232,8 @@ func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc { logrus.Printf("beginning a tcc test ---------------- %s", gid) tcc := dtm.TccNew(examples.DtmServer) req := examples.GenTransReq(30, outFailed, inFailed) - tcc.Add(examples.TccBusi+"/TransOutTry", examples.TccBusi+"/TransOutConfirm", examples.TccBusi+"/TransOutCancel", &req) - tcc.Add(examples.TccBusi+"/TransInTry", examples.TccBusi+"/TransInConfirm", examples.TccBusi+"/TransInCancel", &req) + tcc.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutConfirm", examples.Busi+"/TransOutRevert", &req) + tcc.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInConfirm", examples.Busi+"/TransInRevert", &req) tcc.Gid = gid return tcc } @@ -258,3 +258,25 @@ func transQuery(t *testing.T, gid string) { assert.Equal(t, nil, m["transaction"]) assert.Equal(t, 0, len(m["branches"].([]interface{}))) } + +func TestSqlDB(t *testing.T) { + asserts := assert.New(t) + db := common.DbGet(config.Mysql) + db.Must().Exec("insert ignore into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type) values('saga', 'gid1', 'branch_id1', 'action')") + _, err := dtm.ThroughBarrierCall(db.ToSqlDB(), "saga", "gid2", "branch_id2", "compensate", func(db *sql.DB) (interface{}, error) { + logrus.Printf("rollback gid2") + return nil, fmt.Errorf("gid2 error") + }) + asserts.Error(err, fmt.Errorf("gid2 error")) + dbr := db.Model(&dtm.BarrierModel{}).Where("gid=?", "gid1").Find(&[]dtm.BarrierModel{}) + asserts.Equal(dbr.RowsAffected, int64(1)) + dbr = db.Model(&dtm.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtm.BarrierModel{}) + asserts.Equal(dbr.RowsAffected, int64(0)) + _, err = dtm.ThroughBarrierCall(db.ToSqlDB(), "saga", "gid2", "branch_id2", "compensate", func(db *sql.DB) (interface{}, error) { + logrus.Printf("commit gid2") + return nil, nil + }) + asserts.Nil(err) + dbr = db.Model(&dtm.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtm.BarrierModel{}) + asserts.Equal(dbr.RowsAffected, int64(2)) +} diff --git a/examples/examples.sql b/examples/examples.sql index 286a444..f9ad174 100644 --- a/examples/examples.sql +++ b/examples/examples.sql @@ -24,4 +24,21 @@ create table if not exists user_account_trading( -- 表示交易中被冻结的 key(update_time) ); -insert into user_account_trading (user_id, trading_balance) values (1, 0), (2, 0) on DUPLICATE KEY UPDATE trading_balance=values (trading_balance); \ No newline at end of file +insert into user_account_trading (user_id, trading_balance) values (1, 0), (2, 0) on DUPLICATE KEY UPDATE trading_balance=values (trading_balance); + +create database if not exists `dtm_barrier` /*!40100 DEFAULT CHARACTER SET utf8mb4 */; +use dtm_barrier; + +drop table if exists barrier; +create table if not exists barrier( + id int(11) PRIMARY KEY AUTO_INCREMENT, + trans_type varchar(45) default '' , + gid varchar(128) default'', + branch_id varchar(128) default '', + branch_type varchar(45) default '', + create_time datetime DEFAULT now(), + update_time datetime DEFAULT now(), + key(create_time), + key(update_time), + UNIQUE key(gid, branch_id, branch_type) +); diff --git a/examples/main_base.go b/examples/main_base.go new file mode 100644 index 0000000..018be2c --- /dev/null +++ b/examples/main_base.go @@ -0,0 +1,88 @@ +package examples + +import ( + "fmt" + "time" + + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" +) + +const ( + BusiApi = "/api/busi" + BusiPort = 8081 +) + +var Busi string = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiApi) + +func BaseAppNew() *gin.Engine { + logrus.Printf("examples starting") + app := common.GetGinApp() + return app +} + +func BaseAppStart(app *gin.Engine) { + logrus.Printf("Starting busi at: %d", BusiPort) + go app.Run(fmt.Sprintf(":%d", BusiPort)) + time.Sleep(100 * time.Millisecond) +} + +type AutoEmptyString struct { + value string +} + +func (s *AutoEmptyString) SetOnce(v string) { + s.value = v +} + +func (s *AutoEmptyString) Fetch() string { + v := s.value + s.value = "" + return v +} + +type mainSwitchType struct { + TransInResult AutoEmptyString + TransOutResult AutoEmptyString + TransInConfirmResult AutoEmptyString + TransOutConfirmResult AutoEmptyString + TransInRevertResult AutoEmptyString + TransOutRevertResult AutoEmptyString + CanSubmitResult AutoEmptyString +} + +var MainSwitch mainSwitchType + +func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi string) (interface{}, error) { + info := infoFromContext(c) + res := common.OrString(MainSwitch.TransInResult.Fetch(), result2, "SUCCESS") + logrus.Printf("%s %s result: %s", info.String(), common.GetFuncName(), res) + return M{"result": res}, nil + +} + +func BaseAppSetup(app *gin.Engine) { + app.POST(BusiApi+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) { + return handleGeneralBusiness(c, MainSwitch.TransInResult.Fetch(), reqFrom(c).TransInResult, "transIn") + })) + app.POST(BusiApi+"/TransOut", common.WrapHandler(func(c *gin.Context) (interface{}, error) { + return handleGeneralBusiness(c, MainSwitch.TransOutResult.Fetch(), reqFrom(c).TransOutResult, "transIn") + })) + app.POST(BusiApi+"/TransInConfirm", common.WrapHandler(func(c *gin.Context) (interface{}, error) { + return handleGeneralBusiness(c, MainSwitch.TransInConfirmResult.Fetch(), "", "transIn") + })) + app.POST(BusiApi+"/TransOutConfirm", common.WrapHandler(func(c *gin.Context) (interface{}, error) { + return handleGeneralBusiness(c, MainSwitch.TransOutConfirmResult.Fetch(), "", "transIn") + })) + app.POST(BusiApi+"/TransInRevert", common.WrapHandler(func(c *gin.Context) (interface{}, error) { + return handleGeneralBusiness(c, MainSwitch.TransInRevertResult.Fetch(), "", "transIn") + })) + app.POST(BusiApi+"/TransOutRevert", common.WrapHandler(func(c *gin.Context) (interface{}, error) { + return handleGeneralBusiness(c, MainSwitch.TransOutRevertResult.Fetch(), "", "transIn") + })) + app.GET(BusiApi+"/CanSubmit", common.WrapHandler(func(c *gin.Context) (interface{}, error) { + logrus.Printf("%s CanSubmit", c.Query("gid")) + return common.OrString(MainSwitch.CanSubmitResult.Fetch(), "SUCCESS"), nil + })) +} diff --git a/examples/main_msg.go b/examples/main_msg.go index 86cc8e7..1dd350c 100644 --- a/examples/main_msg.go +++ b/examples/main_msg.go @@ -1,31 +1,22 @@ package examples import ( - "fmt" "time" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm" - "github.com/yedf/dtm/common" ) -// 事务参与者的服务地址 -const MsgBusiApi = "/api/busi_msg" - -var MsgBusi = fmt.Sprintf("http://localhost:%d%s", MsgBusiPort, MsgBusiApi) - func MsgMain() { - go MsgStartSvr() + app := BaseAppNew() + BaseAppSetup(app) + BaseAppStart(app) MsgFireRequest() time.Sleep(1000 * time.Second) } -func MsgStartSvr() { - logrus.Printf("msg examples starting") - app := common.GetGinApp() - MsgAddRoute(app) - app.Run(fmt.Sprintf(":%d", MsgBusiPort)) +func MsgSetup(app *gin.Engine) { } func MsgFireRequest() { @@ -36,47 +27,11 @@ func MsgFireRequest() { TransOutResult: "SUCCESS", } msg := dtm.MsgNew(DtmServer). - Add(MsgBusi+"/TransOut", req). - Add(MsgBusi+"/TransIn", req) - err := msg.Prepare(MsgBusi + "/TransQuery") + Add(Busi+"/TransOut", req). + Add(Busi+"/TransIn", req) + err := msg.Prepare(Busi + "/TransQuery") e2p(err) logrus.Printf("busi trans commit") err = msg.Commit() e2p(err) } - -// api - -func MsgAddRoute(app *gin.Engine) { - app.POST(MsgBusiApi+"/TransIn", common.WrapHandler(msgTransIn)) - app.POST(MsgBusiApi+"/TransOut", common.WrapHandler(MsgTransOut)) - app.GET(MsgBusiApi+"/TransQuery", common.WrapHandler(msgTransQuery)) - logrus.Printf("examples msg listening at %d", MsgBusiPort) -} - -var MsgTransInResult = "" -var MsgTransOutResult = "" -var MsgTransQueryResult = "" - -func msgTransIn(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(MsgTransInResult, req.TransInResult, "SUCCESS") - logrus.Printf("%s TransIn: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func MsgTransOut(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(MsgTransOutResult, req.TransOutResult, "SUCCESS") - logrus.Printf("%s TransOut: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func msgTransQuery(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - logrus.Printf("%s TransQuery", gid) - res := common.OrString(MsgTransQueryResult, "SUCCESS") - return M{"result": res}, nil -} diff --git a/examples/main_saga.go b/examples/main_saga.go index 9de536c..3d1569d 100644 --- a/examples/main_saga.go +++ b/examples/main_saga.go @@ -1,92 +1,38 @@ package examples import ( - "fmt" "time" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm" - "github.com/yedf/dtm/common" ) -// 事务参与者的服务地址 -const SagaBusiApi = "/api/busi_saga" - -var SagaBusi = fmt.Sprintf("http://localhost:%d%s", SagaBusiPort, SagaBusiApi) - func SagaMain() { - go SagaStartSvr() - SagaFireRequest() + app := BaseAppNew() + BaseAppSetup(app) + TccSetup(app) + go BaseAppStart(app) + time.Sleep(100 * time.Millisecond) + TccFireRequest() time.Sleep(1000 * time.Second) } -func SagaStartSvr() { - logrus.Printf("saga examples starting") - app := common.GetGinApp() - SagaAddRoute(app) - app.Run(fmt.Sprintf(":%d", SagaBusiPort)) +func SagaSetup(app *gin.Engine) { } func SagaFireRequest() { - logrus.Printf("a busi transaction begin") + logrus.Printf("a saga busi transaction begin") req := &TransReq{ Amount: 30, TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } saga := dtm.SagaNew(DtmServer). - Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req). - Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req) - logrus.Printf("busi trans commit") + Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). + Add(Busi+"/TransIn", Busi+"/TransInRevert", req) + logrus.Printf("saga busi trans commit") err := saga.Commit() logrus.Printf("result gid is: %s", saga.Gid) e2p(err) } - -// api - -func SagaAddRoute(app *gin.Engine) { - app.POST(SagaBusiApi+"/TransIn", common.WrapHandler(sagaTransIn)) - app.POST(SagaBusiApi+"/TransInCompensate", common.WrapHandler(sagaTransInCompensate)) - app.POST(SagaBusiApi+"/TransOut", common.WrapHandler(SagaTransOut)) - app.POST(SagaBusiApi+"/TransOutCompensate", common.WrapHandler(sagaTransOutCompensate)) - logrus.Printf("examples listening at %d", SagaBusiPort) -} - -var SagaTransInResult = "" -var SagaTransOutResult = "" -var SagaTransInCompensateResult = "" -var SagaTransOutCompensateResult = "" - -func sagaTransIn(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(SagaTransInResult, req.TransInResult, "SUCCESS") - logrus.Printf("%s TransIn: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func sagaTransInCompensate(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(SagaTransInCompensateResult, "SUCCESS") - logrus.Printf("%s TransInCompensate: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func SagaTransOut(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(SagaTransOutResult, req.TransOutResult, "SUCCESS") - logrus.Printf("%s TransOut: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func sagaTransOutCompensate(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(SagaTransOutCompensateResult, "SUCCESS") - logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res) - return M{"result": res}, nil -} diff --git a/examples/main_saga_barrier.go b/examples/main_saga_barrier.go new file mode 100644 index 0000000..4653e80 --- /dev/null +++ b/examples/main_saga_barrier.go @@ -0,0 +1,101 @@ +package examples + +import ( + "database/sql" + "fmt" + "time" + + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm" + "github.com/yedf/dtm/common" + "gorm.io/gorm" +) + +// 事务参与者的服务地址 +const SagaBarrierBusiApi = "/api/busi_saga_barrier" + +var SagaBarrierBusi = fmt.Sprintf("http://localhost:%d%s", SagaBarrierBusiPort, SagaBarrierBusiApi) + +func SagaBarrierMain() { + go SagaBarrierStartSvr() + SagaBarrierFireRequest() + time.Sleep(1000 * time.Second) +} + +func SagaBarrierStartSvr() { + logrus.Printf("saga barrier examples starting") + app := common.GetGinApp() + SagaBarrierAddRoute(app) + app.Run(fmt.Sprintf(":%d", SagaBarrierBusiPort)) +} + +func SagaBarrierFireRequest() { + logrus.Printf("a busi transaction begin") + req := &TransReq{ + Amount: 30, + TransInResult: "SUCCESS", + TransOutResult: "SUCCESS", + } + saga := dtm.SagaNew(DtmServer). + Add(SagaBarrierBusi+"/TransOut", SagaBarrierBusi+"/TransOutCompensate", req). + Add(SagaBarrierBusi+"/TransIn", SagaBarrierBusi+"/TransInCompensate", req) + logrus.Printf("busi trans commit") + err := saga.Commit() + e2p(err) +} + +// api + +func SagaBarrierAddRoute(app *gin.Engine) { + app.POST(SagaBarrierBusiApi+"/TransIn", common.WrapHandler(sagaBarrierTransIn)) + app.POST(SagaBarrierBusiApi+"/TransInCompensate", common.WrapHandler(sagaBarrierTransInCompensate)) + app.POST(SagaBarrierBusiApi+"/TransOut", common.WrapHandler(sagaBarrierTransOut)) + app.POST(SagaBarrierBusiApi+"/TransOutCompensate", common.WrapHandler(sagaBarrierTransOutCompensate)) + logrus.Printf("examples listening at %d", SagaBarrierBusiPort) +} + +var SagaBarrierTransInResult = "" +var SagaBarrierTransOutResult = "" +var SagaBarrierTransInCompensateResult = "" +var SagaBarrierTransOutCompensateResult = "" + +func sagaBarrierTransIn(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := reqFrom(c) + res := common.OrString(SagaBarrierTransInResult, req.TransInResult, "SUCCESS") + logrus.Printf("%s TransIn: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := reqFrom(c) + res := common.OrString(SagaBarrierTransInCompensateResult, "SUCCESS") + logrus.Printf("%s TransInCompensate: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func sagaBarrierTransOut(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + lid := c.Query("lid") + req := reqFrom(c) + return dtm.ThroughBarrierCall(dbGet().ToSqlDB(), "saga", gid, lid, "action", func(sdb *sql.DB) (interface{}, error) { + db := common.SqlDB2DB(sdb) + dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")). + Update("balance", gorm.Expr("balance - ?", req.Amount)) + return nil, dbr.Error + }) + + // res := common.OrString(SagaBarrierTransOutResult, req.TransOutResult, "SUCCESS") + // logrus.Printf("%s TransOut: %v result: %s", gid, req, res) + // return M{"result": res}, nil +} + +func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := reqFrom(c) + res := common.OrString(SagaBarrierTransOutCompensateResult, "SUCCESS") + logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res) + return M{"result": res}, nil +} diff --git a/examples/main_tcc.go b/examples/main_tcc.go index 119f3b9..d329259 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -1,111 +1,37 @@ package examples import ( - "fmt" "time" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm" - "github.com/yedf/dtm/common" ) -// 事务参与者的服务地址 -const TccBusiApi = "/api/busi_tcc" - -var TccBusi = fmt.Sprintf("http://localhost:%d%s", TccBusiPort, TccBusiApi) - func TccMain() { - go TccStartSvr() + app := BaseAppNew() + BaseAppSetup(app) + TccSetup(app) + go BaseAppStart(app) + time.Sleep(100 * time.Millisecond) TccFireRequest() time.Sleep(1000 * time.Second) } -func TccStartSvr() { - logrus.Printf("tcc examples starting") - app := common.GetGinApp() - TccAddRoute(app) - app.Run(fmt.Sprintf(":%d", TccBusiPort)) +func TccSetup(app *gin.Engine) { } func TccFireRequest() { - logrus.Printf("a busi transaction begin") + logrus.Printf("tcc transaction begin") req := &TransReq{ Amount: 30, TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } tcc := dtm.TccNew(DtmServer). - Add(TccBusi+"/TransOutTry", TccBusi+"/TransOutConfirm", TccBusi+"/TransOutCancel", req). - Add(TccBusi+"/TransInTry", TccBusi+"/TransInConfirm", TccBusi+"/TransOutCancel", req) - logrus.Printf("busi trans commit") + Add(Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert", req). + Add(Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransOutRevert", req) + logrus.Printf("tcc trans commit") err := tcc.Commit() e2p(err) } - -// api - -func TccAddRoute(app *gin.Engine) { - app.POST(TccBusiApi+"/TransInTry", common.WrapHandler(tccTransInTry)) - app.POST(TccBusiApi+"/TransInConfirm", common.WrapHandler(tccTransInConfirm)) - app.POST(TccBusiApi+"/TransInCancel", common.WrapHandler(tccTransCancel)) - app.POST(TccBusiApi+"/TransOutTry", common.WrapHandler(tccTransOutTry)) - app.POST(TccBusiApi+"/TransOutConfirm", common.WrapHandler(tccTransOutConfirm)) - app.POST(TccBusiApi+"/TransOutCancel", common.WrapHandler(tccTransOutCancel)) - logrus.Printf("examples listening at %d", TccBusiPort) -} - -var TccTransInTryResult = "" -var TccTransOutTryResult = "" -var TccTransInCancelResult = "" -var TccTransOutCancelResult = "" -var TccTransInConfirmResult = "" -var TccTransOutConfirmResult = "" - -func tccTransInTry(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(TccTransInTryResult, req.TransInResult, "SUCCESS") - logrus.Printf("%s TransInTry: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func tccTransInConfirm(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(TccTransInConfirmResult, "SUCCESS") - logrus.Printf("%s tccTransInConfirm: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func tccTransCancel(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(TccTransInCancelResult, "SUCCESS") - logrus.Printf("%s tccTransCancel: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func tccTransOutTry(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(TccTransOutTryResult, req.TransOutResult, "SUCCESS") - logrus.Printf("%s TransOut: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func tccTransOutConfirm(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(TccTransOutConfirmResult, "SUCCESS") - logrus.Printf("%s TransOutConfirm: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func tccTransOutCancel(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := transReqFromContext(c) - res := common.OrString(TccTransOutCancelResult, "SUCCESS") - logrus.Printf("%s tccTransOutCancel: %v result: %s", gid, req, res) - return M{"result": res}, nil -} diff --git a/examples/main_xa.go b/examples/main_xa.go index 29c7eee..608b87a 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -5,17 +5,11 @@ import ( "time" "github.com/gin-gonic/gin" - "github.com/sirupsen/logrus" "github.com/yedf/dtm" "github.com/yedf/dtm/common" "gorm.io/gorm" ) -// 事务参与者的服务地址 -const XaBusiApi = "/api/busi_xa" - -var XaBusi = fmt.Sprintf("http://localhost:%d%s", XaBusiPort, XaBusiApi) - var XaClient *dtm.XaClient = nil type UserAccount struct { @@ -31,22 +25,14 @@ func dbGet() *common.DB { } func XaMain() { - go XaStartSvr() + app := BaseAppNew() + XaSetup(app) + go BaseAppStart(app) time.Sleep(100 * time.Millisecond) XaFireRequest() time.Sleep(1000 * time.Second) } -func XaStartSvr() { - common.InitApp(common.GetProjectDir(), &Config) - Config.Mysql["database"] = "dtm_busi" - logrus.Printf("xa examples starting") - app := common.GetGinApp() - XaClient = dtm.XaClientNew(DtmServer, Config.Mysql, app, XaBusi+"/xa") - XaAddRoute(app) - app.Run(fmt.Sprintf(":%d", XaBusiPort)) -} - func XaFireRequest() { gid := common.GenGid() err := XaClient.XaGlobalTransaction(gid, func() (rerr error) { @@ -55,12 +41,12 @@ func XaFireRequest() { resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ "gid": gid, "user_id": "1", - }).Post(XaBusi + "/TransOut") + }).Post(Busi + "/TransOutXa") common.CheckRestySuccess(resp, err) resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ "gid": gid, "user_id": "2", - }).Post(XaBusi + "/TransIn") + }).Post(Busi + "/TransInXa") common.CheckRestySuccess(resp, err) return nil }) @@ -68,14 +54,16 @@ func XaFireRequest() { } // api -func XaAddRoute(app *gin.Engine) { - app.POST(XaBusiApi+"/TransIn", common.WrapHandler(xaTransIn)) - app.POST(XaBusiApi+"/TransOut", common.WrapHandler(xaTransOut)) +func XaSetup(app *gin.Engine) { + app.POST(BusiApi+"/TransInXa", common.WrapHandler(xaTransIn)) + app.POST(BusiApi+"/TransOutXa", common.WrapHandler(xaTransOut)) + Config.Mysql["database"] = "dtm_busi" + XaClient = dtm.XaClientNew(DtmServer, Config.Mysql, app, Busi+"/xa") } func xaTransIn(c *gin.Context) (interface{}, error) { err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.DB) (rerr error) { - req := transReqFromContext(c) + req := reqFrom(c) if req.TransInResult != "SUCCESS" { return fmt.Errorf("tranIn failed") } @@ -89,7 +77,7 @@ func xaTransIn(c *gin.Context) (interface{}, error) { func xaTransOut(c *gin.Context) (interface{}, error) { err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.DB) (rerr error) { - req := transReqFromContext(c) + req := reqFrom(c) if req.TransOutResult != "SUCCESS" { return fmt.Errorf("tranOut failed") } diff --git a/examples/quick_start.go b/examples/quick_start.go index 59d2ef3..aa5c587 100644 --- a/examples/quick_start.go +++ b/examples/quick_start.go @@ -11,52 +11,45 @@ import ( ) // 事务参与者的服务地址 -const startBusiApi = "/api/busi_start" +const qsBusiApi = "/api/busi_start" +const qsBusiPort = 8082 -var startBusi = fmt.Sprintf("http://localhost:%d%s", startBusiPort, startBusiApi) +var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiApi) -func startMain() { - go startStartSvr() - startFireRequest() +func StartMain() { + go qsStartSvr() + qsFireRequest() time.Sleep(1000 * time.Second) } -func startStartSvr() { - logrus.Printf("saga examples starting") +func qsStartSvr() { + logrus.Printf("quick start examples starting") app := common.GetGinApp() - startAddRoute(app) - app.Run(fmt.Sprintf(":%d", SagaBusiPort)) + qsAddRoute(app) + app.Run(fmt.Sprintf(":%d", qsBusiPort)) } -func startFireRequest() { +func qsFireRequest() { req := &gin.H{"amount": 30} saga := dtm.SagaNew(DtmServer). - Add(startBusi+"/TransOut", startBusi+"/TransOutCompensate", req). - Add(startBusi+"/TransIn", startBusi+"/TransInCompensate", req) + Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req). + Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req) err := saga.Commit() e2p(err) } -func startAddRoute(app *gin.Engine) { - app.POST(SagaBusiApi+"/TransIn", common.WrapHandler(startTransIn)) - app.POST(SagaBusiApi+"/TransInCompensate", common.WrapHandler(startTransInCompensate)) - app.POST(SagaBusiApi+"/TransOut", common.WrapHandler(startTransOut)) - app.POST(SagaBusiApi+"/TransOutCompensate", common.WrapHandler(startTransOutCompensate)) - logrus.Printf("examples listening at %d", startBusiPort) -} - -func startTransIn(c *gin.Context) (interface{}, error) { - return M{"result": "SUCCESS"}, nil -} - -func startTransInCompensate(c *gin.Context) (interface{}, error) { - return M{"result": "SUCCESS"}, nil -} - -func startTransOut(c *gin.Context) (interface{}, error) { - return M{"result": "SUCCESS"}, nil -} - -func startTransOutCompensate(c *gin.Context) (interface{}, error) { - return M{"result": "SUCCESS"}, nil +func qsAddRoute(app *gin.Engine) { + app.POST(qsBusiApi+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) { + return M{"result": "SUCCESS"}, nil + })) + app.POST(qsBusiApi+"/TransInCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) { + return M{"result": "SUCCESS"}, nil + })) + app.POST(qsBusiApi+"/TransOut", common.WrapHandler(func(c *gin.Context) (interface{}, error) { + return M{"result": "SUCCESS"}, nil + })) + app.POST(qsBusiApi+"/TransOutCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) { + return M{"result": "SUCCESS"}, nil + })) + logrus.Printf("quick qs examples listening at %d", qsBusiPort) } diff --git a/examples/types.go b/examples/types.go index f282f2a..4553089 100644 --- a/examples/types.go +++ b/examples/types.go @@ -1,7 +1,10 @@ package examples import ( + "fmt" + "github.com/gin-gonic/gin" + "github.com/yedf/dtm" "github.com/yedf/dtm/common" ) @@ -13,13 +16,7 @@ type M = map[string]interface{} const DtmServer = "http://localhost:8080/api/dtmsvr" const ( - MsgBusiPort = iota + 8081 - SagaBusiPort - SagaBarrierBusiPort - TccBusiPort - TccBarrierBusiPort - XaBusiPort - startBusiPort + SagaBarrierBusiPort = iota + 8090 ) type TransReq struct { @@ -28,6 +25,10 @@ type TransReq struct { TransOutResult string `json:"transOutResult"` } +func (t *TransReq) String() string { + return fmt.Sprintf("amount: %d transIn: %s transOut: %s", t.Amount, t.TransInResult, t.TransOutResult) +} + func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq { return &TransReq{ Amount: amount, @@ -36,9 +37,19 @@ func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq { } } -func transReqFromContext(c *gin.Context) *TransReq { +func reqFrom(c *gin.Context) *TransReq { req := TransReq{} err := c.BindJSON(&req) e2p(err) return &req } + +func infoFromContext(c *gin.Context) *dtm.TransInfo { + info := dtm.TransInfo{ + TransType: c.Query("trans_type"), + Gid: c.Query("gid"), + BranchID: c.Query("branch_id"), + BranchType: c.Query("branch_type"), + } + return &info +} diff --git a/go.mod b/go.mod index bd0673c..a15bf34 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/json-iterator/go v1.1.10 github.com/sirupsen/logrus v1.7.0 github.com/spf13/viper v1.7.1 - github.com/stretchr/testify v1.7.0 // indirect + github.com/stretchr/testify v1.7.0 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect gopkg.in/yaml.v2 v2.3.0 // indirect gorm.io/driver/mysql v1.0.3 diff --git a/intra-barrier.md b/intra-barrier.md new file mode 100644 index 0000000..3c77f8b --- /dev/null +++ b/intra-barrier.md @@ -0,0 +1,134 @@ +# 子事务屏障--彻底解决子事务乱序执行问题 +### 子事务乱序问题 +我们拿分布式事务中的TCC作为例子,看看如何子事务乱序是什么样的,如何解决。 +假设一个分布式TCC事务G,包含子事务A和B,全部成功为 +A-Try ok -> B-Try ok -> G完成 +当网络出现问题时,下面是某一种常见情形 +A-Try 网络丢包导致超时 -> A Cancel ok丢包 -> A Try 请求到达子事务进行处理(此时因为Cancel已成功,需要忽略,立即返回)->A Cancel ok +这种情况下: +A Cancel在Try之前执行,称为空补偿问题,此时Cancel的处理,需要进行空补偿,称为空补偿控制 +A 第二次Cancel,称为幂等问题,此时Cancel的处理,需要判断此前已回滚,此次忽略,成为幂等控制 +A 第二次Try时,之前已进行了Cancel,此次之行应当忽略,我们称之为防悬挂控制 +这几种情况的正确处理,通常需要子事务做精细处理,例如记录业务处理的主键,并保证上述描述的逻辑。 + +子事务屏障提供了一套全新方法,让业务编写者完全不用被上述问题困扰,他的工作方式如下: + +子事务进入屏障,在屏障中编写自己的业务逻辑,由屏障保证内部Try-Confirm-Cancel逻辑只被提交一次 + +子事务屏障的原理如下: +它使用表sub_trans_barrier,主键为全局事务id-子事务id-子事务分支名称(try|confirm|cancel) +。开启事务 +。如果是Try分支,则那么insert ignore插入gid-branchid-try,如果成功插入,则调用屏障内逻辑 +. 如果是Confirm分支,那么insert ignore插入gid-branchid-confirm,如果成功插入,则调用屏障内逻辑 +. 如果是Cancel分支,那么insert ignore插入gid-branchid-try,再插入gid-branchid-cancel,如果try未插入并且cancel插入成功,则调用屏障内逻辑 +屏障内如果调用成功,提交事务,返回成功 +如果调用异常,回滚事务,返回异常 + +在此机制下,解决了乱序问题 +空补偿控制--如果Try没有执行,直接执行了Cancel,那么Cancel插入gid-branchid-try会成功,不走屏障内的逻辑,保证了空补偿控制 +幂等控制--任何一个分支都无法重复插入唯一键,保证了不会重复执行 +防悬挂控制--Try在Cancel之后执行,那么插入的gid-branchid-try不成功,就不执行,保证了防悬挂控制 + +通过子事务屏障,完全解决了子事务乱序问题,业务人员可以只关心自己的业务逻辑 + + +某些业务要求,一系列操作必须全部执行,而不能仅执行一部分。例如,一个转账操作: + +``` +-- 从id=1的账户给id=2的账户转账100元 +-- 第一步:将id=1的A账户余额减去100 +UPDATE accounts SET balance = balance - 100 WHERE id = 1; +-- 第二步:将id=2的B账户余额加上100 +UPDATE accounts SET balance = balance + 100 WHERE id = 2; +``` +这两条SQL语句必须全部执行,或者,由于某些原因,如果第一条语句成功,第二条语句失败,就必须全部撤销。 + +这种把多条语句作为一个整体进行操作的功能,被称为数据库事务。数据库事务可以确保该事务范围内的所有操作都可以全部成功或者全部失败。如果事务失败,那么效果就和没有执行这些SQL一样,不会对数据库数据有任何改动。 + +[更多事务介绍](https://www.liaoxuefeng.com/wiki/1177760294764384/1179611198786848) + + +### 微服务 + +如果一个事务涉及的所有操作能够放在一个服务内部,那么使用各门语言里事务相关的库,可以轻松的实现多个操作作为整体的事务操作。 + +但是有些服务,例如生成订单涉及做很多操作,包括库存、优惠券、赠送、账户余额等。当系统复杂程度增加时,想要把所有这些操作放到一个服务内实现,会导致耦合度太高,维护成本非常高。 + +针对复杂的系统,当前流行的微服务架构是非常好的解决方案,该架构能够把复杂系统进行拆分,拆分后形成了大量微服务,独立开发,独立维护。 + +[更多微服务介绍](https://www.zhihu.com/question/65502802) + +虽然服务拆分了,但是订单本身的逻辑需要多个操作作为一个整体,要么全部成功,要么全部失败,这就带来了新的挑战。如何把散落在各个微服务中的本地事务,组成一个大的事务,保证他们作为一个整体,这就是分布式事务需要解决的问题。 + +### 分布式事务 +分布式事务简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。本质上来说,分布式事务就是为了保证不同数据库的数据一致性。 + +[更多分布式事务介绍](https://juejin.cn/post/6844903647197806605) + +分布式事务方案包括: + * xa + * tcc + * saga + * 可靠消息 + +下面我们看看最简单的xa + +### XA + +XA一共分为两阶段: + +第一阶段(prepare):事务管理器向所有本地资源管理器发起请求,询问是否是 ready 状态,所有参与者都将本事务能否成功的信息反馈发给协调者; + +第二阶段 (commit/rollback):事务管理器根据所有本地资源管理器的反馈,通知所有本地资源管理器,步调一致地在所有分支上提交或者回滚。 + +目前主流的数据库基本都支持XA事务,包括mysql、oracle、sqlserver、postgre + +### xa实践 + +介绍了这么多,我们来实践完成一个微服务上的xa事务,加深分布式事务的理解,这里将采用[dtm](https://github.com/yedf/dtm.git)作为示例 + +[安装go](https://golang.org/doc/install) + +[安装mysql](https://www.mysql.com/cn/) + +获取dtm +``` +git clone https://github.com/yedf/dtm.git +cd dtm +``` +配置mysql +``` +cp conf.sample.yml conf.yml +vi conf.yml +``` + +运行示例 + +``` +go run app/main.go xa +``` + +从日志里,能够找到以下输出 +``` +# 服务1输出 +XA start '4fPqCNTYeSG' +UPDATE `user_account` SET `balance`=balance + 30,`update_time`='2021-06-09 11:50:42.438' WHERE user_id = '1' +XA end '4fPqCNTYeSG' +XA prepare '4fPqCNTYeSG' + +# 服务2输出 +XA start '4fPqCPijxyC' +UPDATE `user_account` SET `balance`=balance - 30,`update_time`='2021-06-09 11:50:42.493' WHERE user_id = '2' +XA end '4fPqCPijxyC' +XA prepare '4fPqCPijxyC' + +# 服务1输出 +xa commit '4fPqCNTYeSG' + +#服务2输出 +xa commit '4fPqCPijxyC' +``` + + +### 总结 +在这篇简短的文章里,我们大致介绍了 事务->分布式事务->微服务处理XA事务。有兴趣的同学可以通过[dtm](https://github.com/yedf/dtm)继续研究分布式事务 \ No newline at end of file