diff --git a/app/main.go b/app/main.go index 1e28dd3..09cafb1 100644 --- a/app/main.go +++ b/app/main.go @@ -5,6 +5,7 @@ import ( "time" "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmsvr" "github.com/yedf/dtm/examples" ) @@ -19,6 +20,11 @@ func wait() { } func main() { + var a, b interface{} + common.MustUnmarshalString("{\"a\": 1}", &a) + common.MustUnmarshalString("[1, 2]", &b) + logrus.Printf("a is: %v, b is: %v", a, b) + os.Exit(0) if len(os.Args) > 1 && os.Args[1] == "dtmsvr" { // 实际运行,只启动dtmsvr,不重新load数据 dtmsvr.MainStart() wait() diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index eaef70c..87610c3 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -3,9 +3,11 @@ package dtmcli import ( "context" "database/sql" + "encoding/json" "fmt" "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -47,24 +49,33 @@ type BarrierModel struct { // TableName gorm table name func (BarrierModel) TableName() string { return "dtm_barrier.barrier" } -func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, branchType string) (int64, error) { +func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, branchType string, reason string) (int64, error) { if branchType == "" { return 0, nil } - res, err := tx.Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type) values(?,?,?,?)", transType, gid, branchID, branchType) + res, err := tx.Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, reason) values(?,?,?,?,?)", transType, gid, branchID, branchType, reason) if err != nil { return 0, err } return res.RowsAffected() } -// ThroughBarrierCall barrier interface. 
busiCall will be called only when the request is necessary +// ThroughBarrierCall 子事务屏障,详细介绍见 https://zhuanlan.zhihu.com/p/388444465 +// db: 本地数据库 +// transInfo: 事务信息 +// busiCall: 业务函数,仅在必要时被调用 +// 返回值: +// 如果正常调用,返回busiCall的结果 +// 如果发生重复调用,则busiCall不会被重复调用,直接对保存在数据库中上一次的结果,进行unmarshal,通常是一个map[string]interface{},直接作为http的resp +// 如果发生悬挂,则busiCall不会被调用,直接返回错误 {"dtm_result": "FAILURE"} +// 如果发生空补偿,则busiCall不会被调用,直接返回 {"dtm_result": "SUCCESS"} func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc) (res interface{}, rerr error) { tx, rerr := db.BeginTx(context.Background(), &sql.TxOptions{}) if rerr != nil { return } defer func() { + logrus.Printf("result is %v error is %v", res, rerr) if x := recover(); x != nil { tx.Rollback() panic(x) @@ -74,17 +85,37 @@ func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc) (re tx.Commit() } }() - + ti := transInfo originType := map[string]string{ - "cancel": "action", + "cancel": "try", "compensate": "action", - }[transInfo.BranchType] - originAffected, _ := insertBarrier(tx, transInfo.TransType, transInfo.Gid, transInfo.BranchID, originType) - currentAffected, rerr := insertBarrier(tx, transInfo.TransType, transInfo.Gid, transInfo.BranchID, transInfo.BranchType) - if currentAffected == 0 || (originType == "cancel" || originType == "compensate") && originAffected > 0 { - res = "SUCCESS" // 如果被忽略,那么直接返回 "SUCCESS",表示成功,可以进行下一步 + }[ti.BranchType] + originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, ti.BranchType) + currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.BranchType, ti.BranchType) + logrus.Printf("originAffected: %d currentAffected: %d", originAffected, currentAffected) + if (ti.BranchType == "cancel" || ti.BranchType == "compensate") && originAffected > 0 { // 这个是空补偿,返回成功 + return common.MS{"dtm_result": "SUCCESS"}, nil + } else if currentAffected == 0 { // 插入不成功 + var result string + err := tx.QueryRow("select 
result from dtm_barrier.barrier where trans_type=? and gid=? and branch_id=? and branch_type=? and reason=?", + ti.TransType, ti.Gid, ti.BranchID, ti.BranchType, ti.BranchType).Scan(&result) + if err == sql.ErrNoRows { // 这个是悬挂操作,返回失败,AP收到这个返回,会尽快回滚 + res = common.MS{"dtm_result": "FAILURE"} + return + } + if err != nil { + rerr = err + return + } + // 返回上一次的结果 + rerr = json.Unmarshal([]byte(result), &res) return } res, rerr = busiCall(db) + if rerr == nil { // 正确返回了,需要将结果保存到数据库 + sval := common.MustMarshalString(res) + _, rerr = tx.Exec("update dtm_barrier.barrier set result=? where trans_type=? and gid=? and branch_id=? and branch_type=?", sval, + ti.TransType, ti.Gid, ti.BranchID, ti.BranchType) + } return } diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 537c078..591c616 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -86,7 +86,7 @@ func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, ca } else { panic(fmt.Errorf("unknown action: %s", req.Action)) } - return M{"result": "SUCCESS"}, nil + return M{"dtm_result": "SUCCESS"}, nil })) return xa } diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index e9728be..4a7bcb9 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -165,12 +165,12 @@ func tccBarrierRollback(t *testing.T) { if res1.StatusCode() != 200 { return fmt.Errorf("bad status code: %d", res1.StatusCode()) } - res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAIL"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") + res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") e2p(rerr) if res2.StatusCode() != 200 { return fmt.Errorf("bad status code: %d", res2.StatusCode()) } - if strings.Contains(res2.String(), "FAIL") { + if strings.Contains(res2.String(), "FAILURE") { return fmt.Errorf("branch trans in fail") } logrus.Printf("tcc returns: %s, %s", 
res1.String(), res2.String()) @@ -182,7 +182,7 @@ func tccBarrierRollback(t *testing.T) { } func tccRollback(t *testing.T) { - data := &examples.TransReq{Amount: 30, TransInResult: "FAIL"} + data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { _, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") e2p(rerr) @@ -248,7 +248,7 @@ func sagaRollback(t *testing.T) { func sagaBarrierRollback(t *testing.T) { saga := dtmcli.NewSaga(DtmServer). Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", &examples.TransReq{Amount: 30}). - Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", &examples.TransReq{Amount: 30, TransInResult: "FAIL"}) + Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}) logrus.Printf("busi trans submit") err := saga.Submit() e2p(err) @@ -321,9 +321,9 @@ func TestSqlDB(t *testing.T) { TransType: "saga", Gid: "gid2", BranchID: "branch_id2", - BranchType: "compensate", + BranchType: "action", } - db.Must().Exec("insert ignore into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type) values('saga', 'gid1', 'branch_id1', 'action')") + db.Must().Exec("insert ignore into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, reason) values('saga', 'gid1', 'branch_id1', 'action', 'saga')") _, err := dtmcli.ThroughBarrierCall(db.ToSQLDB(), transInfo, func(db *sql.DB) (interface{}, error) { logrus.Printf("rollback gid2") return nil, fmt.Errorf("gid2 error") @@ -339,5 +339,5 @@ func TestSqlDB(t *testing.T) { }) asserts.Nil(err) dbr = db.Model(&dtmcli.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtmcli.BarrierModel{}) - asserts.Equal(dbr.RowsAffected, int64(2)) + asserts.Equal(dbr.RowsAffected, int64(1)) } diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_saga.go index 3865f59..bc9e641 100644 --- a/dtmsvr/trans_saga.go +++ 
b/dtmsvr/trans_saga.go @@ -42,7 +42,7 @@ func (t *transSagaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { if strings.Contains(body, "SUCCESS") { t.touch(db, config.TransCronInterval) branch.changeStatus(db, "succeed") - } else if branch.BranchType == "action" && strings.Contains(body, "FAIL") { + } else if branch.BranchType == "action" && strings.Contains(body, "FAILURE") { t.touch(db, config.TransCronInterval) branch.changeStatus(db, "failed") } else { diff --git a/dtmsvr/trans_tcc.go b/dtmsvr/trans_tcc.go index 61929cd..9df472b 100644 --- a/dtmsvr/trans_tcc.go +++ b/dtmsvr/trans_tcc.go @@ -26,7 +26,7 @@ func (t *transTccProcessor) ExecBranch(db *common.DB, branch *TransBranch) { if strings.Contains(body, "SUCCESS") { t.touch(db, config.TransCronInterval) branch.changeStatus(db, "succeed") - } else if branch.BranchType == "try" && strings.Contains(body, "FAIL") { + } else if branch.BranchType == "try" && strings.Contains(body, "FAILURE") { t.touch(db, config.TransCronInterval) branch.changeStatus(db, "failed") } else { diff --git a/examples/examples.sql b/examples/examples.sql index f9ad174..70be29a 100644 --- a/examples/examples.sql +++ b/examples/examples.sql @@ -36,6 +36,8 @@ create table if not exists barrier( gid varchar(128) default'', branch_id varchar(128) default '', branch_type varchar(45) default '', + reason varchar(45) default '' comment 'the branch type who insert this record', + result varchar(2047) default null comment 'the business result of this branch', create_time datetime DEFAULT now(), update_time datetime DEFAULT now(), key(create_time), diff --git a/examples/main_base.go b/examples/main_base.go index 599cb38..4cea49c 100644 --- a/examples/main_base.go +++ b/examples/main_base.go @@ -64,7 +64,7 @@ func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi info := infoFromContext(c) res := common.OrString(MainSwitch.TransInResult.Fetch(), result2, "SUCCESS") logrus.Printf("%s %s result: %s", info.String(), 
common.GetFuncName(), res) - return M{"result": res}, nil + return M{"dtm_result": res}, nil } diff --git a/examples/main_saga_barrier.go b/examples/main_saga_barrier.go index 54d452d..90e9725 100644 --- a/examples/main_saga_barrier.go +++ b/examples/main_saga_barrier.go @@ -35,7 +35,7 @@ func SagaBarrierAddRoute(app *gin.Engine) { func sagaBarrierAdjustBalance(sdb *sql.DB, uid int, amount int) (interface{}, error) { db := common.SQLDB2DB(sdb) dbr := db.Model(&UserAccount{}).Where("user_id = ?", 1).Update("balance", gorm.Expr("balance + ?", amount)) - return "SUCCESS", dbr.Error + return common.MS{"dtm_result": "SUCCESS"}, dbr.Error } diff --git a/examples/main_tcc.go b/examples/main_tcc.go index 2352650..f13368f 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -21,7 +21,7 @@ func TccSetup(app *gin.Engine) { return nil, rerr } - return M{"result": "SUCCESS"}, nil + return M{"dtm_result": "SUCCESS"}, nil })) } diff --git a/examples/main_tcc_barrier.go b/examples/main_tcc_barrier.go index 8e29935..f02191b 100644 --- a/examples/main_tcc_barrier.go +++ b/examples/main_tcc_barrier.go @@ -55,7 +55,7 @@ func adjustTrading(sdb *sql.DB, uid int, amount int) (interface{}, error) { if dbr.Error == nil && dbr.RowsAffected == 0 { return nil, fmt.Errorf("update error, maybe balance not enough") } - return "SUCCESS", nil + return common.MS{"dtm_server": "SUCCESS"}, nil } func adjustBalance(sdb *sql.DB, uid int, amount int) (interface{}, error) { @@ -70,12 +70,12 @@ func adjustBalance(sdb *sql.DB, uid int, amount int) (interface{}, error) { if dbr.RowsAffected == 0 { return nil, fmt.Errorf("update 0 rows") } - return "SUCCESS", nil + return common.MS{"dtm_result": "SUCCESS"}, nil } // TCC下,转入 func tccBarrierTransInTry(c *gin.Context) (interface{}, error) { - req := reqFrom(c) + req := reqFrom(c) // 去重构一下,改成可以重复使用的输入 if req.TransInResult != "" { return req.TransInResult, nil } diff --git a/examples/main_xa.go b/examples/main_xa.go index af9e87e..30e62a6 100644 --- 
a/examples/main_xa.go +++ b/examples/main_xa.go @@ -69,7 +69,7 @@ func xaTransIn(c *gin.Context) (interface{}, error) { return dbr.Error }) e2p(err) - return M{"result": "SUCCESS"}, nil + return M{"dtm_result": "SUCCESS"}, nil } func xaTransOut(c *gin.Context) (interface{}, error) { @@ -84,7 +84,7 @@ func xaTransOut(c *gin.Context) (interface{}, error) { return dbr.Error }) e2p(err) - return M{"result": "SUCCESS"}, nil + return M{"dtm_result": "SUCCESS"}, nil } // ResetXaData 1 diff --git a/examples/quick_start.go b/examples/quick_start.go index 335b6bc..5bf3815 100644 --- a/examples/quick_start.go +++ b/examples/quick_start.go @@ -44,15 +44,15 @@ func QsFireRequest() string { func qsAddRoute(app *gin.Engine) { app.POST(qsBusiAPI+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) { - return M{"result": "SUCCESS"}, nil + return M{"dtm_result": "SUCCESS"}, nil })) app.POST(qsBusiAPI+"/TransInCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) { - return M{"result": "SUCCESS"}, nil + return M{"dtm_result": "SUCCESS"}, nil })) app.POST(qsBusiAPI+"/TransOut", common.WrapHandler(func(c *gin.Context) (interface{}, error) { - return M{"result": "SUCCESS"}, nil + return M{"dtm_result": "SUCCESS"}, nil })) app.POST(qsBusiAPI+"/TransOutCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) { - return M{"result": "SUCCESS"}, nil + return M{"dtm_result": "SUCCESS"}, nil })) } diff --git a/examples/types.go b/examples/types.go index 7e42bc8..fe7562d 100644 --- a/examples/types.go +++ b/examples/types.go @@ -31,8 +31,8 @@ func (t *TransReq) String() string { func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq { return &TransReq{ Amount: amount, - TransOutResult: common.If(outFailed, "FAIL", "SUCCESS").(string), - TransInResult: common.If(inFailed, "FAIL", "SUCCESS").(string), + TransOutResult: common.If(outFailed, "FAILURE", "SUCCESS").(string), + TransInResult: common.If(inFailed, "FAILURE", 
"SUCCESS").(string), } } diff --git a/intro-xa.md b/intro-xa.md deleted file mode 100644 index 3900d5c..0000000 --- a/intro-xa.md +++ /dev/null @@ -1,125 +0,0 @@ -# 分布式事务深入浅出 -### 事务 -某些业务要求,一系列操作必须全部执行,而不能仅执行一部分。例如,一个转账操作: - -``` --- 从id=1的账户给id=2的账户转账100元 --- 第一步:将id=1的A账户余额减去100 -UPDATE accounts SET balance = balance - 100 WHERE id = 1; --- 第二步:将id=2的B账户余额加上100 -UPDATE accounts SET balance = balance + 100 WHERE id = 2; -``` -这两条SQL语句必须全部执行,或者,由于某些原因,如果第一条语句成功,第二条语句失败,就必须全部撤销。 - -这种把多条语句作为一个整体进行操作的功能,被称为数据库事务。数据库事务可以确保该事务范围内的所有操作都可以全部成功或者全部失败。如果事务失败,那么效果就和没有执行这些SQL一样,不会对数据库数据有任何改动。 - -[更多事务介绍](https://www.liaoxuefeng.com/wiki/1177760294764384/1179611198786848) - - -### 微服务 - -如果一个事务涉及的所有操作能够放在一个服务内部,那么使用各门语言里事务相关的库,可以轻松的实现多个操作作为整体的事务操作。 - -但是有些服务,例如生成订单涉及做很多操作,包括库存、优惠券、赠送、账户余额等。当系统复杂程度增加时,想要把所有这些操作放到一个服务内实现,会导致耦合度太高,维护成本非常高。 - -针对复杂的系统,当前流行的微服务架构是非常好的解决方案,该架构能够把复杂系统进行拆分,拆分后形成了大量微服务,独立开发,独立维护。 - -[更多微服务介绍](https://www.zhihu.com/question/65502802) - -虽然服务拆分了,但是订单本身的逻辑需要多个操作作为一个整体,要么全部成功,要么全部失败,这就带来了新的挑战。如何把散落在各个微服务中的本地事务,组成一个大的事务,保证他们作为一个整体,这就是分布式事务需要解决的问题。 - -### 分布式事务 -分布式事务简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。本质上来说,分布式事务就是为了保证不同数据库的数据一致性。 - -[更多分布式事务介绍](https://juejin.cn/post/6844903647197806605) - -分布式事务方案包括: - * xa - * tcc - * saga - * 可靠消息 - -下面我们看看最简单的xa - -### XA - -XA是由X/Open组织提出的分布式事务的规范,XA规范主要定义了(全局)事务管理器(TM)和(局部)资源管理器(RM)之间的接口。本地的数据库如mysql在XA中扮演的是RM角色 - -XA一共分为两阶段: - -第一阶段(prepare):即所有的参与者RM准备执行事务并锁住需要的资源。参与者ready时,向TM报告已准备就绪。 - -第二阶段 (commit/rollback):当事务管理者(TM)确认所有参与者(RM)都ready后,向所有参与者发送commit命令。 - -目前主流的数据库基本都支持XA事务,包括mysql、oracle、sqlserver、postgre - -我们看看本地数据库是如何支持XA的: - -第一阶段 准备 -``` -XA start '4fPqCNTYeSG' -UPDATE `user_account` SET `balance`=balance + 30,`update_time`='2021-06-09 11:50:42.438' WHERE user_id = '1' -XA end '4fPqCNTYeSG' -XA prepare '4fPqCNTYeSG' -``` - -当所有的参与者完成了prepare,就进入第二阶段 提交 - -``` -xa commit '4fPqCNTYeSG' -``` - -### xa实践 - 
-介绍了这么多,我们来实践完成一个微服务上的xa事务,加深分布式事务的理解,这里将采用[dtm](https://github.com/yedf/dtm.git)作为示例 - -[安装go](https://golang.org/doc/install) - -[安装mysql](https://www.mysql.com/cn/) - -获取dtm -``` -git clone https://github.com/yedf/dtm.git -cd dtm -``` -配置mysql -``` -cp conf.sample.yml conf.yml -vi conf.yml -``` - -运行示例 - -``` -go run app/main.go xa -``` - -从日志里,能够找到以下输出 -``` -# 服务1输出 -XA start '4fPqCNTYeSG' -UPDATE `user_account` SET `balance`=balance + 30,`update_time`='2021-06-09 11:50:42.438' WHERE user_id = '1' -XA end '4fPqCNTYeSG' -XA prepare '4fPqCNTYeSG' - -# 服务2输出 -XA start '4fPqCPijxyC' -UPDATE `user_account` SET `balance`=balance - 30,`update_time`='2021-06-09 11:50:42.493' WHERE user_id = '2' -XA end '4fPqCPijxyC' -XA prepare '4fPqCPijxyC' - -# 服务1输出 -xa commit '4fPqCNTYeSG' - -#服务2输出 -xa commit '4fPqCPijxyC' -``` - -整个交互的时序详情如下 - - - -### 总结 -至此,一个完整的xa分布式事务介绍完成。 - -在这篇简短的文章里,我们大致介绍了 事务->分布式事务->微服务处理XA事务。有兴趣的同学可以通过[dtm](https://github.com/yedf/dtm)继续研究分布式事务 \ No newline at end of file