diff --git a/dtmcli/message.go b/dtmcli/message.go index 9fa680a..9104862 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -60,5 +60,5 @@ func (s *Msg) Submit() error { // SubmitExt 高级submit,更多的选项和更详细的返回值 func (s *Msg) SubmitExt(opt *TransOptions) (TransStatus, error) { - return callDtm(s.Server, &s.MsgData, "submit", opt) + return CallDtm(s.Server, &s.MsgData, "submit", opt) } diff --git a/dtmcli/saga.go b/dtmcli/saga.go index be2874f..855b5ef 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -56,5 +56,5 @@ func (s *Saga) Submit() error { // SubmitExt 高级submit,更多的选项和更详细的返回值 func (s *Saga) SubmitExt(opt *TransOptions) (TransStatus, error) { - return callDtm(s.Server, &s.SagaData, "submit", opt) + return CallDtm(s.Server, &s.SagaData, "submit", opt) } diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 6e93b6f..8c8293e 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -28,26 +28,24 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (ret in "trans_type": "tcc", } tcc := &Tcc{Dtm: dtm, Gid: gid} - resp, err := common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare") - if IsFailure(resp, err) { + resp, err := CallDtm(dtm, data, "prepare", &TransOptions{}) + if err != nil { return resp, err } // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer func() { - var x interface{} - if x = recover(); x != nil || IsFailure(ret, rerr) { - resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/abort") - } else { - resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/submit") - } - if IsFailure(resp, err) { - common.RedLogf("submitting or abort global transaction error: %v resp: %s", err, resp.String()) + x := recover() + operation := common.If(x == nil && rerr == nil, "submit", "abort").(string) + resp, err = CallDtm(dtm, data, operation, &TransOptions{}) + if rerr == nil { + rerr = err } if x != nil { panic(x) } }() ret, rerr = tccFunc(tcc) + rerr = CheckResult(ret, rerr) return } @@ -68,22 +66,20 @@ func TccFromReq(c *gin.Context) (*Tcc, error) { // 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果 func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) { branchID := t.NewBranchID() - resp, err := common.RestyClient.R(). - SetBody(&M{ - "gid": t.Gid, - "branch_id": branchID, - "trans_type": "tcc", - "status": "prepared", - "data": string(common.MustMarshal(body)), - "try": tryURL, - "confirm": confirmURL, - "cancel": cancelURL, - }). - Post(t.Dtm + "/registerTccBranch") - if IsFailure(resp, err) { - return resp, err + _, err := CallDtm(t.Dtm, &M{ + "gid": t.Gid, + "branch_id": branchID, + "trans_type": "tcc", + "status": "prepared", + "data": string(common.MustMarshal(body)), + "try": tryURL, + "confirm": confirmURL, + "cancel": cancelURL, + }, "registerTccBranch", &TransOptions{}) + if err != nil { + return nil, err } - return common.RestyClient.R(). + resp, err := common.RestyClient.R(). SetBody(body). SetQueryParams(common.MS{ "dtm": t.Dtm, @@ -93,4 +89,5 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can "branch_type": "try", }). Post(tryURL) + return resp, CheckResponse(resp, err) } diff --git a/dtmcli/types.go b/dtmcli/types.go index b29acd2..d10e112 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -122,7 +122,7 @@ type TransResult struct { Message string } -func callDtm(dtm string, body interface{}, operation string, opt *TransOptions) (TransStatus, error) { +func CallDtm(dtm string, body interface{}, operation string, opt *TransOptions) (TransStatus, error) { resp, err := common.RestyClient.R().SetQueryParams(common.MS{ "wait_result": common.If(opt.WaitResult, "1", "").(string), }).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", dtm, operation)) @@ -138,7 +138,7 @@ func callDtm(dtm string, body interface{}, operation string, opt *TransOptions) } func callDtmSimple(dtm string, body interface{}, operation string) error { - _, err := callDtm(dtm, body, operation, &TransOptions{}) + _, err := CallDtm(dtm, body, operation, &TransOptions{}) return err } diff --git a/dtmcli/xa.go b/dtmcli/xa.go index f8520b2..328188f 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -99,7 +99,7 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret if rerr != nil { return } - _, rerr = callDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{}) + _, rerr = CallDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{}) return } @@ -110,7 +110,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status "gid": gid, "trans_type": "xa", } - status, rerr = callDtm(xc.Server, data, "prepare", &TransOptions{}) + status, rerr = CallDtm(xc.Server, data, "prepare", &TransOptions{}) if rerr != nil { return } @@ -120,7 +120,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status x := recover() operation := common.If(x != nil || IsFailure(resp, rerr), "abort", "submit").(string) var err error - status, err = callDtm(xc.Server, data, operation, &TransOptions{}) + status, err = CallDtm(xc.Server, data, operation, &TransOptions{}) if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 rerr = err } diff --git a/dtmsvr/trans_tcc_barrier_test.go b/dtmsvr/trans_tcc_barrier_test.go index 4b46d06..b29c089 100644 --- a/dtmsvr/trans_tcc_barrier_test.go +++ b/dtmsvr/trans_tcc_barrier_test.go @@ -36,12 +36,12 @@ func tccBarrierRollback(t *testing.T) { func tccBarrierNormal(t *testing.T) { gid := "tccBarrierNormal" - resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { - resp, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - assert.True(t, !dtmcli.IsFailure(resp, err)) + _, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + _, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") + assert.Nil(t, err) return tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") }) - assert.True(t, !dtmcli.IsFailure(resp, err)) + assert.Nil(t, err) WaitTransProcessed(gid) assert.Equal(t, "succeed", getTransStatus(gid)) } @@ -69,23 +69,21 @@ func tccBarrierDisorder(t *testing.T) { return res, err })) // 注册子事务 - r, err := common.RestyClient.R(). - SetBody(&M{ - "gid": tcc.Gid, - "branch_id": branchID, - "trans_type": "tcc", - "status": "prepared", - "data": string(common.MustMarshal(body)), - "try": tryURL, - "confirm": confirmURL, - "cancel": cancelURL, - }). - Post(tcc.Dtm + "/registerTccBranch") - assert.True(t, !dtmcli.IsFailure(r, err)) + _, err := dtmcli.CallDtm(tcc.Dtm, M{ + "gid": tcc.Gid, + "branch_id": branchID, + "trans_type": "tcc", + "status": "prepared", + "data": string(common.MustMarshal(body)), + "try": tryURL, + "confirm": confirmURL, + "cancel": cancelURL, + }, "registerTccBranch", &dtmcli.TransOptions{}) + assert.Nil(t, err) go func() { logrus.Printf("sleeping to wait for tcc try timeout") <-timeoutChan - r, _ = common.RestyClient.R(). + r, _ := common.RestyClient.R(). SetBody(body). SetQueryParams(common.MS{ "dtm": tcc.Dtm, diff --git a/dtmsvr/trans_tcc_test.go b/dtmsvr/trans_tcc_test.go index dc1833e..1a07979 100644 --- a/dtmsvr/trans_tcc_test.go +++ b/dtmsvr/trans_tcc_test.go @@ -19,26 +19,24 @@ func TestTcc(t *testing.T) { func tccNormal(t *testing.T) { data := &examples.TransReq{Amount: 30} gid := "tccNormal" - ret, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { - resp, err := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - if dtmcli.IsFailure(resp, err) { - return resp, err - } + _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + _, err := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") + assert.Nil(t, err) return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - dtmcli.PanicIfFailure(ret, err) + assert.Nil(t, err) } func tccRollback(t *testing.T) { gid := "tccRollback" data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} - resp, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { - resp, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - assert.True(t, !dtmcli.IsFailure(resp, rerr)) + _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + _, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") + assert.Nil(t, rerr) examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING") return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - assert.True(t, dtmcli.IsFailure(resp, err)) + assert.Error(t, err) WaitTransProcessed(gid) assert.Equal(t, "aborting", getTransStatus(gid)) CronTransOnce(60 * time.Second)