From 94e8e4eff913979a0bd302c70f1181749cfbe4f2 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sun, 1 Aug 2021 11:10:16 +0800 Subject: [PATCH] tcc interface seems ok --- common/types.go | 4 ++-- common/utils.go | 5 +++++ dtmcli/tcc.go | 35 +++++++++++------------------ dtmcli/types.go | 15 +++++++++++++ dtmsvr/cron.go | 3 ++- dtmsvr/trans_tcc_barrier_test.go | 38 +++++++++++--------------------- dtmsvr/trans_tcc_test.go | 24 +++++++++----------- examples/main_tcc.go | 33 +++++++++++++-------------- examples/main_tcc_barrier.go | 15 ++++++------- examples/main_xa.go | 10 ++++----- examples/types.go | 13 +++++++---- 11 files changed, 96 insertions(+), 99 deletions(-) diff --git a/common/types.go b/common/types.go index b0fbc49..a2eaec2 100644 --- a/common/types.go +++ b/common/types.go @@ -162,7 +162,7 @@ func SdbExec(db *sql.DB, sql string, values ...interface{}) (affected int64, rer affected, rerr = r.RowsAffected() logrus.Printf("affected: %d for %s %v", affected, sql, values) } else { - logrus.Printf("\x1b[31m\nexec error: %v for %s %v\x1b[0m\n", rerr, sql, values) + RedLogf("exec error: %v for %s %v", rerr, sql, values) } return } @@ -174,7 +174,7 @@ func StxExec(tx *sql.Tx, sql string, values ...interface{}) (affected int64, rer affected, rerr = r.RowsAffected() logrus.Printf("affected: %d for %s %v", affected, sql, values) } else { - logrus.Printf("\x1b[31m\nexec error: %v for %s %v\x1b[0m\n", rerr, sql, values) + RedLogf("exec error: %v for %s %v", rerr, sql, values) } return } diff --git a/common/utils.go b/common/utils.go index 1389eea..3e5aa7b 100644 --- a/common/utils.go +++ b/common/utils.go @@ -205,6 +205,11 @@ func (f *formatter) Format(entry *logrus.Entry) ([]byte, error) { return b.Bytes(), nil } +// RedLogf 采用红色打印错误类信息 +func RedLogf(fmt string, args ...interface{}) { + logrus.Errorf("\x1b[31m\n"+fmt+"\x1b[0m\n", args...) +} + // InitConfig init config func InitConfig(dir string, config interface{}) { logrus.SetFormatter(&formatter{}) diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index c7e1f83..9bceaea 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -2,11 +2,9 @@ package dtmcli import ( "fmt" - "strings" "github.com/gin-gonic/gin" "github.com/go-resty/resty/v2" - "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -18,41 +16,38 @@ type Tcc struct { } // TccGlobalFunc type of global tcc call -type TccGlobalFunc func(tcc *Tcc) error +type TccGlobalFunc func(tcc *Tcc) (interface{}, error) // TccGlobalTransaction begin a tcc global transaction // dtm dtm服务器地址 // gid 全局事务id // tccFunc tcc事务函数,里面会定义全局事务的分支 -func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { +func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (ret interface{}, rerr error) { data := &M{ "gid": gid, "trans_type": "tcc", } + tcc := &Tcc{Dtm: dtm, Gid: gid} + resp, err := common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare") + if IsFailure(resp, err) { + return resp, err + } + // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer func() { - var resp *resty.Response - var err error var x interface{} - if x = recover(); x != nil || rerr != nil { + if x = recover(); x != nil || IsFailure(ret, rerr) { resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/abort") } else { resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/submit") } - err2 := CheckDtmResponse(resp, err) - if err2 != nil { - logrus.Errorf("submitting or abort global transaction error: %v", err2) + if IsFailure(resp, err) { + common.RedLogf("submitting or abort global transaction error: %v resp: %s", err, resp.String()) } if x != nil { panic(x) } }() - tcc := &Tcc{Dtm: dtm, Gid: gid} - resp, err := common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare") - rerr = CheckDtmResponse(resp, err) - if rerr != nil { - return - } - rerr = tccFunc(tcc) + ret, rerr = tccFunc(tcc) return } @@ -85,8 +80,7 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can "cancel": cancelURL, }). Post(t.Dtm + "/registerTccBranch") - err = CheckDtmResponse(resp, err) - if err != nil { + if IsFailure(resp, err) { return resp, err } resp, err = common.RestyClient.R(). @@ -99,8 +93,5 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can "branch_type": "try", }). Post(tryURL) - if err == nil && strings.Contains(resp.String(), "FAILURE") { - err = fmt.Errorf("branch return failure: %s", resp.String()) - } return resp, err } diff --git a/dtmcli/types.go b/dtmcli/types.go index e443fca..7679a96 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -18,6 +18,21 @@ func MustGenGid(server string) string { return res["gid"] } +// IsFailure 如果err非空,或者ret是http的响应且包含FAILURE,那么返回true。此时认为业务调用失败 +func IsFailure(res interface{}, err error) bool { + resp, ok := res.(*resty.Response) + return err != nil || ok && strings.Contains(resp.String(), "FAILURE") +} + +// PanicIfFailure 如果err非空,或者ret是http的响应且包含FAILURE,那么Panic。此时认为业务调用失败 +func PanicIfFailure(res interface{}, err error) { + resp, ok := res.(*resty.Response) + failure := err != nil || ok && strings.Contains(resp.String(), "FAILURE") + if failure { + panic(fmt.Errorf("dtm failure ret: %v err %v", res, err)) + } +} + // CheckDtmResponse check the response of dtm, if not ok ,generate error func CheckDtmResponse(resp *resty.Response, err error) error { if err != nil { diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 3b18ce9..64a427d 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -7,6 +7,7 @@ import ( "time" "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" ) // CronTransOnce cron expired trans. use expireIn as expire time @@ -53,7 +54,7 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal { func handlePanic() { if err := recover(); err != nil { - logrus.Errorf("\x1b[31m\n----panic %s handlered\x1b[0m\n%s", err.(error).Error(), string(debug.Stack())) + common.RedLogf("----panic %s handlered\n%s", err.(error).Error(), string(debug.Stack())) } } diff --git a/dtmsvr/trans_tcc_barrier_test.go b/dtmsvr/trans_tcc_barrier_test.go index 8d1a3bd..5375c86 100644 --- a/dtmsvr/trans_tcc_barrier_test.go +++ b/dtmsvr/trans_tcc_barrier_test.go @@ -23,35 +23,24 @@ func TestTccBarrier(t *testing.T) { func tccBarrierRollback(t *testing.T) { gid := "tccBarrierRollback" - err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { - res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - assert.Contains(t, res1.String(), "SUCCESS") - _, rerr = tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") - assert.Error(t, rerr) - return + resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + resp, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") + assert.True(t, !dtmcli.IsFailure(resp, err)) + return tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") }) - assert.Error(t, err) + assert.True(t, dtmcli.IsFailure(resp, err)) WaitTransProcessed(gid) assert.Equal(t, "failed", getTransStatus(gid)) } func tccBarrierNormal(t *testing.T) { gid := "tccBarrierNormal" - err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { - res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - e2p(rerr) - if res1.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res1.StatusCode()) - } - res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") - e2p(rerr) - if res2.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res2.StatusCode()) - } - logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) - return + resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + resp, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") + assert.True(t, !dtmcli.IsFailure(resp, err)) + return tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") }) - e2p(err) + assert.True(t, !dtmcli.IsFailure(resp, err)) WaitTransProcessed(gid) assert.Equal(t, "succeed", getTransStatus(gid)) } @@ -60,7 +49,7 @@ func tccBarrierDisorder(t *testing.T) { timeoutChan := make(chan string, 2) finishedChan := make(chan string, 2) gid := "tccBarrierDisorder" - err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { + _, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { body := &examples.TransReq{Amount: 30} tryURL := Busi + "/TccBTransOutTry" confirmURL := Busi + "/TccBTransOutConfirm" @@ -91,8 +80,7 @@ func tccBarrierDisorder(t *testing.T) { "cancel": cancelURL, }). Post(tcc.Dtm + "/registerTccBranch") - e2p(err) - assert.True(t, strings.Contains(r.String(), "SUCCESS")) + assert.True(t, !dtmcli.IsFailure(r, err)) go func() { logrus.Printf("sleeping to wait for tcc try timeout") <-timeoutChan @@ -119,7 +107,7 @@ func tccBarrierDisorder(t *testing.T) { <-finishedChan <-finishedChan time.Sleep(100 * time.Millisecond) - return fmt.Errorf("a cancelled tcc") + return nil, fmt.Errorf("a cancelled tcc") }) assert.Error(t, err, fmt.Errorf("a cancelled tcc")) assert.Equal(t, []string{"succeed", "prepared", "prepared"}, getBranchesStatus(gid)) diff --git a/dtmsvr/trans_tcc_test.go b/dtmsvr/trans_tcc_test.go index 8383b05..1a6666f 100644 --- a/dtmsvr/trans_tcc_test.go +++ b/dtmsvr/trans_tcc_test.go @@ -18,28 +18,26 @@ func TestTcc(t *testing.T) { func tccNormal(t *testing.T) { data := &examples.TransReq{Amount: 30} gid := "tccNormal" - err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { - _, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - e2p(rerr) - _, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") - e2p(rerr) - return + ret, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + resp, err := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") + if dtmcli.IsFailure(resp, err) { + return resp, err + } + return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - e2p(err) + dtmcli.PanicIfFailure(ret, err) } func tccRollback(t *testing.T) { gid := "tccRollback" data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} - err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { + resp, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { resp, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - assert.Contains(t, resp.String(), "SUCCESS") + assert.True(t, !dtmcli.IsFailure(resp, rerr)) examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING") - _, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") - assert.Error(t, rerr) - return + return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - assert.Error(t, err) + assert.True(t, dtmcli.IsFailure(resp, err)) WaitTransProcessed(gid) assert.Equal(t, "aborting", getTransStatus(gid)) CronTransOnce(60 * time.Second) diff --git a/examples/main_tcc.go b/examples/main_tcc.go index dd0dc79..40f5a74 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -12,9 +12,8 @@ func TccSetup(app *gin.Engine) { app.POST(BusiAPI+"/TransInTccParent", common.WrapHandler(func(c *gin.Context) (interface{}, error) { tcc, err := dtmcli.TccFromReq(c) e2p(err) - req := reqFrom(c) logrus.Printf("TransInTccParent ") - _, rerr := tcc.CallBranch(&TransReq{Amount: req.Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") + _, rerr := tcc.CallBranch(&TransReq{Amount: reqFrom(c).Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") e2p(rerr) return M{"dtm_result": "SUCCESS"}, nil })) @@ -22,17 +21,15 @@ func TccSetup(app *gin.Engine) { // TccFireRequestNested 1 func TccFireRequestNested() string { - logrus.Printf("tcc transaction begin") gid := dtmcli.MustGenGid(DtmServer) - err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { - res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - e2p(rerr) - res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTccParent", Busi+"/TransInConfirm", Busi+"/TransInRevert") - e2p(rerr) - logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) - return + ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") + if dtmcli.IsFailure(resp, err) { + return resp, err + } + return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTccParent", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - e2p(err) + dtmcli.PanicIfFailure(ret, err) return gid } @@ -40,13 +37,13 @@ func TccFireRequestNested() string { func TccFireRequest() string { logrus.Printf("tcc simple transaction begin") gid := dtmcli.MustGenGid(DtmServer) - err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { - res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - e2p(rerr) - res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") - logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) - return + ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") + if dtmcli.IsFailure(resp, err) { + return resp, err + } + return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") }) - e2p(err) + dtmcli.PanicIfFailure(ret, err) return gid } diff --git a/examples/main_tcc_barrier.go b/examples/main_tcc_barrier.go index ebb6c45..c7d973e 100644 --- a/examples/main_tcc_barrier.go +++ b/examples/main_tcc_barrier.go @@ -14,15 +14,14 @@ import ( func TccBarrierFireRequest() string { logrus.Printf("tcc transaction begin") gid := dtmcli.MustGenGid(DtmServer) - err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { - res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - common.CheckRestySuccess(res1, rerr) - res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") - common.CheckRestySuccess(res1, rerr) - logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) - return + ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) { + resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") + if dtmcli.IsFailure(resp, err) { + return resp, err + } + return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") }) - e2p(err) + dtmcli.PanicIfFailure(ret, err) return gid } diff --git a/examples/main_xa.go b/examples/main_xa.go index 2f1b5b7..a7919d7 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -40,11 +40,10 @@ func XaFireRequest() string { func xaTransIn(c *gin.Context) (interface{}, error) { err := XaClient.XaLocalTransaction(c, func(db *sql.DB, xa *dtmcli.Xa) (rerr error) { - req := reqFrom(c) - if req.TransInResult == "FAILURE" { + if reqFrom(c).TransInResult == "FAILURE" { return fmt.Errorf("tranIn FAILURE") } - _, rerr = common.SdbExec(db, "update dtm_busi.user_account set balance=balance+? where user_id=?", req.Amount, 2) + _, rerr = common.SdbExec(db, "update dtm_busi.user_account set balance=balance+? where user_id=?", reqFrom(c).Amount, 2) return }) if err != nil && strings.Contains(err.Error(), "FAILURE") { @@ -56,11 +55,10 @@ func xaTransIn(c *gin.Context) (interface{}, error) { func xaTransOut(c *gin.Context) (interface{}, error) { err := XaClient.XaLocalTransaction(c, func(db *sql.DB, xa *dtmcli.Xa) (rerr error) { - req := reqFrom(c) - if req.TransOutResult == "FAILURE" { + if reqFrom(c).TransOutResult == "FAILURE" { return fmt.Errorf("tranOut failed") } - _, rerr = common.SdbExec(db, "update dtm_busi.user_account set balance=balance-? where user_id=?", req.Amount, 1) + _, rerr = common.SdbExec(db, "update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, 1) return }) e2p(err) diff --git a/examples/types.go b/examples/types.go index 0e0621b..f5c8286 100644 --- a/examples/types.go +++ b/examples/types.go @@ -38,10 +38,15 @@ func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq { } func reqFrom(c *gin.Context) *TransReq { - req := TransReq{} - err := c.BindJSON(&req) - e2p(err) - return &req + v, ok := c.Get("trans_req") + if !ok { + req := TransReq{} + err := c.BindJSON(&req) + e2p(err) + c.Set("trans_req", &req) + v = &req + } + return v.(*TransReq) } func infoFromContext(c *gin.Context) *dtmcli.TransInfo {