From 31a69e3c84a299d0f42ff035452b615cb8b7d8af Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 12:00:28 +0800 Subject: [PATCH] xa new interface seems ok --- dtmcli/message.go | 23 ++++++++-------- dtmcli/saga.go | 12 +++++---- dtmcli/types.go | 59 +++++++++++++++++++++++++++++++++++++++++ dtmcli/xa.go | 31 ++++++++++++---------- dtmsvr/trans_xa.go | 2 +- dtmsvr/trans_xa_test.go | 11 ++++---- examples/main_xa.go | 4 +-- 7 files changed, 103 insertions(+), 39 deletions(-) diff --git a/dtmcli/message.go b/dtmcli/message.go index 9cd5b64..9fa680a 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -1,8 +1,6 @@ package dtmcli import ( - "fmt" - "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -49,17 +47,18 @@ func (s *Msg) Add(action string, postData interface{}) *Msg { return s } -// Submit submit the msg -func (s *Msg) Submit() error { - logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData) - resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server)) - return CheckDtmResponse(resp, err) -} - // Prepare prepare the msg func (s *Msg) Prepare(queryPrepared string) error { s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) - logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData) - resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/prepare", s.Server)) - return CheckDtmResponse(resp, err) + return callDtmSimple(s.Server, &s.MsgData, "prepare") +} + +// Submit submit the msg +func (s *Msg) Submit() error { + return callDtmSimple(s.Server, &s.MsgData, "submit") +} + +// SubmitExt 高级submit,更多的选项和更详细的返回值 +func (s *Msg) SubmitExt(opt *TransOptions) (TransStatus, error) { + return callDtm(s.Server, &s.MsgData, "submit", opt) } diff --git a/dtmcli/saga.go b/dtmcli/saga.go index c92d13d..be2874f 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -1,8 +1,6 @@ package dtmcli import ( - "fmt" - "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -52,7 +50,11 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga // Submit submit the saga trans func (s *Saga) Submit() error { - logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) - resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server)) - return CheckDtmResponse(resp, err) + _, err := s.SubmitExt(&TransOptions{}) + return err +} + +// SubmitExt 高级submit,更多的选项和更详细的返回值 +func (s *Saga) SubmitExt(opt *TransOptions) (TransStatus, error) { + return callDtm(s.Server, &s.SagaData, "submit", opt) } diff --git a/dtmcli/types.go b/dtmcli/types.go index 29b8806..8ff0d4f 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -1,6 +1,7 @@ package dtmcli import ( + "errors" "fmt" "strings" @@ -61,3 +62,61 @@ func (g *IDGenerator) NewBranchID() string { g.branchID = g.branchID + 1 return g.parentID + fmt.Sprintf("%02d", g.branchID) } + +// TransStatus 全局事务状态,采用string +type TransStatus string + +const ( + // TransEmpty 空值 + TransEmpty TransStatus = "" + // TransSubmitted 已提交给DTM + TransSubmitted TransStatus = "submitted" + // TransAborting 正在回滚中,有两种情况会出现,一是用户侧发起abort请求,而是发起submit同步请求,但是dtm进行回滚中出现错误 + TransAborting TransStatus = "aborting" + // TransSucceed 事务已完成 + TransSucceed TransStatus = "succeed" + // TransFailed 事务已回滚 + TransFailed TransStatus = "failed" + // TransErrorPrepare prepare调用报错 + TransErrorPrepare TransStatus = "error_parepare" + // TransErrorSubmit submit调用报错 + TransErrorSubmit TransStatus = "error_submit" + // TransErrorAbort abort调用报错 + TransErrorAbort TransStatus = "error_abort" +) + +// TransOptions 提交/终止事务的选项 +type TransOptions struct { + // WaitResult 是否等待全局事务的最终结果 + WaitResult bool +} + +// TransResult dtm 返回的结果 +type TransResult struct { + DtmResult string `json:"dtm_result"` + Status TransStatus + Message string +} + +func callDtm(dtm string, body interface{}, operation string, opt *TransOptions) (TransStatus, error) { + resp, err := common.RestyClient.R().SetQueryParams(common.MS{ + "wait_result": common.If(opt.WaitResult, "1", "").(string), + }).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", dtm, operation)) + errStatus := TransStatus("error_" + operation) + if err != nil { + return errStatus, err + } + tr := resp.Result().(*TransResult) + if tr.DtmResult == "FAILURE" { + return errStatus, errors.New(tr.Message) + } + return tr.Status, nil +} + +func callDtmSimple(dtm string, body interface{}, operation string) error { + _, err := callDtm(dtm, body, operation, &TransOptions{}) + return err +} + +// ErrUserFailure 表示用户返回失败,要求回滚 +var ErrUserFailure = errors.New("user return FAILURE") diff --git a/dtmcli/xa.go b/dtmcli/xa.go index db998d3..1eac028 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -109,39 +109,38 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret } // XaGlobalTransaction start a xa global transaction -func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (ret interface{}, rerr error) { +func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status TransStatus, rerr error) { xa := Xa{IDGenerator: IDGenerator{}, Gid: gid} data := &M{ "gid": gid, "trans_type": "xa", } - resp, err := common.RestyClient.R().SetBody(data).Post(xc.Server + "/prepare") - if IsFailure(resp, err) { - return resp, err + status, rerr = callDtm(xc.Server, data, "prepare", &TransOptions{}) + if rerr != nil { + return } + var resp *resty.Response // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer func() { - var x interface{} - if x = recover(); x != nil || IsFailure(ret, rerr) { - resp, err = common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort") - } else { - resp, err = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit") - } - if IsFailure(resp, err) { - common.RedLogf("submitting or abort global transaction error: %v resp: %s", err, resp.String()) + x := recover() + operation := common.If(x != nil || IsFailure(resp, rerr), "abort", "submit").(string) + var err error + status, err = callDtm(xc.Server, data, operation, &TransOptions{}) + if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 + rerr = err } if x != nil { panic(x) } }() - ret, rerr = xaFunc(&xa) + resp, rerr = xaFunc(&xa) return } // CallBranch call a xa branch func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { branchID := x.NewBranchID() - return common.RestyClient.R(). + resp, err := common.RestyClient.R(). SetBody(body). SetQueryParams(common.MS{ "gid": x.Gid, @@ -150,4 +149,8 @@ func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { "branch_type": "action", }). Post(url) + if IsFailure(resp, nil) { + err = ErrUserFailure + } + return resp, err } diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_xa.go index 1e33353..819418c 100644 --- a/dtmsvr/trans_xa.go +++ b/dtmsvr/trans_xa.go @@ -21,7 +21,7 @@ func (t *transXaProcessor) GenBranches() []TransBranch { func (t *transXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { resp, err := common.RestyClient.R().SetQueryParams(common.MS{ "branch_id": branch.BranchID, - "action": common.If(t.Status == "prepared", "rollback", "commit").(string), + "action": common.If(t.Status == "prepared" || t.Status == "aborting", "rollback", "commit").(string), "gid": branch.Gid, }).Post(branch.URL) e2p(err) diff --git a/dtmsvr/trans_xa_test.go b/dtmsvr/trans_xa_test.go index 167603a..e0acc84 100644 --- a/dtmsvr/trans_xa_test.go +++ b/dtmsvr/trans_xa_test.go @@ -20,7 +20,8 @@ func TestXa(t *testing.T) { } func xaLocalError(t *testing.T) { - _, err := examples.XaClient.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (*resty.Response, error) { + xc := examples.XaClient + _, err := xc.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (*resty.Response, error) { return nil, fmt.Errorf("an error") }) assert.Error(t, err, fmt.Errorf("an error")) @@ -29,7 +30,7 @@ func xaLocalError(t *testing.T) { func xaNormal(t *testing.T) { xc := examples.XaClient gid := "xaNormal" - res, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { + _, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := examples.GenTransReq(30, false, false) resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") if dtmcli.IsFailure(resp, err) { @@ -37,7 +38,7 @@ func xaNormal(t *testing.T) { } return xa.CallBranch(req, examples.Busi+"/TransInXa") }) - dtmcli.PanicIfFailure(res, err) + assert.Equal(t, nil, err) WaitTransProcessed(gid) assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid)) } @@ -45,7 +46,7 @@ func xaNormal(t *testing.T) { func xaRollback(t *testing.T) { xc := examples.XaClient gid := "xaRollback" - res, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { + _, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") if dtmcli.IsFailure(resp, err) { @@ -53,7 +54,7 @@ func xaRollback(t *testing.T) { } return xa.CallBranch(req, examples.Busi+"/TransInXa") }) - assert.True(t, dtmcli.IsFailure(res, err)) + assert.Error(t, err) WaitTransProcessed(gid) assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid)) assert.Equal(t, "failed", getTransStatus(gid)) diff --git a/examples/main_xa.go b/examples/main_xa.go index 36c4c75..7af6752 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -28,14 +28,14 @@ func XaSetup(app *gin.Engine) { // XaFireRequest 注册全局XA事务,调用XA的分支 func XaFireRequest() string { gid := dtmcli.MustGenGid(DtmServer) - res, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { + _, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { resp, err := xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOutXa") if dtmcli.IsFailure(resp, err) { return resp, err } return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa") }) - dtmcli.PanicIfFailure(res, err) + e2p(err) return gid }