82 lines
2.1 KiB
Go
82 lines
2.1 KiB
Go
package dtmcli
|
||
|
||
import (
|
||
"database/sql"
|
||
"fmt"
|
||
"strings"
|
||
)
|
||
|
||
// XaClientBase is the common base shared by XaClient and XaGrpcClient.
type XaClientBase struct {
	// Server is not read in this part of the file; presumably the address
	// of the dtm server (TODO confirm against the embedding clients).
	Server string
	// Conf is handed to SdbAlone to open the database handle on which the
	// XA statements are executed.
	Conf map[string]string
	// NotifyURL is not read in this part of the file; presumably the
	// address dtm calls back for commit/rollback (TODO confirm).
	NotifyURL string
}
|
||
|
||
// HandleCallback 处理commit/rollback的回调
|
||
func (xc *XaClientBase) HandleCallback(gid string, branchID string, action string) error {
|
||
db, err := SdbAlone(xc.Conf)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer db.Close()
|
||
xaID := gid + "-" + branchID
|
||
_, err = DBExec(db, fmt.Sprintf("xa %s '%s'", action, xaID))
|
||
if err != nil && strings.Contains(err.Error(), "Error 1397: XAER_NOTA") { // 重复commit/rollback同一个id,报这个错误,忽略
|
||
err = nil
|
||
}
|
||
return err
|
||
}
|
||
|
||
// HandleLocalTrans http/grpc 处理LocalTransaction的公共方法
|
||
func (xc *XaClientBase) HandleLocalTrans(xa *TransBase, cb func(*sql.DB) (interface{}, error)) (ret interface{}, rerr error) {
|
||
branchID := xa.NewBranchID()
|
||
xaBranch := xa.Gid + "-" + branchID
|
||
db, rerr := SdbAlone(xc.Conf)
|
||
if rerr != nil {
|
||
return
|
||
}
|
||
defer func() { db.Close() }()
|
||
defer func() {
|
||
x := recover()
|
||
_, err := DBExec(db, fmt.Sprintf("XA end '%s'", xaBranch))
|
||
if x == nil && rerr == nil && err == nil {
|
||
_, err = DBExec(db, fmt.Sprintf("XA prepare '%s'", xaBranch))
|
||
}
|
||
if rerr == nil {
|
||
rerr = err
|
||
}
|
||
if x != nil {
|
||
panic(x)
|
||
}
|
||
}()
|
||
_, rerr = DBExec(db, fmt.Sprintf("XA start '%s'", xaBranch))
|
||
if rerr != nil {
|
||
return
|
||
}
|
||
ret, rerr = cb(db)
|
||
return
|
||
}
|
||
|
||
// HandleGlobalTrans http/grpc GlobalTransaction的公共方法
|
||
func (xc *XaClientBase) HandleGlobalTrans(xa *TransBase, callDtm func(string) error, callBusi func() error) (rerr error) {
|
||
rerr = callDtm("prepare")
|
||
if rerr != nil {
|
||
return
|
||
}
|
||
// 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题
|
||
defer func() {
|
||
x := recover()
|
||
operation := If(x != nil || rerr != nil, "abort", "submit").(string)
|
||
err := callDtm(operation)
|
||
if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的
|
||
rerr = err
|
||
}
|
||
if x != nil {
|
||
panic(x)
|
||
}
|
||
}()
|
||
rerr = callBusi()
|
||
return
|
||
}
|