package dtmcli import ( "database/sql" "fmt" "strings" ) // XaClientBase XaClient/XaGrpcClient base type XaClientBase struct { Server string Conf map[string]string NotifyURL string } // HandleCallback 处理commit/rollback的回调 func (xc *XaClientBase) HandleCallback(gid string, branchID string, action string) error { db, err := SdbAlone(xc.Conf) if err != nil { return err } defer db.Close() xaID := gid + "-" + branchID _, err = DBExec(db, fmt.Sprintf("xa %s '%s'", action, xaID)) if err != nil && strings.Contains(err.Error(), "Error 1397: XAER_NOTA") { // 重复commit/rollback同一个id,报这个错误,忽略 err = nil } return err } // HandleLocalTrans http/grpc 处理LocalTransaction的公共方法 func (xc *XaClientBase) HandleLocalTrans(xa *TransBase, cb func(*sql.DB) (interface{}, error)) (ret interface{}, rerr error) { branchID := xa.NewBranchID() xaBranch := xa.Gid + "-" + branchID db, rerr := SdbAlone(xc.Conf) if rerr != nil { return } defer func() { db.Close() }() defer func() { x := recover() _, err := DBExec(db, fmt.Sprintf("XA end '%s'", xaBranch)) if x == nil && rerr == nil && err == nil { _, err = DBExec(db, fmt.Sprintf("XA prepare '%s'", xaBranch)) } if rerr == nil { rerr = err } if x != nil { panic(x) } }() _, rerr = DBExec(db, fmt.Sprintf("XA start '%s'", xaBranch)) if rerr != nil { return } ret, rerr = cb(db) return } // HandleGlobalTrans http/grpc GlobalTransaction的公共方法 func (xc *XaClientBase) HandleGlobalTrans(xa *TransBase, callDtm func(string) error, callBusi func() error) (rerr error) { rerr = callDtm("prepare") if rerr != nil { return } // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer func() { x := recover() operation := If(x != nil || rerr != nil, "abort", "submit").(string) err := callDtm(operation) if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 rerr = err } if x != nil { panic(x) } }() rerr = callBusi() return }