dtm/dtmcli/barrier.go
2021-08-12 18:00:08 +08:00

101 lines
3.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package dtmcli
import (
"context"
"database/sql"
"fmt"
"net/url"
)
// BusiFunc is the signature of a business callback. It is handed a
// *sql.Tx to do its work in and reports failure through the returned error.
type BusiFunc func(tx *sql.Tx) error
// BranchBarrier holds the information describing a single transaction
// branch, plus a counter used to number barrier invocations.
type BranchBarrier struct {
	// Identifying fields of the branch; all four must be non-empty for
	// BarrierFrom to accept them.
	TransType, Gid, BranchID, BranchType string

	// BarrierID is a running counter; Call increments it on every invocation.
	BarrierID int
}

// String formats the branch as
// "transInfo: <TransType> <Gid> <BranchID> <BranchType>".
// BarrierID is not part of the output.
func (bb *BranchBarrier) String() string {
	const layout = "transInfo: %s %s %s %s"
	parts := []interface{}{bb.TransType, bb.Gid, bb.BranchID, bb.BranchType}
	return fmt.Sprintf(layout, parts...)
}
// BarrierFromQuery builds a BranchBarrier from the "trans_type", "gid",
// "branch_id" and "branch_type" values found in qs. The values are passed
// straight to BarrierFrom, whose result is returned unchanged.
func BarrierFromQuery(qs url.Values) (*BranchBarrier, error) {
	var (
		transType  = qs.Get("trans_type")
		gid        = qs.Get("gid")
		branchID   = qs.Get("branch_id")
		branchType = qs.Get("branch_type")
	)
	return BarrierFrom(transType, gid, branchID, branchType)
}
// BarrierFrom builds a BranchBarrier from the given transaction fields.
//
// All four arguments are required: if any of them is empty, BarrierFrom
// returns a nil barrier and an error that includes the values it received.
// On success BarrierID is left at its zero value.
func BarrierFrom(transType, gid, branchID, branchType string) (*BranchBarrier, error) {
	ti := &BranchBarrier{
		TransType:  transType,
		Gid:        gid,
		BranchID:   branchID,
		BranchType: branchType,
	}
	if transType == "" || gid == "" || branchID == "" || branchType == "" {
		// ti is a *BranchBarrier, so %v renders it through its String method.
		return nil, fmt.Errorf("invalid trans info: %v", ti)
	}
	return ti, nil
}
// insertBarrier writes one row into dtm_barrier.barrier within tx, using an
// "insert ignore" statement, and returns whatever StxExec reports
// (presumably the affected-row count; StxExec is defined elsewhere, confirm).
//
// An empty branchType means there is nothing to record: no statement is
// executed and (0, nil) is returned.
func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, branchType string, barrierID string, reason string) (int64, error) {
	if branchType != "" {
		const stmt = "insert ignore into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, barrier_id, reason) values(?,?,?,?,?,?)"
		return StxExec(tx, stmt, transType, gid, branchID, branchType, barrierID, reason)
	}
	return 0, nil
}
// Call runs busiCall behind the sub-transaction barrier. A detailed
// introduction (in Chinese) is at https://zhuanlan.zhihu.com/p/388444465
//
// db is the local database; Call opens its own transaction on it.
// busiCall is the business function; it is invoked with that transaction,
// and only when the barrier decides the branch really has to run.
//
// Every invocation increments bb.BarrierID and uses the new value,
// formatted with "%02d", as the barrier id of the rows it inserts.
//
// Return value:
//   - Hanging request (the row for this branch already exists but was not
//     written with this branch type as its reason): busiCall is not called
//     and ErrFailure is returned so the global transaction can be rolled
//     back as early as possible.
//   - Null compensation or repeated request: busiCall is not called and nil
//     is returned so processing continues normally.
//   - Otherwise the result of busiCall is returned.
//
// A non-nil result (or a panic, which is re-raised) rolls the transaction
// back. A nil result commits it, and a failing barrier insert or a failing
// commit is returned to the caller instead of being silently dropped.
func (bb *BranchBarrier) Call(db *sql.DB, busiCall BusiFunc) (rerr error) {
	bb.BarrierID++
	bid := fmt.Sprintf("%02d", bb.BarrierID)
	tx, err := db.BeginTx(context.Background(), &sql.TxOptions{})
	if err != nil {
		return err
	}
	defer func() {
		Logf("barrier call error is %v", rerr)
		if x := recover(); x != nil {
			// Best-effort rollback; the panic is what the caller needs to see.
			_ = tx.Rollback()
			panic(x)
		}
		if rerr != nil {
			// Best-effort rollback; rerr already carries the primary failure.
			_ = tx.Rollback()
			return
		}
		// Surface commit failures: without this the caller would believe the
		// business changes were persisted when they were not.
		rerr = tx.Commit()
	}()

	// For a rollback-type branch, originType is the forward branch it undoes.
	// For any other branch type the lookup yields "", which makes
	// insertBarrier a no-op returning (0, nil).
	originType := map[string]string{
		"cancel":     "try",
		"compensate": "action",
	}[bb.BranchType]

	originAffected, err := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, originType, bid, bb.BranchType)
	if err != nil {
		return err
	}
	currentAffected, err := insertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.BranchType, bid, bb.BranchType)
	if err != nil {
		return err
	}
	Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)

	isRollback := bb.BranchType == "cancel" || bb.BranchType == "compensate"
	if isRollback && originAffected > 0 {
		// Null compensation: the forward branch never ran, so there is
		// nothing to undo. Report success without calling busiCall.
		return nil
	}
	if currentAffected == 0 {
		// The row for this branch already existed. Find out who wrote it.
		var result sql.NullString
		err := StxQueryRow(tx, "select 1 from dtm_barrier.barrier where trans_type=? and gid=? and branch_id=? and branch_type=? and barrier_id=? and reason=?",
			bb.TransType, bb.Gid, bb.BranchID, bb.BranchType, bid, bb.BranchType).Scan(&result)
		if err == sql.ErrNoRows {
			// Not inserted by the current branch type, so it was inserted by
			// the rollback branch: this is a hanging request. Fail so the
			// caller rolls back as soon as possible.
			return ErrFailure
		}
		// Idempotent repeat (err == nil) or a query error: return as is.
		return err
	}
	return busiCall(tx)
}