From b9ef5c800585c01ad5dd6d80dae56c96dd56fdfc Mon Sep 17 00:00:00 2001
From: yedongfu
Date: Wed, 21 Jul 2021 10:30:22 +0800
Subject: [PATCH] fix barrier return

---
Reviewer notes (this section is ignored by git am):
- dtmcli/barrier.go: when a stored result is replayed, it is unmarshalled
  into res and the error goes to rerr again; the previous revision of this
  patch assigned the error returned by json.Unmarshal to res.
- dtmcli/barrier.go: logExec/logQueryRow now pass the SQL text and its
  arguments as values to a constant format string instead of using the SQL
  text itself as the format string.
- Indentation of context lines was reconstructed from a whitespace-collapsed
  copy; use `git apply --ignore-whitespace` if a hunk is rejected.

 dtmcli/barrier.go     | 35 ++++++++++++++++++++++++++---------
 dtmsvr/cron.go        |  3 ++-
 dtmsvr/dtmsvr_test.go |  3 ++-
 3 files changed, 30 insertions(+), 11 deletions(-)

diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go
index 87610c3..8348e23 100644
--- a/dtmcli/barrier.go
+++ b/dtmcli/barrier.go
@@ -46,6 +46,18 @@ type BarrierModel struct {
 	TransInfo
 }
 
+// logExec logs the statement and its arguments, then executes it on tx.
+func logExec(tx *sql.Tx, query string, args ...interface{}) (sql.Result, error) {
+	logrus.Printf("executing: %s %v", query, args)
+	return tx.Exec(query, args...)
+}
+
+// logQueryRow logs the query and its arguments, then runs it on tx and returns the row.
+func logQueryRow(tx *sql.Tx, query string, args ...interface{}) *sql.Row {
+	logrus.Printf("querying: %s %v", query, args)
+	return tx.QueryRow(query, args...)
+}
+
 // TableName gorm table name
 func (BarrierModel) TableName() string { return "dtm_barrier.barrier" }
 
@@ -53,14 +65,14 @@ func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, br
 	if branchType == "" {
 		return 0, nil
 	}
-	res, err := tx.Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, reason) values(?,?,?,?,?)", transType, gid, branchID, branchType, reason)
+	res, err := logExec(tx, "insert into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, reason) values(?,?,?,?,?)", transType, gid, branchID, branchType, reason)
 	if err != nil {
 		return 0, err
 	}
 	return res.RowsAffected()
 }
 
-// ThroughBarrierCall2 子事务屏障,详细介绍见 https://zhuanlan.zhihu.com/p/388444465
+// ThroughBarrierCall is the sub-transaction barrier; see https://zhuanlan.zhihu.com/p/388444465 for details
 // db: 本地数据库
 // transInfo: 事务信息
 // bisiCall: 业务函数,仅在必要时被调用
@@ -94,10 +106,11 @@ func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc) (re
 	currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.BranchType, ti.BranchType)
 	logrus.Printf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
 	if (ti.BranchType == "cancel" || ti.BranchType == "compensate") && originAffected > 0 { // 这个是空补偿,返回成功
-		return common.MS{"dtm_result": "SUCCESS"}, nil
+		res = common.MS{"dtm_result": "SUCCESS"}
+		return
 	} else if currentAffected == 0 { // 插入不成功
-		var result string
-		err := tx.QueryRow("select result from dtm_barrier.barrier where trans_type=? and gid=? and branch_id=? and branch_type=? and reason=?",
+		var result sql.NullString
+		err := logQueryRow(tx, "select result from dtm_barrier.barrier where trans_type=? and gid=? and branch_id=? and branch_type=? and reason=?",
 			ti.TransType, ti.Gid, ti.BranchID, ti.BranchType, ti.BranchType).Scan(&result)
 		if err == sql.ErrNoRows { // 这个是悬挂操作,返回失败,AP收到这个返回,会尽快回滚
 			res = common.MS{"dtm_result": "FAILURE"}
@@ -107,14 +120,18 @@ func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc) (re
 			rerr = err
 			return
 		}
-		// 返回上一次的结果
-		rerr = json.Unmarshal([]byte(result), &res)
-		return
+		if result.Valid { // a previous result is stored in the database: decode it into res and return it
+			rerr = json.Unmarshal([]byte(result.String), &res)
+			return
+		} else { // no previous result is stored: this is a repeated empty compensation, return success directly
+			res = common.MS{"dtm_result": "SUCCESS"}
+			return
+		}
 	}
 	res, rerr = busiCall(db)
 	if rerr == nil { // 正确返回了,需要将结果保存到数据库
 		sval := common.MustMarshalString(res)
-		_, rerr = tx.Exec("update dtm_barrier.barrier set result=? where trans_type=? and gid=? and branch_id=? and branch_type=?", sval,
+		_, rerr = logExec(tx, "update dtm_barrier.barrier set result=? where trans_type=? and gid=? and branch_id=? and branch_type=?", sval,
 			ti.TransType, ti.Gid, ti.BranchID, ti.BranchType)
 	}
 	return
diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go
index 9941f27..14600de 100644
--- a/dtmsvr/cron.go
+++ b/dtmsvr/cron.go
@@ -36,8 +36,9 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal {
 	owner := GenGid()
 	db := dbGet()
 	// 这里next_cron_time需要限定范围,否则数据量累计之后,会导致查询变慢
+	// also require update_time < now + (expireIn - 3) seconds; otherwise a record just taken by this instance could be taken again by another one
 	dbr := db.Must().Model(&trans).
-		Where("next_cron_time < date_add(now(), interval ? second) and next_cron_time > date_add(now(), interval -3600 second) and status in ('prepared', 'aborting', 'submitted')", int(expireIn/time.Second)).
+		Where("next_cron_time < date_add(now(), interval ? second) and next_cron_time > date_add(now(), interval -3600 second) and update_time < date_add(now(), interval ? second) and status in ('prepared', 'aborting', 'submitted')", int(expireIn/time.Second), -3+int(expireIn/time.Second)).
 		Limit(1).Update("owner", owner)
 	if dbr.RowsAffected == 0 {
 		return nil
diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go
index a1f5894..dfbca0a 100644
--- a/dtmsvr/dtmsvr_test.go
+++ b/dtmsvr/dtmsvr_test.go
@@ -366,7 +366,7 @@ func tccBarrierDisorder(t *testing.T) {
 		return res, err
 	}))
 	// 注册子事务
-	_, err := common.RestyClient.R().
+	r, err := common.RestyClient.R().
 		SetBody(&M{
 			"gid":       tcc.Gid,
 			"branch_id": branchID,
@@ -379,6 +379,7 @@ func tccBarrierDisorder(t *testing.T) {
 		}).
 		Post(tcc.Dtm + "/registerTccBranch")
 	e2p(err)
+	assert.True(t, strings.Contains(r.String(), "SUCCESS"))
 	go func() {
 		logrus.Printf("sleeping to wait for tcc try timeout")
 		<-timeoutChan