From 8cbab29d414e23ccaead80a1281eb4ee964b5485 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 3 Sep 2021 19:43:09 +0800 Subject: [PATCH] xa add duplicate example and refactored --- dtmcli/types.go | 5 +++ dtmcli/xa.go | 81 ++++++++---------------------------- dtmcli/xa_base.go | 81 ++++++++++++++++++++++++++++++++++++ dtmgrpc/xa.go | 102 ++++++++++++++-------------------------------- test/xa_test.go | 19 +++++++++ 5 files changed, 154 insertions(+), 134 deletions(-) create mode 100644 dtmcli/xa_base.go diff --git a/dtmcli/types.go b/dtmcli/types.go index 8af513a..b9be198 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -37,6 +37,11 @@ func (g *IDGenerator) NewBranchID() string { panic(fmt.Errorf("total branch id is longer than 20")) } g.branchID = g.branchID + 1 + return g.CurrentBranchID() +} + +// CurrentBranchID return current branchID +func (g *IDGenerator) CurrentBranchID() string { return g.parentID + fmt.Sprintf("%02d", g.branchID) } diff --git a/dtmcli/xa.go b/dtmcli/xa.go index bb774af..d4fc837 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -19,9 +19,7 @@ type XaRegisterCallback func(path string, xa *XaClient) // XaClient xa client type XaClient struct { - Server string - Conf map[string]string - NotifyURL string + XaClientBase } // Xa xa transaction @@ -40,11 +38,11 @@ func XaFromQuery(qs url.Values) (*Xa, error) { // NewXaClient construct a xa client func NewXaClient(server string, mysqlConf map[string]string, notifyURL string, register XaRegisterCallback) (*XaClient, error) { - xa := &XaClient{ + xa := &XaClient{XaClientBase: XaClientBase{ Server: server, Conf: mysqlConf, NotifyURL: notifyURL, - } + }} u, err := url.Parse(notifyURL) if err != nil { return nil, err @@ -55,15 +53,7 @@ func NewXaClient(server string, mysqlConf map[string]string, notifyURL string, r // HandleCallback 处理commit/rollback的回调 func (xc *XaClient) HandleCallback(gid string, branchID string, action string) (interface{}, error) { - db, err := SdbAlone(xc.Conf) - if err != nil { - return nil, err - } - defer db.Close() - xaID := gid + "-" + branchID - _, err = DBExec(db, fmt.Sprintf("xa %s '%s'", action, xaID)) - return ResultSuccess, err - + return ResultSuccess, xc.XaClientBase.HandleCallback(gid, branchID, action) } // XaLocalTransaction start a xa local transaction @@ -72,62 +62,27 @@ func (xc *XaClient) XaLocalTransaction(qs url.Values, xaFunc XaLocalFunc) (ret i if rerr != nil { return } - xa.Dtm = xc.Server - branchID := xa.NewBranchID() - xaBranch := xa.Gid + "-" + branchID - db, rerr := SdbAlone(xc.Conf) - if rerr != nil { - return - } - defer func() { db.Close() }() - defer func() { - x := recover() - _, err := DBExec(db, fmt.Sprintf("XA end '%s'", xaBranch)) - if x == nil && rerr == nil && err == nil { - _, err = DBExec(db, fmt.Sprintf("XA prepare '%s'", xaBranch)) + ret, rerr = xc.HandleLocalTrans(&xa.TransBase, func(db *sql.DB) (ret interface{}, rerr error) { + ret, rerr = xaFunc(db, xa) + rerr = CheckResult(ret, rerr) + if rerr != nil { + return } - if rerr == nil { - rerr = err - } - if x != nil { - panic(x) - } - }() - _, rerr = DBExec(db, fmt.Sprintf("XA start '%s'", xaBranch)) - if rerr != nil { + rerr = xa.callDtm(&M{"gid": xa.Gid, "branch_id": xa.CurrentBranchID(), "trans_type": "xa", "url": xc.XaClientBase.NotifyURL}, "registerXaBranch") return - } - ret, rerr = xaFunc(db, xa) - rerr = CheckResult(ret, rerr) - if rerr != nil { - return - } - rerr = xa.callDtm(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "url": xc.NotifyURL}, "registerXaBranch") + }) return } // XaGlobalTransaction start a xa global transaction func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) { - xa := Xa{TransBase: *NewTransBase(gid, "xa", xc.Server, "")} - rerr = xa.callDtm(xa, "prepare") - if rerr != nil { - return - } - // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 - defer func() { - x := recover() - operation := If(x != nil || rerr != nil, "abort", "submit").(string) - err := xa.callDtm(xa, operation) - if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 - rerr = err - } - if x != nil { - panic(x) - } - }() - resp, rerr := xaFunc(&xa) - rerr = CheckResponse(resp, rerr) - return + xa := Xa{TransBase: *NewTransBase(gid, "xa", xc.XaClientBase.Server, "")} + return xc.HandleGlobalTrans(&xa.TransBase, func(action string) error { + return xa.callDtm(xa, action) + }, func() error { + resp, rerr := xaFunc(&xa) + return CheckResponse(resp, rerr) + }) } // CallBranch call a xa branch diff --git a/dtmcli/xa_base.go b/dtmcli/xa_base.go new file mode 100644 index 0000000..1c33c7b --- /dev/null +++ b/dtmcli/xa_base.go @@ -0,0 +1,81 @@ +package dtmcli + +import ( + "database/sql" + "fmt" + "strings" +) + +// XaClientBase XaClient/XaGrpcClient base +type XaClientBase struct { + Server string + Conf map[string]string + NotifyURL string +} + +// HandleCallback 处理commit/rollback的回调 +func (xc *XaClientBase) HandleCallback(gid string, branchID string, action string) error { + db, err := SdbAlone(xc.Conf) + if err != nil { + return err + } + defer db.Close() + xaID := gid + "-" + branchID + _, err = DBExec(db, fmt.Sprintf("xa %s '%s'", action, xaID)) + if err != nil && strings.Contains(err.Error(), "Error 1397: XAER_NOTA") { // 重复commit/rollback同一个id,报这个错误,忽略 + err = nil + } + return err +} + +// HandleLocalTrans http/grpc 处理LocalTransaction的公共方法 +func (xc *XaClientBase) HandleLocalTrans(xa *TransBase, cb func(*sql.DB) (interface{}, error)) (ret interface{}, rerr error) { + branchID := xa.NewBranchID() + xaBranch := xa.Gid + "-" + branchID + db, rerr := SdbAlone(xc.Conf) + if rerr != nil { + return + } + defer func() { db.Close() }() + defer func() { + x := recover() + _, err := DBExec(db, fmt.Sprintf("XA end '%s'", xaBranch)) + if x == nil && rerr == nil && err == nil { + _, err = DBExec(db, fmt.Sprintf("XA prepare '%s'", xaBranch)) + } + if rerr == nil { + rerr = err + } + if x != nil { + panic(x) + } + }() + _, rerr = DBExec(db, fmt.Sprintf("XA start '%s'", xaBranch)) + if rerr != nil { + return + } + ret, rerr = cb(db) + return +} + +// HandleGlobalTrans http/grpc GlobalTransaction的公共方法 +func (xc *XaClientBase) HandleGlobalTrans(xa *TransBase, callDtm func(string) error, callBusi func() error) (rerr error) { + rerr = callDtm("prepare") + if rerr != nil { + return + } + // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 + defer func() { + x := recover() + operation := If(x != nil || rerr != nil, "abort", "submit").(string) + err := callDtm(operation) + if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 + rerr = err + } + if x != nil { + panic(x) + } + }() + rerr = callBusi() + return +} diff --git a/dtmgrpc/xa.go b/dtmgrpc/xa.go index 0a79697..7417628 100644 --- a/dtmgrpc/xa.go +++ b/dtmgrpc/xa.go @@ -6,6 +6,8 @@ import ( "fmt" "github.com/yedf/dtm/dtmcli" + grpc "google.golang.org/grpc" + emptypb "google.golang.org/protobuf/types/known/emptypb" ) // XaGrpcGlobalFunc type of xa global function @@ -16,9 +18,7 @@ type XaGrpcLocalFunc func(db *sql.DB, xa *XaGrpc) error // XaGrpcClient xa client type XaGrpcClient struct { - Server string - Conf map[string]string - NotifyURL string + dtmcli.XaClientBase } // XaGrpc xa transaction @@ -39,25 +39,17 @@ func XaGrpcFromRequest(br *BusiRequest) (*XaGrpc, error) { // NewXaGrpcClient construct a xa client func NewXaGrpcClient(server string, mysqlConf map[string]string, notifyURL string) *XaGrpcClient { - xa := &XaGrpcClient{ + xa := &XaGrpcClient{XaClientBase: dtmcli.XaClientBase{ Server: server, Conf: mysqlConf, NotifyURL: notifyURL, - } + }} return xa } // HandleCallback 处理commit/rollback的回调 func (xc *XaGrpcClient) HandleCallback(gid string, branchID string, action string) error { - db, err := dtmcli.SdbAlone(xc.Conf) - if err != nil { - return err - } - defer db.Close() - xaID := gid + "-" + branchID - _, err = dtmcli.DBExec(db, fmt.Sprintf("xa %s '%s'", action, xaID)) - return err - + return xc.XaClientBase.HandleCallback(gid, branchID, action) } // XaLocalTransaction start a xa local transaction @@ -66,43 +58,21 @@ func (xc *XaGrpcClient) XaLocalTransaction(br *BusiRequest, xaFunc XaGrpcLocalFu if rerr != nil { return } - xa.Dtm = xc.Server - branchID := xa.NewBranchID() - xaBranch := xa.Gid + "-" + branchID - db, rerr := dtmcli.SdbAlone(xc.Conf) - if rerr != nil { - return - } - defer func() { db.Close() }() - defer func() { - x := recover() - _, err := dtmcli.DBExec(db, fmt.Sprintf("XA end '%s'", xaBranch)) - if x == nil && rerr == nil && err == nil { - _, err = dtmcli.DBExec(db, fmt.Sprintf("XA prepare '%s'", xaBranch)) + _, rerr = xc.HandleLocalTrans(&xa.TransBase, func(db *sql.DB) (interface{}, error) { + rerr := xaFunc(db, xa) + if rerr != nil { + return nil, rerr } - if rerr == nil { - rerr = err - } - if x != nil { - panic(x) - } - }() - _, rerr = dtmcli.DBExec(db, fmt.Sprintf("XA start '%s'", xaBranch)) - if rerr != nil { - return - } - rerr = xaFunc(db, xa) - if rerr != nil { - return - } - _, rerr = MustGetDtmClient(xa.Dtm).RegisterXaBranch(context.Background(), &DtmXaBranchRequest{ - Info: &BranchInfo{ - Gid: xa.Gid, - BranchID: branchID, - TransType: xa.TransType, - }, - BusiData: "", - Notify: xc.NotifyURL, + _, rerr = MustGetDtmClient(xa.Dtm).RegisterXaBranch(context.Background(), &DtmXaBranchRequest{ + Info: &BranchInfo{ + Gid: xa.Gid, + BranchID: xa.CurrentBranchID(), + TransType: xa.TransType, + }, + BusiData: "", + Notify: xc.NotifyURL, + }) + return nil, rerr }) return } @@ -115,27 +85,17 @@ func (xc *XaGrpcClient) XaGlobalTransaction(gid string, xaFunc XaGrpcGlobalFunc) Gid: gid, TransType: xa.TransType, } - _, rerr = dc.Prepare(context.Background(), req) - if rerr != nil { - return - } - // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 - defer func() { - x := recover() - if x == nil && rerr == nil { - _, rerr = dc.Submit(context.Background(), req) - return - } - _, err := dc.Abort(context.Background(), req) - if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 - rerr = err - } - if x != nil { - panic(x) - } - }() - rerr = xaFunc(&xa) - return + return xc.HandleGlobalTrans(&xa.TransBase, func(action string) error { + f := map[string]func(context.Context, *DtmRequest, ...grpc.CallOption) (*emptypb.Empty, error){ + "prepare": dc.Prepare, + "submit": dc.Submit, + "abort": dc.Abort, + }[action] + _, err := f(context.Background(), req) + return err + }, func() error { + return xaFunc(&xa) + }) } // CallBranch call a xa branch diff --git a/test/xa_test.go b/test/xa_test.go index 35ac8ee..09f2c47 100644 --- a/test/xa_test.go +++ b/test/xa_test.go @@ -6,6 +6,7 @@ import ( "github.com/go-resty/resty/v2" "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/examples" ) @@ -16,6 +17,7 @@ func TestXa(t *testing.T) { } xaLocalError(t) xaNormal(t) + xaDuplicate(t) xaRollback(t) } @@ -43,6 +45,23 @@ func xaNormal(t *testing.T) { assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid)) } +func xaDuplicate(t *testing.T) { + xc := examples.XaClient + gid := "xaDuplicate" + err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { + req := examples.GenTransReq(30, false, false) + _, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") + assert.Nil(t, err) + sdb, err := dtmcli.SdbAlone(common.DtmConfig.DB) + assert.Nil(t, err) + dtmcli.DBExec(sdb, "xa recover") + dtmcli.DBExec(sdb, "xa commit 'xaDuplicate-0101'") // 先把某一个事务提交,模拟重复请求 + return xa.CallBranch(req, examples.Busi+"/TransInXa") + }) + assert.Equal(t, nil, err) + WaitTransProcessed(gid) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid)) +} func xaRollback(t *testing.T) { xc := examples.XaClient gid := "xaRollback"