From 1961ad806dd31d0d4ae58fc3e7bf95f739addfd3 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Fri, 23 Jul 2021 10:42:15 +0800 Subject: [PATCH] gid generation decoupled --- dtmcli/message.go | 7 +----- dtmcli/saga.go | 7 +----- dtmcli/tcc.go | 21 ++++++++++------- dtmcli/xa.go | 39 ++++++++++++++++--------------- dtmsvr/dtmsvr_test.go | 6 ++--- dtmsvr/trans_saga_barrier_test.go | 4 ++-- dtmsvr/trans_tcc_barrier_test.go | 29 ++++++++++------------- dtmsvr/trans_tcc_test.go | 15 +++++++----- dtmsvr/trans_xa_test.go | 6 +++-- examples/main_msg.go | 2 +- examples/main_saga.go | 2 +- examples/main_saga_barrier.go | 2 +- examples/main_tcc.go | 3 ++- examples/main_tcc_barrier.go | 4 +++- examples/main_xa.go | 3 ++- examples/quick_start.go | 2 +- 16 files changed, 74 insertions(+), 78 deletions(-) diff --git a/dtmcli/message.go b/dtmcli/message.go index a86384a..d6baf84 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -30,12 +30,7 @@ type MsgStep struct { } // NewMsg create new msg -func NewMsg(server string) *Msg { - return NewMsg2(server, GenGid(server)) -} - -// NewMsg2 create new msg with specified gid -func NewMsg2(server string, gid string) *Msg { +func NewMsg(server string, gid string) *Msg { return &Msg{ MsgData: MsgData{ Gid: gid, diff --git a/dtmcli/saga.go b/dtmcli/saga.go index da2408c..20f4843 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -30,12 +30,7 @@ type SagaStep struct { } // NewSaga create a saga -func NewSaga(server string) *Saga { - return NewSaga2(server, GenGid(server)) -} - -// NewSaga2 create a saga -func NewSaga2(server string, gid string) *Saga { +func NewSaga(server string, gid string) *Saga { return &Saga{ SagaData: SagaData{ Gid: gid, diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 334a0a2..0fb00e0 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -21,20 +21,15 @@ type Tcc struct { type TccGlobalFunc func(tcc *Tcc) error // TccGlobalTransaction begin a tcc global transaction -func TccGlobalTransaction(dtm string, tccFunc TccGlobalFunc) (gid string, rerr error) { - return TccGlobalTransaction2(dtm, GenGid(dtm), tccFunc) -} - -// TccGlobalTransaction2 begin a tcc global transaction -func TccGlobalTransaction2(dtm string, gidIn string, tccFunc TccGlobalFunc) (gid string, rerr error) { - gid = gidIn +func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { data := &M{ "gid": gid, "trans_type": "tcc", } defer func() { var err error - if x := recover(); x != nil || rerr != nil { + var x interface{} + if x = recover(); x != nil || rerr != nil { _, err = common.RestyClient.R().SetBody(data).Post(dtm + "/abort") } else { _, err = common.RestyClient.R().SetBody(data).Post(dtm + "/submit") @@ -42,6 +37,9 @@ func TccGlobalTransaction2(dtm string, gidIn string, tccFunc TccGlobalFunc) (gid if err != nil { logrus.Errorf("submitting or abort global transaction error: %v", err) } + if x != nil { + panic(x) + } }() tcc := &Tcc{Dtm: dtm, Gid: gid} resp, rerr := common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare") @@ -50,6 +48,7 @@ func TccGlobalTransaction2(dtm string, gidIn string, tccFunc TccGlobalFunc) (gid } if !strings.Contains(resp.String(), "SUCCESS") { rerr = fmt.Errorf("bad response: %s", resp.String()) + return } rerr = tccFunc(tcc) return @@ -89,7 +88,7 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can if !strings.Contains(resp.String(), "SUCCESS") { return nil, fmt.Errorf("registerTccBranch failed: %s", resp.String()) } - return common.RestyClient.R(). + r, err := common.RestyClient.R(). SetBody(body). SetQueryParams(common.MS{ "dtm": t.Dtm, @@ -99,4 +98,8 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can "branch_type": "try", }). Post(tryURL) + if err == nil && strings.Contains(r.String(), "FAILURE") { + return r, fmt.Errorf("branch return failure: %s", r.String()) + } + return r, err } diff --git a/dtmcli/xa.go b/dtmcli/xa.go index e43eac0..52c650f 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -7,6 +7,7 @@ import ( "github.com/gin-gonic/gin" "github.com/go-resty/resty/v2" + "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -115,14 +116,8 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, transFunc XaLocalFunc) (r } // XaGlobalTransaction start a xa global transaction -func (xc *XaClient) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr error) { - return xc.XaGlobalTransaction2(GenGid(xc.Server), transFunc) -} - -// XaGlobalTransaction2 start a xa global transaction with gid=ginIn -func (xc *XaClient) XaGlobalTransaction2(gidIn string, transFunc XaGlobalFunc) (gid string, rerr error) { - xa := Xa{IDGenerator: IDGenerator{}, Gid: gidIn} - gid = xa.Gid +func (xc *XaClient) XaGlobalTransaction(gid string, transFunc XaGlobalFunc) error { + xa := Xa{IDGenerator: IDGenerator{}, Gid: gid} data := &M{ "gid": gid, "trans_type": "xa", @@ -130,29 +125,31 @@ func (xc *XaClient) XaGlobalTransaction2(gidIn string, transFunc XaGlobalFunc) ( defer func() { x := recover() if x != nil { - _, _ = common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort") - rerr = x.(error) + r, err := common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort") + if !strings.Contains(r.String(), "SUCCESS") { + logrus.Errorf("abort xa error: resp: %s err: %v", r.String(), err) + } } }() resp, rerr := common.RestyClient.R().SetBody(data).Post(xc.Server + "/prepare") - e2p(rerr) if !strings.Contains(resp.String(), "SUCCESS") { - panic(fmt.Errorf("unexpected result: %s", resp.String())) + return fmt.Errorf("unexpected result: %s", resp.String()) } rerr = transFunc(&xa) - e2p(rerr) - resp, rerr = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit") - e2p(rerr) - if !strings.Contains(resp.String(), "SUCCESS") { - panic(fmt.Errorf("unexpected result: %s", resp.String())) + if rerr != nil { + return rerr } - return + resp, rerr = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit") + if !strings.Contains(resp.String(), "SUCCESS") { + return fmt.Errorf("unexpected result: %s err: %v", resp.String(), rerr) + } + return nil } // CallBranch call a xa branch func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { branchID := x.NewBranchID() - return common.RestyClient.R(). + resp, err := common.RestyClient.R(). SetBody(body). SetQueryParams(common.MS{ "gid": x.Gid, @@ -161,4 +158,8 @@ func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { "branch_type": "action", }). Post(url) + if strings.Contains(resp.String(), "FAILURE") { + return resp, fmt.Errorf("unexpected result: %s err: %v", resp.String(), err) + } + return resp, err } diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index 2ca0329..10957d5 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -76,22 +76,20 @@ func assertSucceed(t *testing.T, gid string) { func genMsg(gid string) *dtmcli.Msg { logrus.Printf("beginning a msg test ---------------- %s", gid) - msg := dtmcli.NewMsg(examples.DtmServer) + msg := dtmcli.NewMsg(examples.DtmServer, gid) msg.QueryPrepared = examples.Busi + "/CanSubmit" req := examples.GenTransReq(30, false, false) msg.Add(examples.Busi+"/TransOut", &req) msg.Add(examples.Busi+"/TransIn", &req) - msg.Gid = gid return msg } func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { logrus.Printf("beginning a saga test ---------------- %s", gid) - saga := dtmcli.NewSaga(examples.DtmServer) + saga := dtmcli.NewSaga(examples.DtmServer, gid) req := examples.GenTransReq(30, outFailed, inFailed) saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req) saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req) - saga.Gid = gid return saga } diff --git a/dtmsvr/trans_saga_barrier_test.go b/dtmsvr/trans_saga_barrier_test.go index 3239136..1749c4e 100644 --- a/dtmsvr/trans_saga_barrier_test.go +++ b/dtmsvr/trans_saga_barrier_test.go @@ -17,7 +17,7 @@ func TestSagaBarrier(t *testing.T) { func sagaBarrierNormal(t *testing.T) { req := &examples.TransReq{Amount: 30} - saga := dtmcli.NewSaga(DtmServer). + saga := dtmcli.NewSaga(DtmServer, "sagaBarrierNormal"). Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req). Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) logrus.Printf("busi trans submit") @@ -28,7 +28,7 @@ func sagaBarrierNormal(t *testing.T) { } func sagaBarrierRollback(t *testing.T) { - saga := dtmcli.NewSaga(DtmServer). + saga := dtmcli.NewSaga(DtmServer, "sagaBarrierRollback"). Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", &examples.TransReq{Amount: 30}). Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}) logrus.Printf("busi trans submit") diff --git a/dtmsvr/trans_tcc_barrier_test.go b/dtmsvr/trans_tcc_barrier_test.go index f4b12ac..8d1a3bd 100644 --- a/dtmsvr/trans_tcc_barrier_test.go +++ b/dtmsvr/trans_tcc_barrier_test.go @@ -22,30 +22,22 @@ func TestTccBarrier(t *testing.T) { } func tccBarrierRollback(t *testing.T) { - gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { + gid := "tccBarrierRollback" + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - e2p(rerr) - if res1.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res1.StatusCode()) - } - res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") - e2p(rerr) - if res2.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res2.StatusCode()) - } - if strings.Contains(res2.String(), "FAILURE") { - return fmt.Errorf("branch trans in fail") - } - logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) + assert.Contains(t, res1.String(), "SUCCESS") + _, rerr = tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") + assert.Error(t, rerr) return }) - assert.Equal(t, err, fmt.Errorf("branch trans in fail")) + assert.Error(t, err) WaitTransProcessed(gid) assert.Equal(t, "failed", getTransStatus(gid)) } func tccBarrierNormal(t *testing.T) { - _, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { + gid := "tccBarrierNormal" + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") e2p(rerr) if res1.StatusCode() != 200 { @@ -60,12 +52,15 @@ func tccBarrierNormal(t *testing.T) { return }) e2p(err) + WaitTransProcessed(gid) + assert.Equal(t, "succeed", getTransStatus(gid)) } func tccBarrierDisorder(t *testing.T) { timeoutChan := make(chan string, 2) finishedChan := make(chan string, 2) - gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { + gid := "tccBarrierDisorder" + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { body := &examples.TransReq{Amount: 30} tryURL := Busi + "/TccBTransOutTry" confirmURL := Busi + "/TccBTransOutConfirm" diff --git a/dtmsvr/trans_tcc_test.go b/dtmsvr/trans_tcc_test.go index 18eff79..741026c 100644 --- a/dtmsvr/trans_tcc_test.go +++ b/dtmsvr/trans_tcc_test.go @@ -3,6 +3,7 @@ package dtmsvr import ( "testing" + "github.com/stretchr/testify/assert" "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/examples" ) @@ -15,7 +16,8 @@ func TestTcc(t *testing.T) { func tccNormal(t *testing.T) { data := &examples.TransReq{Amount: 30} - _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { + gid := "tccNormal" + err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { _, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") e2p(rerr) _, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") @@ -26,13 +28,14 @@ func tccNormal(t *testing.T) { } func tccRollback(t *testing.T) { + gid := "tccRollback" data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} - _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { - _, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - e2p(rerr) + err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { + resp, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") + assert.Contains(t, resp.String(), "SUCCESS") _, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") - e2p(rerr) + assert.Error(t, rerr) return }) - e2p(err) + assert.Error(t, err) } diff --git a/dtmsvr/trans_xa_test.go b/dtmsvr/trans_xa_test.go index f3b5636..7d8dd94 100644 --- a/dtmsvr/trans_xa_test.go +++ b/dtmsvr/trans_xa_test.go @@ -18,7 +18,8 @@ func TestXa(t *testing.T) { func xaNormal(t *testing.T) { xc := examples.XaClient - gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error { + gid := "xaNormal" + err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) error { req := examples.GenTransReq(30, false, false) resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") common.CheckRestySuccess(resp, err) @@ -33,7 +34,8 @@ func xaNormal(t *testing.T) { func xaRollback(t *testing.T) { xc := examples.XaClient - gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error { + gid := "xaRollback" + err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) error { req := examples.GenTransReq(30, false, true) resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") common.CheckRestySuccess(resp, err) diff --git a/examples/main_msg.go b/examples/main_msg.go index ae635b3..9aaab48 100644 --- a/examples/main_msg.go +++ b/examples/main_msg.go @@ -18,7 +18,7 @@ func MsgFireRequest() string { TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } - msg := dtmcli.NewMsg(DtmServer). + msg := dtmcli.NewMsg(DtmServer, dtmcli.GenGid(DtmServer)). Add(Busi+"/TransOut", req). Add(Busi+"/TransIn", req) err := msg.Prepare(Busi + "/TransQuery") diff --git a/examples/main_saga.go b/examples/main_saga.go index b6fc8d8..b28723b 100644 --- a/examples/main_saga.go +++ b/examples/main_saga.go @@ -18,7 +18,7 @@ func SagaFireRequest() string { TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } - saga := dtmcli.NewSaga(DtmServer). + saga := dtmcli.NewSaga(DtmServer, dtmcli.GenGid(DtmServer)). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). Add(Busi+"/TransIn", Busi+"/TransInRevert", req) logrus.Printf("saga busi trans submit") diff --git a/examples/main_saga_barrier.go b/examples/main_saga_barrier.go index 90e9725..9d4bd01 100644 --- a/examples/main_saga_barrier.go +++ b/examples/main_saga_barrier.go @@ -14,7 +14,7 @@ import ( func SagaBarrierFireRequest() string { logrus.Printf("a busi transaction begin") req := &TransReq{Amount: 30} - saga := dtmcli.NewSaga(DtmServer). + saga := dtmcli.NewSaga(DtmServer, dtmcli.GenGid(DtmServer)). Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req). Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) logrus.Printf("busi trans submit") diff --git a/examples/main_tcc.go b/examples/main_tcc.go index f13368f..6ac4b47 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -29,7 +29,8 @@ func TccSetup(app *gin.Engine) { // TccFireRequest 1 func TccFireRequest() string { logrus.Printf("tcc transaction begin") - gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { + gid := dtmcli.GenGid(DtmServer) + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") if rerr != nil { return diff --git a/examples/main_tcc_barrier.go b/examples/main_tcc_barrier.go index 22df1d5..d7c0390 100644 --- a/examples/main_tcc_barrier.go +++ b/examples/main_tcc_barrier.go @@ -13,7 +13,8 @@ import ( // TccBarrierFireRequest 1 func TccBarrierFireRequest() string { logrus.Printf("tcc transaction begin") - gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { + gid := dtmcli.GenGid(DtmServer) + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") if rerr != nil { return @@ -112,6 +113,7 @@ func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) { }) } +// TccBarrierTransOutCancel will be use in test func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return adjustTrading(sdb, transOutUID, reqFrom(c).Amount) diff --git a/examples/main_xa.go b/examples/main_xa.go index 30e62a6..e9fc73d 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -38,7 +38,8 @@ func dbGet() *common.DB { // XaFireRequest 1 func XaFireRequest() string { - gid, err := XaClient.XaGlobalTransaction(func(xa *dtmcli.Xa) (rerr error) { + gid := dtmcli.GenGid(DtmServer) + err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (rerr error) { defer common.P2E(&rerr) req := GenTransReq(30, false, false) resp, err := xa.CallBranch(req, Busi+"/TransOutXa") diff --git a/examples/quick_start.go b/examples/quick_start.go index 5bf3815..57ac94d 100644 --- a/examples/quick_start.go +++ b/examples/quick_start.go @@ -31,7 +31,7 @@ func QsStartSvr() { func QsFireRequest() string { req := &gin.H{"amount": 30} // 微服务的载荷 // DtmServer为DTM服务的地址 - saga := dtmcli.NewSaga(DtmServer). + saga := dtmcli.NewSaga(DtmServer, dtmcli.GenGid(DtmServer)). // 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCompensate" Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req). // 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCompensate"