From eb4a4cde60e9e55b58b543ca18cba98cf44c1e73 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 09:48:24 +0800 Subject: [PATCH 01/13] change tcc and xa func to return resty.Response --- dtmcli/tcc.go | 2 +- dtmcli/xa.go | 2 +- dtmsvr/trans_tcc_barrier_test.go | 7 ++++--- dtmsvr/trans_tcc_test.go | 5 +++-- dtmsvr/trans_xa_test.go | 7 ++++--- examples/main_tcc.go | 5 +++-- examples/main_tcc_barrier.go | 3 ++- examples/main_xa.go | 3 ++- 8 files changed, 20 insertions(+), 14 deletions(-) diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 08a9037..6e93b6f 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -16,7 +16,7 @@ type Tcc struct { } // TccGlobalFunc type of global tcc call -type TccGlobalFunc func(tcc *Tcc) (interface{}, error) +type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error) // TccGlobalTransaction begin a tcc global transaction // dtm dtm服务器地址 diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 39f895d..db998d3 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -16,7 +16,7 @@ type M = map[string]interface{} var e2p = common.E2P // XaGlobalFunc type of xa global function -type XaGlobalFunc func(xa *Xa) (interface{}, error) +type XaGlobalFunc func(xa *Xa) (*resty.Response, error) // XaLocalFunc type of xa local function type XaLocalFunc func(db *sql.DB, xa *Xa) (interface{}, error) diff --git a/dtmsvr/trans_tcc_barrier_test.go b/dtmsvr/trans_tcc_barrier_test.go index 5375c86..4b46d06 100644 --- a/dtmsvr/trans_tcc_barrier_test.go +++ b/dtmsvr/trans_tcc_barrier_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/go-resty/resty/v2" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/yedf/dtm/common" @@ -23,7 +24,7 @@ func TestTccBarrier(t *testing.T) { func tccBarrierRollback(t *testing.T) { gid := "tccBarrierRollback" - resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") assert.True(t, !dtmcli.IsFailure(resp, err)) return tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") @@ -35,7 +36,7 @@ func tccBarrierRollback(t *testing.T) { func tccBarrierNormal(t *testing.T) { gid := "tccBarrierNormal" - resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") assert.True(t, !dtmcli.IsFailure(resp, err)) return tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") @@ -49,7 +50,7 @@ func tccBarrierDisorder(t *testing.T) { timeoutChan := make(chan string, 2) finishedChan := make(chan string, 2) gid := "tccBarrierDisorder" - _, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + _, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { body := &examples.TransReq{Amount: 30} tryURL := Busi + "/TccBTransOutTry" confirmURL := Busi + "/TccBTransOutConfirm" diff --git a/dtmsvr/trans_tcc_test.go b/dtmsvr/trans_tcc_test.go index 1a6666f..dc1833e 100644 --- a/dtmsvr/trans_tcc_test.go +++ b/dtmsvr/trans_tcc_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/go-resty/resty/v2" "github.com/stretchr/testify/assert" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/examples" @@ -18,7 +19,7 @@ func TestTcc(t *testing.T) { func tccNormal(t *testing.T) { data := &examples.TransReq{Amount: 30} gid := "tccNormal" - ret, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + ret, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") if dtmcli.IsFailure(resp, err) { return resp, err @@ -31,7 +32,7 @@ func tccNormal(t *testing.T) { func tccRollback(t *testing.T) { gid := "tccRollback" data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} - resp, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + resp, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") assert.True(t, !dtmcli.IsFailure(resp, rerr)) examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING") diff --git a/dtmsvr/trans_xa_test.go b/dtmsvr/trans_xa_test.go index 846ca39..167603a 100644 --- a/dtmsvr/trans_xa_test.go +++ b/dtmsvr/trans_xa_test.go @@ -4,6 +4,7 @@ import ( "fmt" "testing" + "github.com/go-resty/resty/v2" "github.com/stretchr/testify/assert" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/examples" @@ -19,7 +20,7 @@ func TestXa(t *testing.T) { } func xaLocalError(t *testing.T) { - _, err := examples.XaClient.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (interface{}, error) { + _, err := examples.XaClient.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (*resty.Response, error) { return nil, fmt.Errorf("an error") }) assert.Error(t, err, fmt.Errorf("an error")) @@ -28,7 +29,7 @@ func xaLocalError(t *testing.T) { func xaNormal(t *testing.T) { xc := examples.XaClient gid := "xaNormal" - res, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (interface{}, error) { + res, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := examples.GenTransReq(30, false, false) resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") if dtmcli.IsFailure(resp, err) { @@ -44,7 +45,7 @@ func xaNormal(t *testing.T) { func xaRollback(t *testing.T) { xc := examples.XaClient gid := "xaRollback" - res, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (interface{}, error) { + res, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") if dtmcli.IsFailure(resp, err) { diff --git a/examples/main_tcc.go b/examples/main_tcc.go index fd7f182..97c49dc 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -2,6 +2,7 @@ package examples import ( "github.com/gin-gonic/gin" + "github.com/go-resty/resty/v2" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" @@ -20,7 +21,7 @@ func TccSetup(app *gin.Engine) { // TccFireRequestNested 1 func TccFireRequestNested() string { gid := dtmcli.MustGenGid(DtmServer) - ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") if dtmcli.IsFailure(resp, err) { return resp, err @@ -35,7 +36,7 @@ func TccFireRequestNested() string { func TccFireRequest() string { logrus.Printf("tcc simple transaction begin") gid := dtmcli.MustGenGid(DtmServer) - ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") if dtmcli.IsFailure(resp, err) { return resp, err diff --git a/examples/main_tcc_barrier.go b/examples/main_tcc_barrier.go index c7d973e..4831b1d 100644 --- a/examples/main_tcc_barrier.go +++ b/examples/main_tcc_barrier.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/gin-gonic/gin" + "github.com/go-resty/resty/v2" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" @@ -14,7 +15,7 @@ import ( func TccBarrierFireRequest() string { logrus.Printf("tcc transaction begin") gid := dtmcli.MustGenGid(DtmServer) - ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") if dtmcli.IsFailure(resp, err) { return resp, err diff --git a/examples/main_xa.go b/examples/main_xa.go index aa0a859..36c4c75 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -4,6 +4,7 @@ import ( "database/sql" "github.com/gin-gonic/gin" + "github.com/go-resty/resty/v2" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" ) @@ -27,7 +28,7 @@ func XaSetup(app *gin.Engine) { // XaFireRequest 注册全局XA事务,调用XA的分支 func XaFireRequest() string { gid := dtmcli.MustGenGid(DtmServer) - res, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (interface{}, error) { + res, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { resp, err := xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOutXa") if dtmcli.IsFailure(resp, err) { return resp, err From 31a69e3c84a299d0f42ff035452b615cb8b7d8af Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 12:00:28 +0800 Subject: [PATCH 02/13] xa new interface seems ok --- dtmcli/message.go | 23 ++++++++-------- dtmcli/saga.go | 12 +++++---- dtmcli/types.go | 59 +++++++++++++++++++++++++++++++++++++++++ dtmcli/xa.go | 31 ++++++++++++---------- dtmsvr/trans_xa.go | 2 +- dtmsvr/trans_xa_test.go | 11 ++++---- examples/main_xa.go | 4 +-- 7 files changed, 103 insertions(+), 39 deletions(-) diff --git a/dtmcli/message.go b/dtmcli/message.go index 9cd5b64..9fa680a 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -1,8 +1,6 @@ package dtmcli import ( - "fmt" - "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -49,17 +47,18 @@ func (s *Msg) Add(action string, postData interface{}) *Msg { return s } -// Submit submit the msg -func (s *Msg) Submit() error { - logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData) - resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server)) - return CheckDtmResponse(resp, err) -} - // Prepare prepare the msg func (s *Msg) Prepare(queryPrepared string) error { s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) - logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData) - resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/prepare", s.Server)) - return CheckDtmResponse(resp, err) + return callDtmSimple(s.Server, &s.MsgData, "prepare") +} + +// Submit submit the msg +func (s *Msg) Submit() error { + return callDtmSimple(s.Server, &s.MsgData, "submit") +} + +// SubmitExt 高级submit,更多的选项和更详细的返回值 +func (s *Msg) SubmitExt(opt *TransOptions) (TransStatus, error) { + return callDtm(s.Server, &s.MsgData, "submit", opt) } diff --git a/dtmcli/saga.go b/dtmcli/saga.go index c92d13d..be2874f 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -1,8 +1,6 @@ package dtmcli import ( - "fmt" - "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -52,7 +50,11 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga // Submit submit the saga trans func (s *Saga) Submit() error { - logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) - resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server)) - return CheckDtmResponse(resp, err) + _, err := s.SubmitExt(&TransOptions{}) + return err +} + +// SubmitExt 高级submit,更多的选项和更详细的返回值 +func (s *Saga) SubmitExt(opt *TransOptions) (TransStatus, error) { + return callDtm(s.Server, &s.SagaData, "submit", opt) } diff --git a/dtmcli/types.go b/dtmcli/types.go index 29b8806..8ff0d4f 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -1,6 +1,7 @@ package dtmcli import ( + "errors" "fmt" "strings" @@ -61,3 +62,61 @@ func (g *IDGenerator) NewBranchID() string { g.branchID = g.branchID + 1 return g.parentID + fmt.Sprintf("%02d", g.branchID) } + +// TransStatus 全局事务状态,采用string +type TransStatus string + +const ( + // TransEmpty 空值 + TransEmpty TransStatus = "" + // TransSubmitted 已提交给DTM + TransSubmitted TransStatus = "submitted" + // TransAborting 正在回滚中,有两种情况会出现,一是用户侧发起abort请求,而是发起submit同步请求,但是dtm进行回滚中出现错误 + TransAborting TransStatus = "aborting" + // TransSucceed 事务已完成 + TransSucceed TransStatus = "succeed" + // TransFailed 事务已回滚 + TransFailed TransStatus = "failed" + // TransErrorPrepare prepare调用报错 + TransErrorPrepare TransStatus = "error_parepare" + // TransErrorSubmit submit调用报错 + TransErrorSubmit TransStatus = "error_submit" + // TransErrorAbort abort调用报错 + TransErrorAbort TransStatus = "error_abort" +) + +// TransOptions 提交/终止事务的选项 +type TransOptions struct { + // WaitResult 是否等待全局事务的最终结果 + WaitResult bool +} + +// TransResult dtm 返回的结果 +type TransResult struct { + DtmResult string `json:"dtm_result"` + Status TransStatus + Message string +} + +func callDtm(dtm string, body interface{}, operation string, opt *TransOptions) (TransStatus, error) { + resp, err := common.RestyClient.R().SetQueryParams(common.MS{ + "wait_result": common.If(opt.WaitResult, "1", "").(string), + }).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", dtm, operation)) + errStatus := TransStatus("error_" + operation) + if err != nil { + return errStatus, err + } + tr := resp.Result().(*TransResult) + if tr.DtmResult == "FAILURE" { + return errStatus, errors.New(tr.Message) + } + return tr.Status, nil +} + +func callDtmSimple(dtm string, body interface{}, operation string) error { + _, err := callDtm(dtm, body, operation, &TransOptions{}) + return err +} + +// ErrUserFailure 表示用户返回失败,要求回滚 +var ErrUserFailure = errors.New("user return FAILURE") diff --git a/dtmcli/xa.go b/dtmcli/xa.go index db998d3..1eac028 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -109,39 +109,38 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret } // XaGlobalTransaction start a xa global transaction -func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (ret interface{}, rerr error) { +func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status TransStatus, rerr error) { xa := Xa{IDGenerator: IDGenerator{}, Gid: gid} data := &M{ "gid": gid, "trans_type": "xa", } - resp, err := common.RestyClient.R().SetBody(data).Post(xc.Server + "/prepare") - if IsFailure(resp, err) { - return resp, err + status, rerr = callDtm(xc.Server, data, "prepare", &TransOptions{}) + if rerr != nil { + return } + var resp *resty.Response // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer func() { - var x interface{} - if x = recover(); x != nil || IsFailure(ret, rerr) { - resp, err = common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort") - } else { - resp, err = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit") - } - if IsFailure(resp, err) { - common.RedLogf("submitting or abort global transaction error: %v resp: %s", err, resp.String()) + x := recover() + operation := common.If(x != nil || IsFailure(resp, rerr), "abort", "submit").(string) + var err error + status, err = callDtm(xc.Server, data, operation, &TransOptions{}) + if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 + rerr = err } if x != nil { panic(x) } }() - ret, rerr = xaFunc(&xa) + resp, rerr = xaFunc(&xa) return } // CallBranch call a xa branch func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { branchID := x.NewBranchID() - return common.RestyClient.R(). + resp, err := common.RestyClient.R(). SetBody(body). SetQueryParams(common.MS{ "gid": x.Gid, @@ -150,4 +149,8 @@ func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { "branch_type": "action", }). Post(url) + if IsFailure(resp, nil) { + err = ErrUserFailure + } + return resp, err } diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_xa.go index 1e33353..819418c 100644 --- a/dtmsvr/trans_xa.go +++ b/dtmsvr/trans_xa.go @@ -21,7 +21,7 @@ func (t *transXaProcessor) GenBranches() []TransBranch { func (t *transXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { resp, err := common.RestyClient.R().SetQueryParams(common.MS{ "branch_id": branch.BranchID, - "action": common.If(t.Status == "prepared", "rollback", "commit").(string), + "action": common.If(t.Status == "prepared" || t.Status == "aborting", "rollback", "commit").(string), "gid": branch.Gid, }).Post(branch.URL) e2p(err) diff --git a/dtmsvr/trans_xa_test.go b/dtmsvr/trans_xa_test.go index 167603a..e0acc84 100644 --- a/dtmsvr/trans_xa_test.go +++ b/dtmsvr/trans_xa_test.go @@ -20,7 +20,8 @@ func TestXa(t *testing.T) { } func xaLocalError(t *testing.T) { - _, err := examples.XaClient.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (*resty.Response, error) { + xc := examples.XaClient + _, err := xc.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (*resty.Response, error) { return nil, fmt.Errorf("an error") }) assert.Error(t, err, fmt.Errorf("an error")) @@ -29,7 +30,7 @@ func xaLocalError(t *testing.T) { func xaNormal(t *testing.T) { xc := examples.XaClient gid := "xaNormal" - res, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { + _, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := examples.GenTransReq(30, false, false) resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") if dtmcli.IsFailure(resp, err) { @@ -37,7 +38,7 @@ func xaNormal(t *testing.T) { } return xa.CallBranch(req, examples.Busi+"/TransInXa") }) - dtmcli.PanicIfFailure(res, err) + assert.Equal(t, nil, err) WaitTransProcessed(gid) assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid)) } @@ -45,7 +46,7 @@ func xaNormal(t *testing.T) { func xaRollback(t *testing.T) { xc := examples.XaClient gid := "xaRollback" - res, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { + _, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") if dtmcli.IsFailure(resp, err) { @@ -53,7 +54,7 @@ func xaRollback(t *testing.T) { } return xa.CallBranch(req, examples.Busi+"/TransInXa") }) - assert.True(t, dtmcli.IsFailure(res, err)) + assert.Error(t, err) WaitTransProcessed(gid) assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid)) assert.Equal(t, "failed", getTransStatus(gid)) diff --git a/examples/main_xa.go b/examples/main_xa.go index 36c4c75..7af6752 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -28,14 +28,14 @@ func XaSetup(app *gin.Engine) { // XaFireRequest 注册全局XA事务,调用XA的分支 func XaFireRequest() string { gid := dtmcli.MustGenGid(DtmServer) - res, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { + _, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { resp, err := xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOutXa") if dtmcli.IsFailure(resp, err) { return resp, err } return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa") }) - dtmcli.PanicIfFailure(res, err) + e2p(err) return gid } From cca406dca80b74c364f9e47a85b2663dc9a50e66 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 14:15:32 +0800 Subject: [PATCH 03/13] add checkuserresponse --- dtmcli/types.go | 15 +++++++++++++++ dtmcli/xa.go | 5 +---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/dtmcli/types.go b/dtmcli/types.go index 8ff0d4f..d716430 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -34,6 +34,18 @@ func PanicIfFailure(res interface{}, err error) { } } +// CheckUserResponse 检查Response,返回错误 +func CheckUserResponse(resp *resty.Response, err error) error { + if err == nil && resp != nil { + if resp.IsError() { + return errors.New(resp.String()) + } else if strings.Contains(resp.String(), "FAILURE") { + return ErrUserFailure + } + } + return err +} + // CheckDtmResponse check the response of dtm, if not ok ,generate error func CheckDtmResponse(resp *resty.Response, err error) error { if err != nil { @@ -120,3 +132,6 @@ func callDtmSimple(dtm string, body interface{}, operation string) error { // ErrUserFailure 表示用户返回失败,要求回滚 var ErrUserFailure = errors.New("user return FAILURE") + +// ErrDtmFailure 表示用户返回失败,要求回滚 +var ErrDtmFailure = errors.New("dtm return FAILURE") diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 1eac028..75625e4 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -149,8 +149,5 @@ func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { "branch_type": "action", }). Post(url) - if IsFailure(resp, nil) { - err = ErrUserFailure - } - return resp, err + return resp, CheckUserResponse(resp, err) } From 2a36d597d77f4353135144dd99c6732a05db6fda Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 14:40:33 +0800 Subject: [PATCH 04/13] xa changed --- dtmcli/types.go | 10 +++++----- dtmcli/xa.go | 20 +++++++------------- dtmsvr/trans_xa_test.go | 6 +++--- 3 files changed, 15 insertions(+), 21 deletions(-) diff --git a/dtmcli/types.go b/dtmcli/types.go index d716430..d7a9044 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -40,7 +40,7 @@ func CheckUserResponse(resp *resty.Response, err error) error { if resp.IsError() { return errors.New(resp.String()) } else if strings.Contains(resp.String(), "FAILURE") { - return ErrUserFailure + return ErrFailure } } return err @@ -130,8 +130,8 @@ func callDtmSimple(dtm string, body interface{}, operation string) error { return err } -// ErrUserFailure 表示用户返回失败,要求回滚 -var ErrUserFailure = errors.New("user return FAILURE") +// ErrFailure 表示返回失败,要求回滚 +var ErrFailure = errors.New("transaction FAILURE") -// ErrDtmFailure 表示用户返回失败,要求回滚 -var ErrDtmFailure = errors.New("dtm return FAILURE") +// ResultSuccess 表示返回成功,可以进行下一步 +var ResultSuccess = common.M{"dtm_result": "SUCCESS"} diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 75625e4..d751799 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -66,7 +66,7 @@ func (xc *XaClient) HandleCallback(gid string, branchID string, action string) ( defer db.Close() xaID := gid + "-" + branchID _, err := common.SdbExec(db, fmt.Sprintf("xa %s '%s'", action, xaID)) - return M{"dtm_result": "SUCCESS"}, err + return ResultSuccess, err } @@ -78,17 +78,13 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret db := common.SdbAlone(xc.Conf) defer func() { db.Close() }() defer func() { - var x interface{} + x := recover() _, err := common.SdbExec(db, fmt.Sprintf("XA end '%s'", xaBranch)) - if err != nil { - common.RedLogf("sql db exec error: %v", err) - } - if x = recover(); x != nil || IsFailure(ret, rerr) { - } else { + if x == nil && rerr == nil && err == nil { _, err = common.SdbExec(db, fmt.Sprintf("XA prepare '%s'", xaBranch)) } - if err != nil { - common.RedLogf("sql db exec error: %v", err) + if rerr == nil { + rerr = err } if x != nil { panic(x) @@ -99,12 +95,10 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret return } ret, rerr = xaFunc(db, xa) - if IsFailure(ret, rerr) { + if rerr != nil { return } - ret, rerr = common.RestyClient.R(). - SetBody(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}). - Post(xc.Server + "/registerXaBranch") + _, rerr = callDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{}) return } diff --git a/dtmsvr/trans_xa_test.go b/dtmsvr/trans_xa_test.go index e0acc84..3eb7ba4 100644 --- a/dtmsvr/trans_xa_test.go +++ b/dtmsvr/trans_xa_test.go @@ -33,7 +33,7 @@ func xaNormal(t *testing.T) { _, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := examples.GenTransReq(30, false, false) resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") - if dtmcli.IsFailure(resp, err) { + if err != nil { return resp, err } return xa.CallBranch(req, examples.Busi+"/TransInXa") @@ -49,13 +49,13 @@ func xaRollback(t *testing.T) { _, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") - if dtmcli.IsFailure(resp, err) { + if err != nil { return resp, err } return xa.CallBranch(req, examples.Busi+"/TransInXa") }) assert.Error(t, err) WaitTransProcessed(gid) - assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid)) + assert.Equal(t, []string{"succeed", "prepared", "succeed", "prepared"}, getBranchesStatus(gid)) assert.Equal(t, "failed", getTransStatus(gid)) } From a35d48cf0f45eb1586e099bce0031479ee847298 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 14:57:13 +0800 Subject: [PATCH 05/13] xa changed --- dtmcli/types.go | 19 +++++++++++++++++-- dtmcli/xa.go | 3 ++- dtmsvr/trans_xa_test.go | 2 +- examples/quick_start.go | 2 +- 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/dtmcli/types.go b/dtmcli/types.go index d7a9044..b29acd2 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -34,8 +34,8 @@ func PanicIfFailure(res interface{}, err error) { } } -// CheckUserResponse 检查Response,返回错误 -func CheckUserResponse(resp *resty.Response, err error) error { +// CheckResponse 检查Response,返回错误 +func CheckResponse(resp *resty.Response, err error) error { if err == nil && resp != nil { if resp.IsError() { return errors.New(resp.String()) @@ -46,6 +46,18 @@ func CheckUserResponse(resp *resty.Response, err error) error { return err } +// CheckResult 检查Result,返回错误 +func CheckResult(res interface{}, err error) error { + resp, ok := res.(*resty.Response) + if ok { + return CheckResponse(resp, err) + } + if res != nil && strings.Contains(common.MustMarshalString(res), "FAILURE") { + return ErrFailure + } + return err +} + // CheckDtmResponse check the response of dtm, if not ok ,generate error func CheckDtmResponse(resp *resty.Response, err error) error { if err != nil { @@ -135,3 +147,6 @@ var ErrFailure = errors.New("transaction FAILURE") // ResultSuccess 表示返回成功,可以进行下一步 var ResultSuccess = common.M{"dtm_result": "SUCCESS"} + +// ResultFailure 表示返回失败,要求回滚 +var ResultFailure = common.M{"dtm_result": "FAILURE"} diff --git a/dtmcli/xa.go b/dtmcli/xa.go index d751799..f8520b2 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -95,6 +95,7 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret return } ret, rerr = xaFunc(db, xa) + rerr = CheckResult(ret, rerr) if rerr != nil { return } @@ -143,5 +144,5 @@ func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { "branch_type": "action", }). Post(url) - return resp, CheckUserResponse(resp, err) + return resp, CheckResponse(resp, err) } diff --git a/dtmsvr/trans_xa_test.go b/dtmsvr/trans_xa_test.go index 3eb7ba4..c848d1c 100644 --- a/dtmsvr/trans_xa_test.go +++ b/dtmsvr/trans_xa_test.go @@ -56,6 +56,6 @@ func xaRollback(t *testing.T) { }) assert.Error(t, err) WaitTransProcessed(gid) - assert.Equal(t, []string{"succeed", "prepared", "succeed", "prepared"}, getBranchesStatus(gid)) + assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid)) assert.Equal(t, "failed", getTransStatus(gid)) } diff --git a/examples/quick_start.go b/examples/quick_start.go index 3b3810d..409e0b3 100644 --- a/examples/quick_start.go +++ b/examples/quick_start.go @@ -44,7 +44,7 @@ func QsFireRequest() string { func qsAdjustBalance(uid int, amount int) (interface{}, error) { _, err := common.SdbExec(sdbGet(), "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid) - return M{"dtm_result": "SUCCESS"}, err + return dtmcli.ResultSuccess, err } func qsAddRoute(app *gin.Engine) { From a28f0f34302dd82055210e8acaf3b6b8bf7ee0ba Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 15:06:12 +0800 Subject: [PATCH 06/13] use predefined Succcess/Failure --- dtmcli/barrier.go | 6 +++--- dtmsvr/api.go | 11 ++++++----- examples/main_saga_barrier.go | 2 +- examples/main_tcc_barrier.go | 2 +- examples/main_xa.go | 10 +++++----- 5 files changed, 16 insertions(+), 15 deletions(-) diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 31ee7ac..4a97a55 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -98,14 +98,14 @@ func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc) (re currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.BranchType, ti.BranchType) logrus.Printf("originAffected: %d currentAffected: %d", originAffected, currentAffected) if (ti.BranchType == "cancel" || ti.BranchType == "compensate") && originAffected > 0 { // 这个是空补偿,返回成功 - res = common.MS{"dtm_result": "SUCCESS"} + res = ResultSuccess return } else if currentAffected == 0 { // 插入不成功 var result sql.NullString err := common.StxQueryRow(tx, "select result from dtm_barrier.barrier where trans_type=? and gid=? and branch_id=? and branch_type=? and reason=?", ti.TransType, ti.Gid, ti.BranchID, ti.BranchType, ti.BranchType).Scan(&result) if err == sql.ErrNoRows { // 这个是悬挂操作,返回失败,AP收到这个返回,会尽快回滚 - res = common.MS{"dtm_result": "FAILURE"} + res = ResultFailure return } if err != nil { @@ -117,7 +117,7 @@ func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc) (re return } // 数据库里没有上次的结果,属于重复空补偿,直接返回成功 - res = common.MS{"dtm_result": "SUCCESS"} + res = ResultSuccess return } res, rerr = busiCall(tx) diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 0ca9511..e614354 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -6,6 +6,7 @@ import ( "github.com/gin-gonic/gin" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -28,7 +29,7 @@ func prepare(c *gin.Context) (interface{}, error) { t := TransFromContext(c) t.Status = "prepared" t.saveNew(dbGet()) - return M{"dtm_result": "SUCCESS"}, nil + return dtmcli.ResultSuccess, nil } func submit(c *gin.Context) (interface{}, error) { @@ -41,7 +42,7 @@ func submit(c *gin.Context) (interface{}, error) { t.Status = "submitted" t.saveNew(db) go t.Process(db) - return M{"dtm_result": "SUCCESS"}, nil + return dtmcli.ResultSuccess, nil } func abort(c *gin.Context) (interface{}, error) { @@ -52,7 +53,7 @@ func abort(c *gin.Context) (interface{}, error) { return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil } go dbt.Process(db) - return M{"dtm_result": "SUCCESS"}, nil + return dtmcli.ResultSuccess, nil } func registerXaBranch(c *gin.Context) (interface{}, error) { @@ -73,7 +74,7 @@ func registerXaBranch(c *gin.Context) (interface{}, error) { e2p(err) global := TransGlobal{Gid: branch.Gid} global.touch(db, config.TransCronInterval) - return M{"dtm_result": "SUCCESS"}, nil + return dtmcli.ResultSuccess, nil } func registerTccBranch(c *gin.Context) (interface{}, error) { @@ -104,7 +105,7 @@ func registerTccBranch(c *gin.Context) (interface{}, error) { e2p(err) global := TransGlobal{Gid: branch.Gid} global.touch(dbGet(), config.TransCronInterval) - return M{"dtm_result": "SUCCESS"}, nil + return dtmcli.ResultSuccess, nil } func query(c *gin.Context) (interface{}, error) { diff --git a/examples/main_saga_barrier.go b/examples/main_saga_barrier.go index da2b61b..9c50fa8 100644 --- a/examples/main_saga_barrier.go +++ b/examples/main_saga_barrier.go @@ -33,7 +33,7 @@ func SagaBarrierAddRoute(app *gin.Engine) { func sagaBarrierAdjustBalance(sdb *sql.Tx, uid int, amount int) (interface{}, error) { _, err := common.StxExec(sdb, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid) - return common.MS{"dtm_result": "SUCCESS"}, err + return dtmcli.ResultSuccess, err } diff --git a/examples/main_tcc_barrier.go b/examples/main_tcc_barrier.go index 4831b1d..e494f28 100644 --- a/examples/main_tcc_barrier.go +++ b/examples/main_tcc_barrier.go @@ -56,7 +56,7 @@ func adjustBalance(sdb *sql.Tx, uid int, amount int) (interface{}, error) { if err == nil && affected == 0 { return nil, fmt.Errorf("update 0 rows") } - return common.MS{"dtm_result": "SUCCESS"}, err + return dtmcli.ResultSuccess, err } // TCC下,转入 diff --git a/examples/main_xa.go b/examples/main_xa.go index 7af6752..b07ae83 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -30,7 +30,7 @@ func XaFireRequest() string { gid := dtmcli.MustGenGid(DtmServer) _, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { resp, err := xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOutXa") - if dtmcli.IsFailure(resp, err) { + if err != nil { return resp, err } return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa") @@ -42,19 +42,19 @@ func XaFireRequest() string { func xaTransIn(c *gin.Context) (interface{}, error) { return XaClient.XaLocalTransaction(c, func(db *sql.DB, xa *dtmcli.Xa) (interface{}, error) { if reqFrom(c).TransInResult == "FAILURE" { - return M{"dtm_result": "FAILURE"}, nil + return dtmcli.ResultFailure, nil } _, err := common.SdbExec(db, "update dtm_busi.user_account set balance=balance+? where user_id=?", reqFrom(c).Amount, 2) - return M{"dtm_result": "SUCCESS"}, err + return dtmcli.ResultSuccess, err }) } func xaTransOut(c *gin.Context) (interface{}, error) { return XaClient.XaLocalTransaction(c, func(db *sql.DB, xa *dtmcli.Xa) (interface{}, error) { if reqFrom(c).TransOutResult == "FAILURE" { - return M{"dtm_result": "FAILURE"}, nil + return dtmcli.ResultFailure, nil } _, err := common.SdbExec(db, "update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, 1) - return M{"dtm_result": "SUCCESS"}, err + return dtmcli.ResultSuccess, err }) } From 483c76d0155b762ce08905b12eba9f2ca7e033f7 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 15:23:02 +0800 Subject: [PATCH 07/13] tcc changed --- dtmcli/message.go | 2 +- dtmcli/saga.go | 2 +- dtmcli/tcc.go | 47 +++++++++++++++----------------- dtmcli/types.go | 4 +-- dtmcli/xa.go | 6 ++-- dtmsvr/trans_tcc_barrier_test.go | 34 +++++++++++------------ dtmsvr/trans_tcc_test.go | 18 ++++++------ 7 files changed, 53 insertions(+), 60 deletions(-) diff --git a/dtmcli/message.go b/dtmcli/message.go index 9fa680a..9104862 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -60,5 +60,5 @@ func (s *Msg) Submit() error { // SubmitExt 高级submit,更多的选项和更详细的返回值 func (s *Msg) SubmitExt(opt *TransOptions) (TransStatus, error) { - return callDtm(s.Server, &s.MsgData, "submit", opt) + return CallDtm(s.Server, &s.MsgData, "submit", opt) } diff --git a/dtmcli/saga.go b/dtmcli/saga.go index be2874f..855b5ef 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -56,5 +56,5 @@ func (s *Saga) Submit() error { // SubmitExt 高级submit,更多的选项和更详细的返回值 func (s *Saga) SubmitExt(opt *TransOptions) (TransStatus, error) { - return callDtm(s.Server, &s.SagaData, "submit", opt) + return CallDtm(s.Server, &s.SagaData, "submit", opt) } diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 6e93b6f..8c8293e 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -28,26 +28,24 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (ret in "trans_type": "tcc", } tcc := &Tcc{Dtm: dtm, Gid: gid} - resp, err := common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare") - if IsFailure(resp, err) { + resp, err := CallDtm(dtm, data, "prepare", &TransOptions{}) + if err != nil { return resp, err } // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer func() { - var x interface{} - if x = recover(); x != nil || IsFailure(ret, rerr) { - resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/abort") - } else { - resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/submit") - } - if IsFailure(resp, err) { - common.RedLogf("submitting or abort global transaction error: %v resp: %s", err, resp.String()) + x := recover() + operation := common.If(x == nil && rerr == nil, "submit", "abort").(string) + resp, err = CallDtm(dtm, data, operation, &TransOptions{}) + if rerr == nil { + rerr = err } if x != nil { panic(x) } }() ret, rerr = tccFunc(tcc) + rerr = CheckResult(ret, rerr) return } @@ -68,22 +66,20 @@ func TccFromReq(c *gin.Context) (*Tcc, error) { // 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果 func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) { branchID := t.NewBranchID() - resp, err := common.RestyClient.R(). - SetBody(&M{ - "gid": t.Gid, - "branch_id": branchID, - "trans_type": "tcc", - "status": "prepared", - "data": string(common.MustMarshal(body)), - "try": tryURL, - "confirm": confirmURL, - "cancel": cancelURL, - }). - Post(t.Dtm + "/registerTccBranch") - if IsFailure(resp, err) { - return resp, err + _, err := CallDtm(t.Dtm, &M{ + "gid": t.Gid, + "branch_id": branchID, + "trans_type": "tcc", + "status": "prepared", + "data": string(common.MustMarshal(body)), + "try": tryURL, + "confirm": confirmURL, + "cancel": cancelURL, + }, "registerTccBranch", &TransOptions{}) + if err != nil { + return nil, err } - return common.RestyClient.R(). + resp, err := common.RestyClient.R(). SetBody(body). SetQueryParams(common.MS{ "dtm": t.Dtm, @@ -93,4 +89,5 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can "branch_type": "try", }). Post(tryURL) + return resp, CheckResponse(resp, err) } diff --git a/dtmcli/types.go b/dtmcli/types.go index b29acd2..d10e112 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -122,7 +122,7 @@ type TransResult struct { Message string } -func callDtm(dtm string, body interface{}, operation string, opt *TransOptions) (TransStatus, error) { +func CallDtm(dtm string, body interface{}, operation string, opt *TransOptions) (TransStatus, error) { resp, err := common.RestyClient.R().SetQueryParams(common.MS{ "wait_result": common.If(opt.WaitResult, "1", "").(string), }).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", dtm, operation)) @@ -138,7 +138,7 @@ func callDtm(dtm string, body interface{}, operation string, opt *TransOptions) } func callDtmSimple(dtm string, body interface{}, operation string) error { - _, err := callDtm(dtm, body, operation, &TransOptions{}) + _, err := CallDtm(dtm, body, operation, &TransOptions{}) return err } diff --git a/dtmcli/xa.go b/dtmcli/xa.go index f8520b2..328188f 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -99,7 +99,7 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret if rerr != nil { return } - _, rerr = callDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{}) + _, rerr = CallDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{}) return } @@ -110,7 +110,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status "gid": gid, "trans_type": "xa", } - status, rerr = callDtm(xc.Server, data, "prepare", &TransOptions{}) + status, rerr = CallDtm(xc.Server, data, "prepare", &TransOptions{}) if rerr != nil { return } @@ -120,7 +120,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status x := recover() operation := common.If(x != nil || IsFailure(resp, rerr), "abort", "submit").(string) var err error - status, err = callDtm(xc.Server, data, operation, &TransOptions{}) + status, err = CallDtm(xc.Server, data, operation, &TransOptions{}) if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 rerr = err } diff --git a/dtmsvr/trans_tcc_barrier_test.go b/dtmsvr/trans_tcc_barrier_test.go index 4b46d06..b29c089 100644 --- a/dtmsvr/trans_tcc_barrier_test.go +++ b/dtmsvr/trans_tcc_barrier_test.go @@ -36,12 +36,12 @@ func tccBarrierRollback(t *testing.T) { func tccBarrierNormal(t *testing.T) { gid := "tccBarrierNormal" - resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { - resp, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - assert.True(t, !dtmcli.IsFailure(resp, err)) + _, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + _, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") + assert.Nil(t, err) return tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") }) - assert.True(t, !dtmcli.IsFailure(resp, err)) + assert.Nil(t, err) WaitTransProcessed(gid) assert.Equal(t, "succeed", getTransStatus(gid)) } @@ -69,23 +69,21 @@ func tccBarrierDisorder(t *testing.T) { return res, err })) // 注册子事务 - r, err := common.RestyClient.R(). - SetBody(&M{ - "gid": tcc.Gid, - "branch_id": branchID, - "trans_type": "tcc", - "status": "prepared", - "data": string(common.MustMarshal(body)), - "try": tryURL, - "confirm": confirmURL, - "cancel": cancelURL, - }). - Post(tcc.Dtm + "/registerTccBranch") - assert.True(t, !dtmcli.IsFailure(r, err)) + _, err := dtmcli.CallDtm(tcc.Dtm, M{ + "gid": tcc.Gid, + "branch_id": branchID, + "trans_type": "tcc", + "status": "prepared", + "data": string(common.MustMarshal(body)), + "try": tryURL, + "confirm": confirmURL, + "cancel": cancelURL, + }, "registerTccBranch", &dtmcli.TransOptions{}) + assert.Nil(t, err) go func() { logrus.Printf("sleeping to wait for tcc try timeout") <-timeoutChan - r, _ = common.RestyClient.R(). + r, _ := common.RestyClient.R(). SetBody(body). SetQueryParams(common.MS{ "dtm": tcc.Dtm, diff --git a/dtmsvr/trans_tcc_test.go b/dtmsvr/trans_tcc_test.go index dc1833e..1a07979 100644 --- a/dtmsvr/trans_tcc_test.go +++ b/dtmsvr/trans_tcc_test.go @@ -19,26 +19,24 @@ func TestTcc(t *testing.T) { func tccNormal(t *testing.T) { data := &examples.TransReq{Amount: 30} gid := "tccNormal" - ret, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { - resp, err := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - if dtmcli.IsFailure(resp, err) { - return resp, err - } + _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + _, err := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") + assert.Nil(t, err) return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - dtmcli.PanicIfFailure(ret, err) + assert.Nil(t, err) } func tccRollback(t *testing.T) { gid := "tccRollback" data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} - resp, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { - resp, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - assert.True(t, !dtmcli.IsFailure(resp, rerr)) + _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + _, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") + assert.Nil(t, rerr) examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING") return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - assert.True(t, dtmcli.IsFailure(resp, err)) + assert.Error(t, err) WaitTransProcessed(gid) assert.Equal(t, "aborting", getTransStatus(gid)) CronTransOnce(60 * time.Second) From a94761d7f6dead74e7e868995b4b3ff08d0d966d Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 15:26:05 +0800 Subject: [PATCH 08/13] tcc global change to TransStatus --- dtmcli/tcc.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 8c8293e..86c76b3 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -22,21 +22,22 @@ type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error) // dtm dtm服务器地址 // gid 全局事务id // tccFunc tcc事务函数,里面会定义全局事务的分支 -func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (ret interface{}, rerr error) { +func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (status TransStatus, rerr error) { data := &M{ "gid": gid, "trans_type": "tcc", } tcc := &Tcc{Dtm: dtm, Gid: gid} - resp, err := CallDtm(dtm, data, "prepare", &TransOptions{}) - if err != nil { - return resp, err + status, rerr = CallDtm(dtm, data, "prepare", &TransOptions{}) + if rerr != nil { + return status, rerr } // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer func() { x := recover() + var err error operation := common.If(x == nil && rerr == nil, "submit", "abort").(string) - resp, err = CallDtm(dtm, data, operation, &TransOptions{}) + status, err = CallDtm(dtm, data, operation, &TransOptions{}) if rerr == nil { rerr = err } @@ -44,8 +45,8 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (ret in panic(x) } }() - ret, rerr = tccFunc(tcc) - rerr = CheckResult(ret, rerr) + resp, rerr := tccFunc(tcc) + rerr = CheckResponse(resp, rerr) return } From b20d18ecbbf446bf1f224fac891c70e5c3670fc8 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 16:19:31 +0800 Subject: [PATCH 09/13] clear failure --- dtmcli/message.go | 2 +- dtmcli/saga.go | 5 ++- dtmcli/tcc.go | 10 +++--- dtmcli/types.go | 62 ++++---------------------------- dtmcli/types_test.go | 5 --- dtmcli/xa.go | 12 +++---- dtmsvr/trans_tcc_barrier_test.go | 14 ++++---- dtmsvr/trans_tcc_test.go | 4 +-- dtmsvr/trans_xa_test.go | 6 ++-- examples/main_tcc.go | 12 +++---- examples/main_tcc_barrier.go | 6 ++-- examples/main_xa.go | 2 +- 12 files changed, 42 insertions(+), 98 deletions(-) diff --git a/dtmcli/message.go b/dtmcli/message.go index 9104862..80a044d 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -59,6 +59,6 @@ func (s *Msg) Submit() error { } // SubmitExt 高级submit,更多的选项和更详细的返回值 -func (s *Msg) SubmitExt(opt *TransOptions) (TransStatus, error) { +func (s *Msg) SubmitExt(opt *TransOptions) error { return CallDtm(s.Server, &s.MsgData, "submit", opt) } diff --git a/dtmcli/saga.go b/dtmcli/saga.go index 855b5ef..a7f39ee 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -50,11 +50,10 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga // Submit submit the saga trans func (s *Saga) Submit() error { - _, err := s.SubmitExt(&TransOptions{}) - return err + return s.SubmitExt(&TransOptions{}) } // SubmitExt 高级submit,更多的选项和更详细的返回值 -func (s *Saga) SubmitExt(opt *TransOptions) (TransStatus, error) { +func (s *Saga) SubmitExt(opt *TransOptions) error { return CallDtm(s.Server, &s.SagaData, "submit", opt) } diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 86c76b3..c2f019d 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -22,22 +22,22 @@ type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error) // dtm dtm服务器地址 // gid 全局事务id // tccFunc tcc事务函数,里面会定义全局事务的分支 -func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (status TransStatus, rerr error) { +func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { data := &M{ "gid": gid, "trans_type": "tcc", } tcc := &Tcc{Dtm: dtm, Gid: gid} - status, rerr = CallDtm(dtm, data, "prepare", &TransOptions{}) + rerr = CallDtm(dtm, data, "prepare", &TransOptions{}) if rerr != nil { - return status, rerr + return rerr } // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer func() { x := recover() var err error operation := common.If(x == nil && rerr == nil, "submit", "abort").(string) - status, err = CallDtm(dtm, data, operation, &TransOptions{}) + err = CallDtm(dtm, data, operation, &TransOptions{}) if rerr == nil { rerr = err } @@ -67,7 +67,7 @@ func TccFromReq(c *gin.Context) (*Tcc, error) { // 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果 func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) { branchID := t.NewBranchID() - _, err := CallDtm(t.Dtm, &M{ + err := CallDtm(t.Dtm, &M{ "gid": t.Gid, "branch_id": branchID, "trans_type": "tcc", diff --git a/dtmcli/types.go b/dtmcli/types.go index d10e112..92e97fc 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -19,21 +19,6 @@ func MustGenGid(server string) string { return res["gid"] } -// IsFailure 如果err非空,或者ret是http的响应且包含FAILURE,那么返回true。此时认为业务调用失败 -func IsFailure(res interface{}, err error) bool { - resp, ok := res.(*resty.Response) - return err != nil || // 包含错误 - ok && (resp.IsError() || strings.Contains(resp.String(), "FAILURE")) || // resp包含failure - !ok && res != nil && strings.Contains(common.MustMarshalString(res), "FAILURE") // 结果中包含failure -} - -// PanicIfFailure 如果err非空,或者ret是http的响应且包含FAILURE,那么Panic。此时认为业务调用失败 -func PanicIfFailure(res interface{}, err error) { - if IsFailure(res, err) { - panic(fmt.Errorf("dtm failure ret: %v err %v", res, err)) - } -} - // CheckResponse 检查Response,返回错误 func CheckResponse(resp *resty.Response, err error) error { if err == nil && resp != nil { @@ -58,17 +43,6 @@ func CheckResult(res interface{}, err error) error { return err } -// CheckDtmResponse check the response of dtm, if not ok ,generate error -func CheckDtmResponse(resp *resty.Response, err error) error { - if err != nil { - return err - } - if !strings.Contains(resp.String(), "SUCCESS") || resp.IsError() { - return fmt.Errorf("dtm response failed: %s", resp.String()) - } - return nil -} - // IDGenerator used to generate a branch id type IDGenerator struct { parentID string @@ -87,28 +61,6 @@ func (g *IDGenerator) NewBranchID() string { return g.parentID + fmt.Sprintf("%02d", g.branchID) } -// TransStatus 全局事务状态,采用string -type TransStatus string - -const ( - // TransEmpty 空值 - TransEmpty TransStatus = "" - // TransSubmitted 已提交给DTM - TransSubmitted TransStatus = "submitted" - // TransAborting 正在回滚中,有两种情况会出现,一是用户侧发起abort请求,而是发起submit同步请求,但是dtm进行回滚中出现错误 - TransAborting TransStatus = "aborting" - // TransSucceed 事务已完成 - TransSucceed TransStatus = "succeed" - // TransFailed 事务已回滚 - TransFailed TransStatus = "failed" - // TransErrorPrepare prepare调用报错 - TransErrorPrepare TransStatus = "error_parepare" - // TransErrorSubmit submit调用报错 - TransErrorSubmit TransStatus = "error_submit" - // TransErrorAbort abort调用报错 - TransErrorAbort TransStatus = "error_abort" -) - // TransOptions 提交/终止事务的选项 type TransOptions struct { // WaitResult 是否等待全局事务的最终结果 @@ -118,28 +70,26 @@ type TransOptions struct { // TransResult dtm 返回的结果 type TransResult struct { DtmResult string `json:"dtm_result"` - Status TransStatus Message string } -func CallDtm(dtm string, body interface{}, operation string, opt *TransOptions) (TransStatus, error) { +// CallDtm 调用dtm服务器,返回事务的状态 +func CallDtm(dtm string, body interface{}, operation string, opt *TransOptions) error { resp, err := common.RestyClient.R().SetQueryParams(common.MS{ "wait_result": common.If(opt.WaitResult, "1", "").(string), }).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", dtm, operation)) - errStatus := TransStatus("error_" + operation) if err != nil { - return errStatus, err + return err } tr := resp.Result().(*TransResult) if tr.DtmResult == "FAILURE" { - return errStatus, errors.New(tr.Message) + return errors.New("FAILURE: " + tr.Message) } - return tr.Status, nil + return nil } func callDtmSimple(dtm string, body interface{}, operation string) error { - _, err := CallDtm(dtm, body, operation, &TransOptions{}) - return err + return CallDtm(dtm, body, operation, &TransOptions{}) } // ErrFailure 表示返回失败,要求回滚 diff --git a/dtmcli/types_test.go b/dtmcli/types_test.go index d47e232..97a3d88 100644 --- a/dtmcli/types_test.go +++ b/dtmcli/types_test.go @@ -1,7 +1,6 @@ package dtmcli import ( - "fmt" "net/url" "testing" @@ -22,8 +21,4 @@ func TestTypes(t *testing.T) { assert.Error(t, err) _, err = TransInfoFromQuery(url.Values{}) assert.Error(t, err) - - err2 := fmt.Errorf("an error") - err3 := CheckDtmResponse(nil, err2) - assert.Error(t, err2, err3) } diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 328188f..cac6384 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -99,18 +99,18 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret if rerr != nil { return } - _, rerr = CallDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{}) + rerr = CallDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{}) return } // XaGlobalTransaction start a xa global transaction -func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status TransStatus, rerr error) { +func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) { xa := Xa{IDGenerator: IDGenerator{}, Gid: gid} data := &M{ "gid": gid, "trans_type": "xa", } - status, rerr = CallDtm(xc.Server, data, "prepare", &TransOptions{}) + rerr = CallDtm(xc.Server, data, "prepare", &TransOptions{}) if rerr != nil { return } @@ -118,9 +118,8 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer func() { x := recover() - operation := common.If(x != nil || IsFailure(resp, rerr), "abort", "submit").(string) - var err error - status, err = CallDtm(xc.Server, data, operation, &TransOptions{}) + operation := common.If(x != nil || rerr != nil, "abort", "submit").(string) + err := CallDtm(xc.Server, data, operation, &TransOptions{}) if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 rerr = err } @@ -129,6 +128,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status } }() resp, rerr = xaFunc(&xa) + rerr = CheckResponse(resp, rerr) return } diff --git a/dtmsvr/trans_tcc_barrier_test.go b/dtmsvr/trans_tcc_barrier_test.go index b29c089..f04fdc8 100644 --- a/dtmsvr/trans_tcc_barrier_test.go +++ b/dtmsvr/trans_tcc_barrier_test.go @@ -24,19 +24,19 @@ func TestTccBarrier(t *testing.T) { func tccBarrierRollback(t *testing.T) { gid := "tccBarrierRollback" - resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { - resp, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - assert.True(t, !dtmcli.IsFailure(resp, err)) + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + _, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") + assert.Nil(t, err) return tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") }) - assert.True(t, dtmcli.IsFailure(resp, err)) + assert.Error(t, err) WaitTransProcessed(gid) assert.Equal(t, "failed", getTransStatus(gid)) } func tccBarrierNormal(t *testing.T) { gid := "tccBarrierNormal" - _, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") assert.Nil(t, err) return tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") @@ -50,7 +50,7 @@ func tccBarrierDisorder(t *testing.T) { timeoutChan := make(chan string, 2) finishedChan := make(chan string, 2) gid := "tccBarrierDisorder" - _, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { body := &examples.TransReq{Amount: 30} tryURL := Busi + "/TccBTransOutTry" confirmURL := Busi + "/TccBTransOutConfirm" @@ -69,7 +69,7 @@ func tccBarrierDisorder(t *testing.T) { return res, err })) // 注册子事务 - _, err := dtmcli.CallDtm(tcc.Dtm, M{ + err := dtmcli.CallDtm(tcc.Dtm, M{ "gid": tcc.Gid, "branch_id": branchID, "trans_type": "tcc", diff --git a/dtmsvr/trans_tcc_test.go b/dtmsvr/trans_tcc_test.go index 1a07979..d5383df 100644 --- a/dtmsvr/trans_tcc_test.go +++ b/dtmsvr/trans_tcc_test.go @@ -19,7 +19,7 @@ func TestTcc(t *testing.T) { func tccNormal(t *testing.T) { data := &examples.TransReq{Amount: 30} gid := "tccNormal" - _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, err := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") assert.Nil(t, err) return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") @@ -30,7 +30,7 @@ func tccNormal(t *testing.T) { func tccRollback(t *testing.T) { gid := "tccRollback" data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} - _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") assert.Nil(t, rerr) examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING") diff --git a/dtmsvr/trans_xa_test.go b/dtmsvr/trans_xa_test.go index c848d1c..5dee503 100644 --- a/dtmsvr/trans_xa_test.go +++ b/dtmsvr/trans_xa_test.go @@ -21,7 +21,7 @@ func TestXa(t *testing.T) { func xaLocalError(t *testing.T) { xc := examples.XaClient - _, err := xc.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (*resty.Response, error) { + err := xc.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (*resty.Response, error) { return nil, fmt.Errorf("an error") }) assert.Error(t, err, fmt.Errorf("an error")) @@ -30,7 +30,7 @@ func xaLocalError(t *testing.T) { func xaNormal(t *testing.T) { xc := examples.XaClient gid := "xaNormal" - _, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { + err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := examples.GenTransReq(30, false, false) resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") if err != nil { @@ -46,7 +46,7 @@ func xaNormal(t *testing.T) { func xaRollback(t *testing.T) { xc := examples.XaClient gid := "xaRollback" - _, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { + err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") if err != nil { diff --git a/examples/main_tcc.go b/examples/main_tcc.go index 97c49dc..7e483bd 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -21,14 +21,14 @@ func TccSetup(app *gin.Engine) { // TccFireRequestNested 1 func TccFireRequestNested() string { gid := dtmcli.MustGenGid(DtmServer) - ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - if dtmcli.IsFailure(resp, err) { + if err != nil { return resp, err } return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTccParent", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - dtmcli.PanicIfFailure(ret, err) + e2p(err) return gid } @@ -36,13 +36,13 @@ func TccFireRequestNested() string { func TccFireRequest() string { logrus.Printf("tcc simple transaction begin") gid := dtmcli.MustGenGid(DtmServer) - ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - if dtmcli.IsFailure(resp, err) { + if err != nil { return resp, err } return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - dtmcli.PanicIfFailure(ret, err) + e2p(err) return gid } diff --git a/examples/main_tcc_barrier.go b/examples/main_tcc_barrier.go index e494f28..1f06d6e 100644 --- a/examples/main_tcc_barrier.go +++ b/examples/main_tcc_barrier.go @@ -15,14 +15,14 @@ import ( func TccBarrierFireRequest() string { logrus.Printf("tcc transaction begin") gid := dtmcli.MustGenGid(DtmServer) - ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - if dtmcli.IsFailure(resp, err) { + if err != nil { return resp, err } return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") }) - dtmcli.PanicIfFailure(ret, err) + e2p(err) return gid } diff --git a/examples/main_xa.go b/examples/main_xa.go index b07ae83..b3e4a75 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -28,7 +28,7 @@ func XaSetup(app *gin.Engine) { // XaFireRequest 注册全局XA事务,调用XA的分支 func XaFireRequest() string { gid := dtmcli.MustGenGid(DtmServer) - _, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { + err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { resp, err := xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOutXa") if err != nil { return resp, err From 864ef5b4f7f4f6acde3324b1ec458e7ef326c993 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 16:59:03 +0800 Subject: [PATCH 10/13] CallDtm move to TransBase --- dtmcli/message.go | 17 +++++++--------- dtmcli/saga.go | 15 ++++++--------- dtmcli/tcc.go | 18 ++++++++--------- dtmcli/types.go | 33 +++++++++++++++++++------------- dtmcli/xa.go | 16 +++++++--------- dtmsvr/trans_tcc_barrier_test.go | 4 ++-- 6 files changed, 50 insertions(+), 53 deletions(-) diff --git a/dtmcli/message.go b/dtmcli/message.go index 80a044d..02df337 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -8,7 +8,7 @@ import ( // Msg reliable msg type type Msg struct { MsgData - Server string + TransBase } // MsgData msg data @@ -32,13 +32,15 @@ func NewMsg(server string, gid string) *Msg { Gid: gid, TransType: "msg", }, - Server: server, + TransBase: TransBase{ + Dtm: server, + }, } } // Add add a new step func (s *Msg) Add(action string, postData interface{}) *Msg { - logrus.Printf("msg %s Add %s %v", s.Gid, action, postData) + logrus.Printf("msg %s Add %s %v", s.MsgData.Gid, action, postData) step := MsgStep{ Action: action, Data: common.MustMarshalString(postData), @@ -50,15 +52,10 @@ func (s *Msg) Add(action string, postData interface{}) *Msg { // Prepare prepare the msg func (s *Msg) Prepare(queryPrepared string) error { s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) - return callDtmSimple(s.Server, &s.MsgData, "prepare") + return s.CallDtm(&s.MsgData, "prepare") } // Submit submit the msg func (s *Msg) Submit() error { - return callDtmSimple(s.Server, &s.MsgData, "submit") -} - -// SubmitExt 高级submit,更多的选项和更详细的返回值 -func (s *Msg) SubmitExt(opt *TransOptions) error { - return CallDtm(s.Server, &s.MsgData, "submit", opt) + return s.CallDtm(&s.MsgData, "submit") } diff --git a/dtmcli/saga.go b/dtmcli/saga.go index a7f39ee..b32a340 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -8,7 +8,7 @@ import ( // Saga struct of saga type Saga struct { SagaData - Server string + TransBase } // SagaData sage data @@ -32,13 +32,15 @@ func NewSaga(server string, gid string) *Saga { Gid: gid, TransType: "saga", }, - Server: server, + TransBase: TransBase{ + Dtm: server, + }, } } // Add add a saga step func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga { - logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) + logrus.Printf("saga %s Add %s %s %v", s.SagaData.Gid, action, compensate, postData) step := SagaStep{ Action: action, Compensate: compensate, @@ -50,10 +52,5 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga // Submit submit the saga trans func (s *Saga) Submit() error { - return s.SubmitExt(&TransOptions{}) -} - -// SubmitExt 高级submit,更多的选项和更详细的返回值 -func (s *Saga) SubmitExt(opt *TransOptions) error { - return CallDtm(s.Server, &s.SagaData, "submit", opt) + return s.CallDtm(&s.SagaData, "submit") } diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index c2f019d..9ca9cfb 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -10,9 +10,8 @@ import ( // Tcc struct of tcc type Tcc struct { - IDGenerator - Dtm string Gid string + TransBase } // TccGlobalFunc type of global tcc call @@ -27,8 +26,8 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e "gid": gid, "trans_type": "tcc", } - tcc := &Tcc{Dtm: dtm, Gid: gid} - rerr = CallDtm(dtm, data, "prepare", &TransOptions{}) + tcc := &Tcc{TransBase: TransBase{Dtm: dtm}, Gid: gid} + rerr = tcc.CallDtm(data, "prepare") if rerr != nil { return rerr } @@ -37,7 +36,7 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e x := recover() var err error operation := common.If(x == nil && rerr == nil, "submit", "abort").(string) - err = CallDtm(dtm, data, operation, &TransOptions{}) + err = tcc.CallDtm(data, operation) if rerr == nil { rerr = err } @@ -53,9 +52,8 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e // TccFromReq tcc from request info func TccFromReq(c *gin.Context) (*Tcc, error) { tcc := &Tcc{ - Dtm: c.Query("dtm"), - Gid: c.Query("gid"), - IDGenerator: IDGenerator{parentID: c.Query("branch_id")}, + TransBase: *TransBaseFromReq(c), + Gid: c.Query("gid"), } if tcc.Dtm == "" || tcc.Gid == "" { return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s", tcc.Dtm, tcc.Gid) @@ -67,7 +65,7 @@ func TccFromReq(c *gin.Context) (*Tcc, error) { // 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果 func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) { branchID := t.NewBranchID() - err := CallDtm(t.Dtm, &M{ + err := t.CallDtm(&M{ "gid": t.Gid, "branch_id": branchID, "trans_type": "tcc", @@ -76,7 +74,7 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can "try": tryURL, "confirm": confirmURL, "cancel": cancelURL, - }, "registerTccBranch", &TransOptions{}) + }, "registerTccBranch") if err != nil { return nil, err } diff --git a/dtmcli/types.go b/dtmcli/types.go index 92e97fc..86a148f 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/gin-gonic/gin" "github.com/go-resty/resty/v2" "github.com/yedf/dtm/common" ) @@ -61,23 +62,33 @@ func (g *IDGenerator) NewBranchID() string { return g.parentID + fmt.Sprintf("%02d", g.branchID) } -// TransOptions 提交/终止事务的选项 -type TransOptions struct { - // WaitResult 是否等待全局事务的最终结果 - WaitResult bool -} - // TransResult dtm 返回的结果 type TransResult struct { DtmResult string `json:"dtm_result"` Message string } +// TransBase 事务的基础类 +type TransBase struct { + IDGenerator + Dtm string + // WaitResult 是否等待全局事务的最终结果 + WaitResult bool +} + +// TransBaseFromReq construct xa info from request +func TransBaseFromReq(c *gin.Context) *TransBase { + return &TransBase{ + IDGenerator: IDGenerator{parentID: c.Query("branch_id")}, + Dtm: c.Query("dtm"), + } +} + // CallDtm 调用dtm服务器,返回事务的状态 -func CallDtm(dtm string, body interface{}, operation string, opt *TransOptions) error { +func (tb *TransBase) CallDtm(body interface{}, operation string) error { resp, err := common.RestyClient.R().SetQueryParams(common.MS{ - "wait_result": common.If(opt.WaitResult, "1", "").(string), - }).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", dtm, operation)) + "wait_result": common.If(tb.WaitResult, "1", "").(string), + }).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation)) if err != nil { return err } @@ -88,10 +99,6 @@ func CallDtm(dtm string, body interface{}, operation string, opt *TransOptions) return nil } -func callDtmSimple(dtm string, body interface{}, operation string) error { - return CallDtm(dtm, body, operation, &TransOptions{}) -} - // ErrFailure 表示返回失败,要求回滚 var ErrFailure = errors.New("transaction FAILURE") diff --git a/dtmcli/xa.go b/dtmcli/xa.go index cac6384..860f093 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -33,16 +33,13 @@ type XaClient struct { // Xa xa transaction type Xa struct { - IDGenerator Gid string + TransBase } // XaFromReq construct xa info from request func XaFromReq(c *gin.Context) *Xa { - return &Xa{ - Gid: c.Query("gid"), - IDGenerator: IDGenerator{parentID: c.Query("branch_id")}, - } + return &Xa{TransBase: *TransBaseFromReq(c), Gid: c.Query("gid")} } // NewXaClient construct a xa client @@ -73,6 +70,7 @@ func (xc *XaClient) HandleCallback(gid string, branchID string, action string) ( // XaLocalTransaction start a xa local transaction func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret interface{}, rerr error) { xa := XaFromReq(c) + xa.Dtm = xc.Server branchID := xa.NewBranchID() xaBranch := xa.Gid + "-" + branchID db := common.SdbAlone(xc.Conf) @@ -99,18 +97,18 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret if rerr != nil { return } - rerr = CallDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{}) + rerr = xa.CallDtm(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch") return } // XaGlobalTransaction start a xa global transaction func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) { - xa := Xa{IDGenerator: IDGenerator{}, Gid: gid} + xa := Xa{TransBase: TransBase{IDGenerator: IDGenerator{}, Dtm: xc.Server}, Gid: gid} data := &M{ "gid": gid, "trans_type": "xa", } - rerr = CallDtm(xc.Server, data, "prepare", &TransOptions{}) + rerr = xa.CallDtm(data, "prepare") if rerr != nil { return } @@ -119,7 +117,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr e defer func() { x := recover() operation := common.If(x != nil || rerr != nil, "abort", "submit").(string) - err := CallDtm(xc.Server, data, operation, &TransOptions{}) + err := xa.CallDtm(data, operation) if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 rerr = err } diff --git a/dtmsvr/trans_tcc_barrier_test.go b/dtmsvr/trans_tcc_barrier_test.go index f04fdc8..e1eb9d5 100644 --- a/dtmsvr/trans_tcc_barrier_test.go +++ b/dtmsvr/trans_tcc_barrier_test.go @@ -69,7 +69,7 @@ func tccBarrierDisorder(t *testing.T) { return res, err })) // 注册子事务 - err := dtmcli.CallDtm(tcc.Dtm, M{ + err := tcc.CallDtm(M{ "gid": tcc.Gid, "branch_id": branchID, "trans_type": "tcc", @@ -78,7 +78,7 @@ func tccBarrierDisorder(t *testing.T) { "try": tryURL, "confirm": confirmURL, "cancel": cancelURL, - }, "registerTccBranch", &dtmcli.TransOptions{}) + }, "registerTccBranch") assert.Nil(t, err) go func() { logrus.Printf("sleeping to wait for tcc try timeout") From 0df21c7c705bf42243382a7e9f20f446ce9b107e Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 17:05:35 +0800 Subject: [PATCH 11/13] update tcc error check --- dtmcli/tcc.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 9ca9cfb..ced6910 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -34,9 +34,8 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer func() { x := recover() - var err error operation := common.If(x == nil && rerr == nil, "submit", "abort").(string) - err = tcc.CallDtm(data, operation) + err := tcc.CallDtm(data, operation) if rerr == nil { rerr = err } From 3a9aaa316697c92b5ccf79bd49669c398031c637 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 18:10:43 +0800 Subject: [PATCH 12/13] wait_result may ok --- dtmsvr/api.go | 6 ++---- dtmsvr/cron.go | 12 ++++++++---- dtmsvr/trans.go | 22 ++++++++++++++++++++-- 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/dtmsvr/api.go b/dtmsvr/api.go index e614354..7c9af86 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -41,8 +41,7 @@ func submit(c *gin.Context) (interface{}, error) { } t.Status = "submitted" t.saveNew(db) - go t.Process(db) - return dtmcli.ResultSuccess, nil + return t.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil } func abort(c *gin.Context) (interface{}, error) { @@ -52,8 +51,7 @@ func abort(c *gin.Context) (interface{}, error) { if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != "prepared" && dbt.Status != "aborting" { return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil } - go dbt.Process(db) - return dtmcli.ResultSuccess, nil + return dbt.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil } func registerXaBranch(c *gin.Context) (interface{}, error) { diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 64a427d..a851918 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -1,6 +1,7 @@ package dtmsvr import ( + "fmt" "math" "math/rand" "runtime/debug" @@ -12,7 +13,7 @@ import ( // CronTransOnce cron expired trans. use expireIn as expire time func CronTransOnce(expireIn time.Duration) bool { - defer handlePanic() + defer handlePanic(nil) trans := lockOneTrans(expireIn) if trans == nil { return false @@ -20,7 +21,7 @@ func CronTransOnce(expireIn time.Duration) bool { if TransProcessedTestChan != nil { defer WaitTransProcessed(trans.Gid) } - trans.Process(dbGet()) + trans.Process(dbGet(), true) return true } @@ -52,9 +53,12 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal { return &trans } -func handlePanic() { +func handlePanic(perr *error) { if err := recover(); err != nil { - common.RedLogf("----panic %s handlered\n%s", err.(error).Error(), string(debug.Stack())) + common.RedLogf("----panic %v handlered\n%s", err, string(debug.Stack())) + if perr != nil { + *perr = fmt.Errorf("dtm panic: %v", err) + } } } diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 5d2beee..d2166a5 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -7,6 +7,7 @@ import ( "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -110,8 +111,24 @@ func (t *TransGlobal) getProcessor() transProcessor { } // Process process global transaction once -func (t *TransGlobal) Process(db *common.DB) { - defer handlePanic() +func (t *TransGlobal) Process(db *common.DB, waitResult bool) common.M { + if !waitResult { + go t.processInner(db) + return dtmcli.ResultSuccess + } + submitting := t.Status == "submitted" + err := t.processInner(db) + if err != nil { + return common.M{"dtm_result": "FAILURE", "message": err.Error()} + } + if submitting && t.Status != "succeed" { + return common.M{"dtm_result": "FAILURE", "message": "trans failed by user"} + } + return dtmcli.ResultSuccess +} + +func (t *TransGlobal) processInner(db *common.DB) (rerr error) { + defer handlePanic(&rerr) defer func() { if TransProcessedTestChan != nil { logrus.Printf("processed: %s", t.Gid) @@ -126,6 +143,7 @@ func (t *TransGlobal) Process(db *common.DB) { branches := []TransBranch{} db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches) t.getProcessor().ProcessOnce(db, branches) + return } func (t *TransGlobal) getBranchParams(branch *TransBranch) common.MS { From a170b4dd31f258c760d80a50655c6de58e34512d Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 4 Aug 2021 10:24:20 +0800 Subject: [PATCH 13/13] add wait test --- dtmcli/types.go | 9 ++++-- dtmsvr/trans_saga_wait_test.go | 50 ++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 3 deletions(-) create mode 100644 dtmsvr/trans_saga_wait_test.go diff --git a/dtmcli/types.go b/dtmcli/types.go index 86a148f..0aff9fa 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -86,9 +86,12 @@ func TransBaseFromReq(c *gin.Context) *TransBase { // CallDtm 调用dtm服务器,返回事务的状态 func (tb *TransBase) CallDtm(body interface{}, operation string) error { - resp, err := common.RestyClient.R().SetQueryParams(common.MS{ - "wait_result": common.If(tb.WaitResult, "1", "").(string), - }).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation)) + params := common.MS{} + if tb.WaitResult { + params["wait_result"] = "1" + } + resp, err := common.RestyClient.R().SetQueryParams(params). + SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation)) if err != nil { return err } diff --git a/dtmsvr/trans_saga_wait_test.go b/dtmsvr/trans_saga_wait_test.go new file mode 100644 index 0000000..4472ba6 --- /dev/null +++ b/dtmsvr/trans_saga_wait_test.go @@ -0,0 +1,50 @@ +package dtmsvr + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/examples" +) + +func TestSagaWait(t *testing.T) { + + sagaNormalWait(t) + sagaCommittedPendingWait(t) + sagaRollbackWait(t) +} + +func sagaNormalWait(t *testing.T) { + saga := genSaga("gid-noramlSagaWait", false, false) + saga.WaitResult = true + err := saga.Submit() + assert.Nil(t, err) + WaitTransProcessed(saga.Gid) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) + assert.Equal(t, "succeed", getTransStatus(saga.Gid)) + transQuery(t, saga.Gid) +} + +func sagaCommittedPendingWait(t *testing.T) { + saga := genSaga("gid-committedPendingWait", false, false) + examples.MainSwitch.TransOutResult.SetOnce("PENDING") + saga.WaitResult = true + err := saga.Submit() + assert.Error(t, err) + WaitTransProcessed(saga.Gid) + assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) + CronTransOnce(60 * time.Second) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) + assert.Equal(t, "succeed", getTransStatus(saga.Gid)) +} + +func sagaRollbackWait(t *testing.T) { + saga := genSaga("gid-rollbackSaga2Wait", false, true) + saga.WaitResult = true + err := saga.Submit() + assert.Error(t, err) + WaitTransProcessed(saga.Gid) + assert.Equal(t, "failed", getTransStatus(saga.Gid)) + assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid)) +}