From b20d18ecbbf446bf1f224fac891c70e5c3670fc8 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 16:19:31 +0800 Subject: [PATCH] clear failure --- dtmcli/message.go | 2 +- dtmcli/saga.go | 5 ++- dtmcli/tcc.go | 10 +++--- dtmcli/types.go | 62 ++++---------------------------- dtmcli/types_test.go | 5 --- dtmcli/xa.go | 12 +++---- dtmsvr/trans_tcc_barrier_test.go | 14 ++++---- dtmsvr/trans_tcc_test.go | 4 +-- dtmsvr/trans_xa_test.go | 6 ++-- examples/main_tcc.go | 12 +++---- examples/main_tcc_barrier.go | 6 ++-- examples/main_xa.go | 2 +- 12 files changed, 42 insertions(+), 98 deletions(-) diff --git a/dtmcli/message.go b/dtmcli/message.go index 9104862..80a044d 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -59,6 +59,6 @@ func (s *Msg) Submit() error { } // SubmitExt 高级submit,更多的选项和更详细的返回值 -func (s *Msg) SubmitExt(opt *TransOptions) (TransStatus, error) { +func (s *Msg) SubmitExt(opt *TransOptions) error { return CallDtm(s.Server, &s.MsgData, "submit", opt) } diff --git a/dtmcli/saga.go b/dtmcli/saga.go index 855b5ef..a7f39ee 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -50,11 +50,10 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga // Submit submit the saga trans func (s *Saga) Submit() error { - _, err := s.SubmitExt(&TransOptions{}) - return err + return s.SubmitExt(&TransOptions{}) } // SubmitExt 高级submit,更多的选项和更详细的返回值 -func (s *Saga) SubmitExt(opt *TransOptions) (TransStatus, error) { +func (s *Saga) SubmitExt(opt *TransOptions) error { return CallDtm(s.Server, &s.SagaData, "submit", opt) } diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 86c76b3..c2f019d 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -22,22 +22,22 @@ type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error) // dtm dtm服务器地址 // gid 全局事务id // tccFunc tcc事务函数,里面会定义全局事务的分支 -func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (status TransStatus, rerr error) { +func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { data := &M{ "gid": gid, "trans_type": "tcc", } tcc := &Tcc{Dtm: dtm, Gid: gid} - status, rerr = CallDtm(dtm, data, "prepare", &TransOptions{}) + rerr = CallDtm(dtm, data, "prepare", &TransOptions{}) if rerr != nil { - return status, rerr + return rerr } // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer func() { x := recover() var err error operation := common.If(x == nil && rerr == nil, "submit", "abort").(string) - status, err = CallDtm(dtm, data, operation, &TransOptions{}) + err = CallDtm(dtm, data, operation, &TransOptions{}) if rerr == nil { rerr = err } @@ -67,7 +67,7 @@ func TccFromReq(c *gin.Context) (*Tcc, error) { // 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果 func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) { branchID := t.NewBranchID() - _, err := CallDtm(t.Dtm, &M{ + err := CallDtm(t.Dtm, &M{ "gid": t.Gid, "branch_id": branchID, "trans_type": "tcc", diff --git a/dtmcli/types.go b/dtmcli/types.go index d10e112..92e97fc 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -19,21 +19,6 @@ func MustGenGid(server string) string { return res["gid"] } -// IsFailure 如果err非空,或者ret是http的响应且包含FAILURE,那么返回true。此时认为业务调用失败 -func IsFailure(res interface{}, err error) bool { - resp, ok := res.(*resty.Response) - return err != nil || // 包含错误 - ok && (resp.IsError() || strings.Contains(resp.String(), "FAILURE")) || // resp包含failure - !ok && res != nil && strings.Contains(common.MustMarshalString(res), "FAILURE") // 结果中包含failure -} - -// PanicIfFailure 如果err非空,或者ret是http的响应且包含FAILURE,那么Panic。此时认为业务调用失败 -func PanicIfFailure(res interface{}, err error) { - if IsFailure(res, err) { - panic(fmt.Errorf("dtm failure ret: %v err %v", res, err)) - } -} - // CheckResponse 检查Response,返回错误 func CheckResponse(resp *resty.Response, err error) error { if err == nil && resp != nil { @@ -58,17 +43,6 @@ func CheckResult(res interface{}, err error) error { return err } -// CheckDtmResponse check the response of dtm, if not ok ,generate error -func CheckDtmResponse(resp *resty.Response, err error) error { - if err != nil { - return err - } - if !strings.Contains(resp.String(), "SUCCESS") || resp.IsError() { - return fmt.Errorf("dtm response failed: %s", resp.String()) - } - return nil -} - // IDGenerator used to generate a branch id type IDGenerator struct { parentID string @@ -87,28 +61,6 @@ func (g *IDGenerator) NewBranchID() string { return g.parentID + fmt.Sprintf("%02d", g.branchID) } -// TransStatus 全局事务状态,采用string -type TransStatus string - -const ( - // TransEmpty 空值 - TransEmpty TransStatus = "" - // TransSubmitted 已提交给DTM - TransSubmitted TransStatus = "submitted" - // TransAborting 正在回滚中,有两种情况会出现,一是用户侧发起abort请求,而是发起submit同步请求,但是dtm进行回滚中出现错误 - TransAborting TransStatus = "aborting" - // TransSucceed 事务已完成 - TransSucceed TransStatus = "succeed" - // TransFailed 事务已回滚 - TransFailed TransStatus = "failed" - // TransErrorPrepare prepare调用报错 - TransErrorPrepare TransStatus = "error_parepare" - // TransErrorSubmit submit调用报错 - TransErrorSubmit TransStatus = "error_submit" - // TransErrorAbort abort调用报错 - TransErrorAbort TransStatus = "error_abort" -) - // TransOptions 提交/终止事务的选项 type TransOptions struct { // WaitResult 是否等待全局事务的最终结果 @@ -118,28 +70,26 @@ type TransOptions struct { // TransResult dtm 返回的结果 type TransResult struct { DtmResult string `json:"dtm_result"` - Status TransStatus Message string } -func CallDtm(dtm string, body interface{}, operation string, opt *TransOptions) (TransStatus, error) { +// CallDtm 调用dtm服务器,返回事务的状态 +func CallDtm(dtm string, body interface{}, operation string, opt *TransOptions) error { resp, err := common.RestyClient.R().SetQueryParams(common.MS{ "wait_result": common.If(opt.WaitResult, "1", "").(string), }).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", dtm, operation)) - errStatus := TransStatus("error_" + operation) if err != nil { - return errStatus, err + return err } tr := resp.Result().(*TransResult) if tr.DtmResult == "FAILURE" { - return errStatus, errors.New(tr.Message) + return errors.New("FAILURE: " + tr.Message) } - return tr.Status, nil + return nil } func callDtmSimple(dtm string, body interface{}, operation string) error { - _, err := CallDtm(dtm, body, operation, &TransOptions{}) - return err + return CallDtm(dtm, body, operation, &TransOptions{}) } // ErrFailure 表示返回失败,要求回滚 diff --git a/dtmcli/types_test.go b/dtmcli/types_test.go index d47e232..97a3d88 100644 --- a/dtmcli/types_test.go +++ b/dtmcli/types_test.go @@ -1,7 +1,6 @@ package dtmcli import ( - "fmt" "net/url" "testing" @@ -22,8 +21,4 @@ func TestTypes(t *testing.T) { assert.Error(t, err) _, err = TransInfoFromQuery(url.Values{}) assert.Error(t, err) - - err2 := fmt.Errorf("an error") - err3 := CheckDtmResponse(nil, err2) - assert.Error(t, err2, err3) } diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 328188f..cac6384 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -99,18 +99,18 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret if rerr != nil { return } - _, rerr = CallDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{}) + rerr = CallDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{}) return } // XaGlobalTransaction start a xa global transaction -func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status TransStatus, rerr error) { +func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) { xa := Xa{IDGenerator: IDGenerator{}, Gid: gid} data := &M{ "gid": gid, "trans_type": "xa", } - status, rerr = CallDtm(xc.Server, data, "prepare", &TransOptions{}) + rerr = CallDtm(xc.Server, data, "prepare", &TransOptions{}) if rerr != nil { return } @@ -118,9 +118,8 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer func() { x := recover() - operation := common.If(x != nil || IsFailure(resp, rerr), "abort", "submit").(string) - var err error - status, err = CallDtm(xc.Server, data, operation, &TransOptions{}) + operation := common.If(x != nil || rerr != nil, "abort", "submit").(string) + err := CallDtm(xc.Server, data, operation, &TransOptions{}) if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 rerr = err } @@ -129,6 +128,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status } }() resp, rerr = xaFunc(&xa) + rerr = CheckResponse(resp, rerr) return } diff --git a/dtmsvr/trans_tcc_barrier_test.go b/dtmsvr/trans_tcc_barrier_test.go index b29c089..f04fdc8 100644 --- a/dtmsvr/trans_tcc_barrier_test.go +++ b/dtmsvr/trans_tcc_barrier_test.go @@ -24,19 +24,19 @@ func TestTccBarrier(t *testing.T) { func tccBarrierRollback(t *testing.T) { gid := "tccBarrierRollback" - resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { - resp, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - assert.True(t, !dtmcli.IsFailure(resp, err)) + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + _, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") + assert.Nil(t, err) return tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") }) - assert.True(t, dtmcli.IsFailure(resp, err)) + assert.Error(t, err) WaitTransProcessed(gid) assert.Equal(t, "failed", getTransStatus(gid)) } func tccBarrierNormal(t *testing.T) { gid := "tccBarrierNormal" - _, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") assert.Nil(t, err) return tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") @@ -50,7 +50,7 @@ func tccBarrierDisorder(t *testing.T) { timeoutChan := make(chan string, 2) finishedChan := make(chan string, 2) gid := "tccBarrierDisorder" - _, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { body := &examples.TransReq{Amount: 30} tryURL := Busi + "/TccBTransOutTry" confirmURL := Busi + "/TccBTransOutConfirm" @@ -69,7 +69,7 @@ func tccBarrierDisorder(t *testing.T) { return res, err })) // 注册子事务 - _, err := dtmcli.CallDtm(tcc.Dtm, M{ + err := dtmcli.CallDtm(tcc.Dtm, M{ "gid": tcc.Gid, "branch_id": branchID, "trans_type": "tcc", diff --git a/dtmsvr/trans_tcc_test.go b/dtmsvr/trans_tcc_test.go index 1a07979..d5383df 100644 --- a/dtmsvr/trans_tcc_test.go +++ b/dtmsvr/trans_tcc_test.go @@ -19,7 +19,7 @@ func TestTcc(t *testing.T) { func tccNormal(t *testing.T) { data := &examples.TransReq{Amount: 30} gid := "tccNormal" - _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, err := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") assert.Nil(t, err) return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") @@ -30,7 +30,7 @@ func tccNormal(t *testing.T) { func tccRollback(t *testing.T) { gid := "tccRollback" data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} - _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") assert.Nil(t, rerr) examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING") diff --git a/dtmsvr/trans_xa_test.go b/dtmsvr/trans_xa_test.go index c848d1c..5dee503 100644 --- a/dtmsvr/trans_xa_test.go +++ b/dtmsvr/trans_xa_test.go @@ -21,7 +21,7 @@ func TestXa(t *testing.T) { func xaLocalError(t *testing.T) { xc := examples.XaClient - _, err := xc.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (*resty.Response, error) { + err := xc.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (*resty.Response, error) { return nil, fmt.Errorf("an error") }) assert.Error(t, err, fmt.Errorf("an error")) @@ -30,7 +30,7 @@ func xaLocalError(t *testing.T) { func xaNormal(t *testing.T) { xc := examples.XaClient gid := "xaNormal" - _, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { + err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := examples.GenTransReq(30, false, false) resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") if err != nil { @@ -46,7 +46,7 @@ func xaNormal(t *testing.T) { func xaRollback(t *testing.T) { xc := examples.XaClient gid := "xaRollback" - _, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { + err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { req := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") if err != nil { diff --git a/examples/main_tcc.go b/examples/main_tcc.go index 97c49dc..7e483bd 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -21,14 +21,14 @@ func TccSetup(app *gin.Engine) { // TccFireRequestNested 1 func TccFireRequestNested() string { gid := dtmcli.MustGenGid(DtmServer) - ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - if dtmcli.IsFailure(resp, err) { + if err != nil { return resp, err } return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTccParent", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - dtmcli.PanicIfFailure(ret, err) + e2p(err) return gid } @@ -36,13 +36,13 @@ func TccFireRequestNested() string { func TccFireRequest() string { logrus.Printf("tcc simple transaction begin") gid := dtmcli.MustGenGid(DtmServer) - ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - if dtmcli.IsFailure(resp, err) { + if err != nil { return resp, err } return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - dtmcli.PanicIfFailure(ret, err) + e2p(err) return gid } diff --git a/examples/main_tcc_barrier.go b/examples/main_tcc_barrier.go index e494f28..1f06d6e 100644 --- a/examples/main_tcc_barrier.go +++ b/examples/main_tcc_barrier.go @@ -15,14 +15,14 @@ import ( func TccBarrierFireRequest() string { logrus.Printf("tcc transaction begin") gid := dtmcli.MustGenGid(DtmServer) - ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - if dtmcli.IsFailure(resp, err) { + if err != nil { return resp, err } return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") }) - dtmcli.PanicIfFailure(ret, err) + e2p(err) return gid } diff --git a/examples/main_xa.go b/examples/main_xa.go index b07ae83..b3e4a75 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -28,7 +28,7 @@ func XaSetup(app *gin.Engine) { // XaFireRequest 注册全局XA事务,调用XA的分支 func XaFireRequest() string { gid := dtmcli.MustGenGid(DtmServer) - _, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { + err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { resp, err := xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOutXa") if err != nil { return resp, err