dtm/dtmcli/xa_base.go
hitzhangjie cc66ceb484 refactor-*: improve code readability
- use clearer method names, comments;
- define dtm usage info;
- determine action by switch-case instead of if-else, because only
  os.Args[1] needs to be checked;
- use time layout '2006-01-02 15:04:05.999' to format milliseconds;
- change filenames dtmsvr/main.go to dtmsvr/dtmsvr.go;
- simplify code;
- use `()` to think less about operator precedence;

xxx
2021-09-23 23:05:42 +08:00

82 lines
2.1 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package dtmcli
import (
"database/sql"
"fmt"
"strings"
)
// XaClientBase holds the state shared by XaClient and XaGrpcClient.
type XaClientBase struct {
	// Server is presumably the address of the dtm server; it is not read in
	// this file. NOTE(review): confirm against the embedding clients.
	Server string
	// Conf is the database configuration handed to StandaloneDB whenever a
	// method of this type needs a connection.
	Conf map[string]string
	// NotifyURL is presumably the URL dtm calls back for commit/rollback; it
	// is not read in this file. NOTE(review): confirm against the callers.
	NotifyURL string
}
// HandleCallback handles the commit/rollback callback for a single XA branch.
//
// It opens a database handle with StandaloneDB(xc.Conf), executes
// "xa <action> '<gid>-<branchID>'" on it and closes the handle before
// returning. action is interpolated into the statement verbatim, so callers
// are expected to pass a valid XA verb (the original comment names commit and
// rollback).
//
// An XAER_NOTA error (unknown XID) is treated as success: committing or
// rolling back the same id twice reports it, which makes repeated callbacks
// idempotent. Any other error, including a failure to open the database, is
// returned unchanged.
func (xc *XaClientBase) HandleCallback(gid string, branchID string, action string) error {
	db, err := StandaloneDB(xc.Conf)
	if err != nil {
		return err
	}
	defer db.Close()

	xaID := gid + "-" + branchID
	_, err = DBExec(db, fmt.Sprintf("xa %s '%s'", action, xaID))
	// Match on the symbolic error name only. The text in front of it is
	// produced by the driver and is not stable (some driver versions render
	// the prefix as "Error 1397: ", others add the SQLSTATE, e.g.
	// "Error 1397 (XAE04): "), so matching the full prefix would make a
	// repeated commit/rollback fail depending on the driver version.
	if err != nil && strings.Contains(err.Error(), "XAER_NOTA") {
		return nil
	}
	return err
}
// HandleLocalTrans is the common http/grpc implementation for running a local
// transaction as one XA branch.
//
// A new branch id is taken from xa.NewBranchID() and combined with xa.Gid as
// "<gid>-<branchID>". The method opens a database handle with
// StandaloneDB(xc.Conf), issues "XA start", invokes cb with that handle and
// returns cb's results. On the way out it always issues "XA end", and issues
// "XA prepare" only when cb neither panicked nor returned an error and
// "XA end" itself succeeded. The handle is closed last.
//
// The error from "XA end"/"XA prepare" is reported only when no earlier error
// is pending. A panic raised by cb is re-raised after the cleanup statements
// have run.
func (xc *XaClientBase) HandleLocalTrans(xa *TransBase, cb func(*sql.DB) (interface{}, error)) (ret interface{}, rerr error) {
	newBranch := xa.NewBranchID()
	xid := xa.Gid + "-" + newBranch

	db, rerr := StandaloneDB(xc.Conf)
	if rerr != nil {
		return nil, rerr
	}
	// Registered first, so it runs last: the handle stays usable for the
	// XA end/prepare statements issued by the finalizer below.
	defer db.Close()

	// The finalizer is registered before "XA start", so "XA end" is attempted
	// even when the start statement fails; in that case its error is dropped
	// because rerr is already set.
	defer func() {
		panicVal := recover()
		_, finishErr := DBExec(db, fmt.Sprintf("XA end '%s'", xid))
		succeeded := panicVal == nil && rerr == nil && finishErr == nil
		if succeeded {
			_, finishErr = DBExec(db, fmt.Sprintf("XA prepare '%s'", xid))
		}
		// Never overwrite an error that is already being returned.
		if rerr == nil {
			rerr = finishErr
		}
		// Propagate the callback's panic once cleanup is done.
		if panicVal != nil {
			panic(panicVal)
		}
	}()

	if _, rerr = DBExec(db, fmt.Sprintf("XA start '%s'", xid)); rerr != nil {
		return nil, rerr
	}
	return cb(db)
}
// HandleGlobalTrans is the common http/grpc implementation for driving a
// global transaction.
//
// It first calls callDtm("prepare") and returns that error without running the
// business function if it fails. Otherwise it runs callBusi and then calls
// callDtm("submit") when callBusi returned nil, or callDtm("abort") when it
// returned an error or panicked. The error from that second callDtm call is
// returned only when callBusi itself succeeded; a panic from callBusi is
// re-raised after callDtm has been invoked.
//
// The xa argument is not used by this method body.
func (xc *XaClientBase) HandleGlobalTrans(xa *TransBase, callDtm func(string) error, callBusi func() error) (rerr error) {
	if rerr = callDtm("prepare"); rerr != nil {
		return rerr
	}
	// Original author's note: in rare cases prepare may have succeeded but be
	// reported as a failure above because of network conditions. The deferred
	// block below is then never registered; letting the transaction time out
	// and be rolled back / marked as failed afterwards is acceptable.
	defer func() {
		panicVal := recover()
		failed := panicVal != nil || rerr != nil
		nextOp := If(failed, "abort", "submit").(string)
		dtmErr := callDtm(nextOp)
		// If the user function returned no error, report dtm's result.
		if rerr == nil {
			rerr = dtmErr
		}
		// Propagate the business panic once dtm has been notified.
		if panicVal != nil {
			panic(panicVal)
		}
	}()
	return callBusi()
}