diff --git a/.simplecov b/.simplecov new file mode 100644 index 0000000..a8c24e0 --- /dev/null +++ b/.simplecov @@ -0,0 +1,8 @@ +require 'simplecov' +require 'coveralls' + +SimpleCov.formatter = Coveralls::SimpleCov::Formatter +SimpleCov.start do + add_filter 'app' + add_filter 'examples' +end \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 8b4cb5c..83e7ddd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,7 @@ branches: only: - master - main + - alpha services: - mysql before_install: diff --git a/common/utils_test.go b/common/utils_test.go index ecebe8e..96407b6 100644 --- a/common/utils_test.go +++ b/common/utils_test.go @@ -9,7 +9,7 @@ import ( "testing" "github.com/gin-gonic/gin" - "github.com/go-playground/assert/v2" + "github.com/stretchr/testify/assert" ) func TestEP(t *testing.T) { @@ -83,6 +83,10 @@ func TestSome(t *testing.T) { n := MustAtoi("123") assert.Equal(t, 123, n) + err := CatchP(func() { + MustAtoi("abc") + }) + assert.Error(t, err) wd := MustGetwd() assert.NotEqual(t, "", wd) diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 8348e23..565e1db 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/json" "fmt" + "net/url" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" @@ -26,18 +27,25 @@ func (t *TransInfo) String() string { return fmt.Sprintf("transInfo: %s %s %s %s", t.TransType, t.Gid, t.BranchID, t.BranchType) } -// TransInfoFromReq construct transaction info from request -func TransInfoFromReq(c *gin.Context) *TransInfo { +// MustGetTrans construct transaction info from request +func MustGetTrans(c *gin.Context) *TransInfo { + ti, err := TransInfoFromQuery(c.Request.URL.Query()) + e2p(err) + return ti +} + +// TransInfoFromQuery construct transaction info from request +func TransInfoFromQuery(qs url.Values) (*TransInfo, error) { ti := &TransInfo{ - TransType: c.Query("trans_type"), - Gid: c.Query("gid"), - BranchID: c.Query("branch_id"), - BranchType: c.Query("branch_type"), + TransType: qs.Get("trans_type"), + Gid: qs.Get("gid"), + BranchID: qs.Get("branch_id"), + BranchType: qs.Get("branch_type"), } if ti.TransType == "" || ti.Gid == "" || ti.BranchID == "" || ti.BranchType == "" { - panic(fmt.Errorf("invlid trans info: %v", ti)) + return nil, fmt.Errorf("invlid trans info: %v", ti) } - return ti + return ti, nil } // BarrierModel barrier model for gorm @@ -63,7 +71,7 @@ func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, br if branchType == "" { return 0, nil } - res, err := logExec(tx, "insert into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, reason) values(?,?,?,?,?)", transType, gid, branchID, branchType, reason) + res, err := logExec(tx, "insert ignore into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, reason) values(?,?,?,?,?)", transType, gid, branchID, branchType, reason) if err != nil { return 0, err } @@ -119,12 +127,12 @@ func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc) (re return } if result.Valid { // 数据库里有上一次结果,返回上一次的结果 - res = json.Unmarshal([]byte(result.String), &res) - return - } else { // 数据库里没有上次的结果,属于重复空补偿,直接返回成功 - res = common.MS{"dtm_result": "SUCCESS"} + rerr = json.Unmarshal([]byte(result.String), &res) return } + // 数据库里没有上次的结果,属于重复空补偿,直接返回成功 + res = common.MS{"dtm_result": "SUCCESS"} + return } res, rerr = busiCall(db) if rerr == nil { // 正确返回了,需要将结果保存到数据库 diff --git a/dtmcli/message.go b/dtmcli/message.go index 933bbe3..f1acb03 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -3,7 +3,6 @@ package dtmcli import ( "fmt" - jsonitor "github.com/json-iterator/go" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -29,10 +28,10 @@ type MsgStep struct { } // NewMsg create new msg -func NewMsg(server string) *Msg { +func NewMsg(server string, gid string) *Msg { return &Msg{ MsgData: MsgData{ - Gid: GenGid(server), + Gid: gid, TransType: "msg", }, Server: server, @@ -54,14 +53,7 @@ func (s *Msg) Add(action string, postData interface{}) *Msg { func (s *Msg) Submit() error { logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData) resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server)) - if err != nil { - return err - } - if resp.StatusCode() != 200 { - return fmt.Errorf("submit failed: %v", resp.Body()) - } - s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() - return nil + return CheckDtmResponse(resp, err) } // Prepare prepare the msg @@ -69,11 +61,9 @@ func (s *Msg) Prepare(queryPrepared string) error { s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData) resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/prepare", s.Server)) - if err != nil { - return err - } - if resp.StatusCode() != 200 { - return fmt.Errorf("prepare failed: %v", resp.Body()) + rerr := CheckDtmResponse(resp, err) + if rerr != nil { + return rerr } return nil } diff --git a/dtmcli/saga.go b/dtmcli/saga.go index 0d4deb1..c92d13d 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -3,7 +3,6 @@ package dtmcli import ( "fmt" - jsonitor "github.com/json-iterator/go" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -29,10 +28,10 @@ type SagaStep struct { } // NewSaga create a saga -func NewSaga(server string) *Saga { +func NewSaga(server string, gid string) *Saga { return &Saga{ SagaData: SagaData{ - Gid: GenGid(server), + Gid: gid, TransType: "saga", }, Server: server, @@ -55,12 +54,5 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga func (s *Saga) Submit() error { logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server)) - if err != nil { - return err - } - if resp.StatusCode() != 200 { - return fmt.Errorf("submit failed: %v", resp.Body()) - } - s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() - return nil + return CheckDtmResponse(resp, err) } diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 302b66b..7f89d93 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -2,6 +2,7 @@ package dtmcli import ( "fmt" + "strings" "github.com/gin-gonic/gin" "github.com/go-resty/resty/v2" @@ -20,25 +21,31 @@ type Tcc struct { type TccGlobalFunc func(tcc *Tcc) error // TccGlobalTransaction begin a tcc global transaction -func TccGlobalTransaction(dtm string, tccFunc TccGlobalFunc) (gid string, rerr error) { - gid = GenGid(dtm) +func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { data := &M{ "gid": gid, "trans_type": "tcc", } defer func() { + var resp *resty.Response var err error - if x := recover(); x != nil || rerr != nil { - _, err = common.RestyClient.R().SetBody(data).Post(dtm + "/abort") + var x interface{} + if x = recover(); x != nil || rerr != nil { + resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/abort") } else { - _, err = common.RestyClient.R().SetBody(data).Post(dtm + "/submit") + resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/submit") } - if err != nil { - logrus.Errorf("submitting or abort global transaction error: %v", err) + err2 := CheckDtmResponse(resp, err) + if err2 != nil { + logrus.Errorf("submitting or abort global transaction error: %v", err2) + } + if x != nil { + panic(x) } }() tcc := &Tcc{Dtm: dtm, Gid: gid} - _, rerr = common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare") + resp, err := common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare") + rerr = CheckDtmResponse(resp, err) if rerr != nil { return } @@ -74,10 +81,11 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can "cancel": cancelURL, }). Post(t.Dtm + "/registerTccBranch") + err = CheckDtmResponse(resp, err) if err != nil { return resp, err } - return common.RestyClient.R(). + resp, err = common.RestyClient.R(). SetBody(body). SetQueryParams(common.MS{ "dtm": t.Dtm, @@ -87,4 +95,8 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can "branch_type": "try", }). Post(tryURL) + if err == nil && strings.Contains(resp.String(), "FAILURE") { + err = fmt.Errorf("branch return failure: %s", resp.String()) + } + return resp, err } diff --git a/dtmcli/types.go b/dtmcli/types.go index d6766f9..e443fca 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -2,18 +2,33 @@ package dtmcli import ( "fmt" + "strings" + "github.com/go-resty/resty/v2" "github.com/yedf/dtm/common" ) -// GenGid generate a new gid -func GenGid(server string) string { +// MustGenGid generate a new gid +func MustGenGid(server string) string { res := common.MS{} - _, err := common.RestyClient.R().SetResult(&res).Get(server + "/newGid") - e2p(err) + resp, err := common.RestyClient.R().SetResult(&res).Get(server + "/newGid") + if err != nil || res["gid"] == "" { + panic(fmt.Errorf("newGid error: %v, resp: %s", err, resp)) + } return res["gid"] } +// CheckDtmResponse check the response of dtm, if not ok ,generate error +func CheckDtmResponse(resp *resty.Response, err error) error { + if err != nil { + return err + } + if !strings.Contains(resp.String(), "SUCCESS") { + return fmt.Errorf("dtm response failed: %s", resp.String()) + } + return nil +} + // IDGenerator used to generate a branch id type IDGenerator struct { parentID string diff --git a/dtmcli/types_test.go b/dtmcli/types_test.go new file mode 100644 index 0000000..97a3d88 --- /dev/null +++ b/dtmcli/types_test.go @@ -0,0 +1,24 @@ +package dtmcli + +import ( + "net/url" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/common" +) + +func TestTypes(t *testing.T) { + err := common.CatchP(func() { + idGen := IDGenerator{parentID: "12345678901234567890123"} + idGen.NewBranchID() + }) + assert.Error(t, err) + err = common.CatchP(func() { + idGen := IDGenerator{branchID: 99} + idGen.NewBranchID() + }) + assert.Error(t, err) + _, err = TransInfoFromQuery(url.Values{}) + assert.Error(t, err) +} diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 591c616..13f10b6 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -7,6 +7,7 @@ import ( "github.com/gin-gonic/gin" "github.com/go-resty/resty/v2" + "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -34,16 +35,6 @@ type Xa struct { Gid string } -// GetParams get xa params map -func (x *Xa) GetParams(branchID string) common.MS { - return common.MS{ - "gid": x.Gid, - "trans_type": "xa", - "branch_id": branchID, - "branch_type": "action", - } -} - // XaFromReq construct xa info from request func XaFromReq(c *gin.Context) *Xa { return &Xa{ @@ -52,20 +43,17 @@ func XaFromReq(c *gin.Context) *Xa { } } -// NewXaBranchID generate a xa branch id -func (x *Xa) NewXaBranchID() string { - return x.Gid + "-" + x.NewBranchID() -} - // NewXaClient construct a xa client -func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, callbackURL string) *XaClient { +func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, callbackURL string) (*XaClient, error) { xa := &XaClient{ Server: server, Conf: mysqlConf, CallbackURL: callbackURL, } u, err := url.Parse(callbackURL) - e2p(err) + if err != nil { + return nil, err + } app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) { type CallbackReq struct { Gid string `json:"gid"` @@ -74,7 +62,9 @@ func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, ca } req := CallbackReq{} b, err := c.GetRawData() - e2p(err) + if err != nil { + return nil, err + } common.MustUnmarshal(b, &req) tx, my := common.DbAlone(xa.Conf) defer my.Close() @@ -88,7 +78,7 @@ func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, ca } return M{"dtm_result": "SUCCESS"}, nil })) - return xa + return xa, nil } // XaLocalTransaction start a xa local transaction @@ -115,9 +105,8 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, transFunc XaLocalFunc) (r } // XaGlobalTransaction start a xa global transaction -func (xc *XaClient) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr error) { - xa := Xa{IDGenerator: IDGenerator{}, Gid: GenGid(xc.Server)} - gid = xa.Gid +func (xc *XaClient) XaGlobalTransaction(gid string, transFunc XaGlobalFunc) error { + xa := Xa{IDGenerator: IDGenerator{}, Gid: gid} data := &M{ "gid": gid, "trans_type": "xa", @@ -125,29 +114,33 @@ func (xc *XaClient) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rer defer func() { x := recover() if x != nil { - _, _ = common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort") - rerr = x.(error) + r, err := common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort") + if !strings.Contains(r.String(), "SUCCESS") { + logrus.Errorf("abort xa error: resp: %s err: %v", r.String(), err) + } } }() - resp, rerr := common.RestyClient.R().SetBody(data).Post(xc.Server + "/prepare") - e2p(rerr) - if !strings.Contains(resp.String(), "SUCCESS") { - panic(fmt.Errorf("unexpected result: %s", resp.String())) + resp, err := common.RestyClient.R().SetBody(data).Post(xc.Server + "/prepare") + rerr := CheckDtmResponse(resp, err) + if rerr != nil { + return rerr } rerr = transFunc(&xa) - e2p(rerr) - resp, rerr = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit") - e2p(rerr) - if !strings.Contains(resp.String(), "SUCCESS") { - panic(fmt.Errorf("unexpected result: %s", resp.String())) + if rerr != nil { + return rerr } - return + resp, err = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit") + rerr = CheckDtmResponse(resp, err) + if rerr != nil { + return rerr + } + return nil } // CallBranch call a xa branch func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { branchID := x.NewBranchID() - return common.RestyClient.R(). + resp, err := common.RestyClient.R(). SetBody(body). SetQueryParams(common.MS{ "gid": x.Gid, @@ -156,4 +149,8 @@ func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { "branch_type": "action", }). Post(url) + if strings.Contains(resp.String(), "FAILURE") { + return resp, fmt.Errorf("FAILURE result: %s err: %v", resp.String(), err) + } + return resp, err } diff --git a/dtmsvr/api.go b/dtmsvr/api.go index e9a4d5d..0ca9511 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -21,14 +21,14 @@ func addRoute(engine *gin.Engine) { } func newGid(c *gin.Context) (interface{}, error) { - return M{"gid": GenGid()}, nil + return M{"gid": GenGid(), "dtm_result": "SUCCESS"}, nil } func prepare(c *gin.Context) (interface{}, error) { t := TransFromContext(c) t.Status = "prepared" t.saveNew(dbGet()) - return M{"dtm_result": "SUCCESS", "gid": t.Gid}, nil + return M{"dtm_result": "SUCCESS"}, nil } func submit(c *gin.Context) (interface{}, error) { @@ -41,7 +41,7 @@ func submit(c *gin.Context) (interface{}, error) { t.Status = "submitted" t.saveNew(db) go t.Process(db) - return M{"dtm_result": "SUCCESS", "gid": t.Gid}, nil + return M{"dtm_result": "SUCCESS"}, nil } func abort(c *gin.Context) (interface{}, error) { diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index dfbca0a..284c44b 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -3,9 +3,7 @@ package dtmsvr import ( "database/sql" "fmt" - "strings" "testing" - "time" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" @@ -19,7 +17,7 @@ var DtmServer = examples.DtmServer var Busi = examples.Busi var app *gin.Engine -func init() { +func TestMain(m *testing.M) { TransProcessedTestChan = make(chan string, 1) common.InitApp(common.GetProjectDir(), &config) config.Mysql["database"] = dbName @@ -40,34 +38,7 @@ func init() { e2p(dbGet().Exec("truncate trans_branch").Error) e2p(dbGet().Exec("truncate trans_log").Error) examples.ResetXaData() -} - -func TestDtmSvr(t *testing.T) { - - tccBarrierDisorder(t) - tccBarrierNormal(t) - tccBarrierRollback(t) - sagaBarrierNormal(t) - sagaBarrierRollback(t) - msgNormal(t) - msgPending(t) - tccNormal(t) - tccRollback(t) - sagaNormal(t) - xaNormal(t) - xaRollback(t) - sagaCommittedPending(t) - sagaRollback(t) - - // for coverage - examples.QsStartSvr() - assertSucceed(t, examples.QsFireRequest()) - assertSucceed(t, examples.MsgFireRequest()) - assertSucceed(t, examples.SagaBarrierFireRequest()) - assertSucceed(t, examples.SagaFireRequest()) - assertSucceed(t, examples.TccBarrierFireRequest()) - assertSucceed(t, examples.TccFireRequest()) - assertSucceed(t, examples.XaFireRequest()) + m.Run() } func TestCover(t *testing.T) { @@ -80,6 +51,21 @@ func TestCover(t *testing.T) { go CronExpiredTrans(1) } +func TestType(t *testing.T) { + err := common.CatchP(func() { + dtmcli.MustGenGid("http://localhost:8080/api/no") + }) + assert.Error(t, err) + err = common.CatchP(func() { + resp, err := common.RestyClient.R().SetBody(common.M{ + "gid": "1", + "trans_type": "msg", + }).Get("http://localhost:8080/api/dtmsvr/abort") + common.CheckRestySuccess(resp, err) + }) + assert.Error(t, err) +} + func getTransStatus(gid string) string { sm := TransGlobal{} dbr := dbGet().Model(&sm).Where("gid=?", gid).First(&sm) @@ -98,177 +84,6 @@ func getBranchesStatus(gid string) []string { return status } -func xaNormal(t *testing.T) { - xc := examples.XaClient - gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error { - req := examples.GenTransReq(30, false, false) - resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") - common.CheckRestySuccess(resp, err) - resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa") - common.CheckRestySuccess(resp, err) - return nil - }) - e2p(err) - WaitTransProcessed(gid) - assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid)) -} - -func xaRollback(t *testing.T) { - xc := examples.XaClient - gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error { - req := examples.GenTransReq(30, false, true) - resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") - common.CheckRestySuccess(resp, err) - resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa") - common.CheckRestySuccess(resp, err) - return nil - }) - if err != nil { - logrus.Errorf("global transaction failed, so rollback") - } - WaitTransProcessed(gid) - assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid)) - assert.Equal(t, "failed", getTransStatus(gid)) -} - -func tccNormal(t *testing.T) { - data := &examples.TransReq{Amount: 30} - _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { - _, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - e2p(rerr) - _, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") - e2p(rerr) - return - }) - e2p(err) -} -func tccBarrierNormal(t *testing.T) { - _, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { - res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - e2p(rerr) - if res1.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res1.StatusCode()) - } - res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") - e2p(rerr) - if res2.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res2.StatusCode()) - } - logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) - return - }) - e2p(err) -} - -func tccBarrierRollback(t *testing.T) { - gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { - res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - e2p(rerr) - if res1.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res1.StatusCode()) - } - res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") - e2p(rerr) - if res2.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res2.StatusCode()) - } - if strings.Contains(res2.String(), "FAILURE") { - return fmt.Errorf("branch trans in fail") - } - logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) - return - }) - assert.Equal(t, err, fmt.Errorf("branch trans in fail")) - WaitTransProcessed(gid) - assert.Equal(t, "failed", getTransStatus(gid)) -} - -func tccRollback(t *testing.T) { - data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} - _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { - _, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - e2p(rerr) - _, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") - e2p(rerr) - return - }) - e2p(err) -} -func msgNormal(t *testing.T) { - msg := genMsg("gid-msg-normal") - msg.Submit() - assert.Equal(t, "submitted", getTransStatus(msg.Gid)) - WaitTransProcessed(msg.Gid) - assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) - assert.Equal(t, "succeed", getTransStatus(msg.Gid)) -} - -func msgPending(t *testing.T) { - msg := genMsg("gid-msg-normal-pending") - msg.Prepare("") - assert.Equal(t, "prepared", getTransStatus(msg.Gid)) - examples.MainSwitch.CanSubmitResult.SetOnce("PENDING") - CronTransOnce(60 * time.Second) - assert.Equal(t, "prepared", getTransStatus(msg.Gid)) - examples.MainSwitch.TransInResult.SetOnce("PENDING") - CronTransOnce(60 * time.Second) - assert.Equal(t, "submitted", getTransStatus(msg.Gid)) - CronTransOnce(60 * time.Second) - assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) - assert.Equal(t, "succeed", getTransStatus(msg.Gid)) -} - -func sagaNormal(t *testing.T) { - saga := genSaga("gid-noramlSaga", false, false) - saga.Submit() - WaitTransProcessed(saga.Gid) - assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) - assert.Equal(t, "succeed", getTransStatus(saga.Gid)) - transQuery(t, saga.Gid) -} - -func sagaBarrierNormal(t *testing.T) { - req := &examples.TransReq{Amount: 30} - saga := dtmcli.NewSaga(DtmServer). - Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req). - Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) - logrus.Printf("busi trans submit") - err := saga.Submit() - e2p(err) - WaitTransProcessed(saga.Gid) - assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) -} - -func sagaRollback(t *testing.T) { - saga := genSaga("gid-rollbackSaga2", false, true) - saga.Submit() - WaitTransProcessed(saga.Gid) - assert.Equal(t, "failed", getTransStatus(saga.Gid)) - assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid)) -} - -func sagaBarrierRollback(t *testing.T) { - saga := dtmcli.NewSaga(DtmServer). - Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", &examples.TransReq{Amount: 30}). - Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}) - logrus.Printf("busi trans submit") - err := saga.Submit() - e2p(err) - WaitTransProcessed(saga.Gid) - assert.Equal(t, "failed", getTransStatus(saga.Gid)) -} - -func sagaCommittedPending(t *testing.T) { - saga := genSaga("gid-committedPending", false, false) - examples.MainSwitch.TransInResult.SetOnce("PENDING") - saga.Submit() - WaitTransProcessed(saga.Gid) - assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) - CronTransOnce(60 * time.Second) - assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) - assert.Equal(t, "succeed", getTransStatus(saga.Gid)) -} - func assertSucceed(t *testing.T, gid string) { WaitTransProcessed(gid) assert.Equal(t, "succeed", getTransStatus(gid)) @@ -276,22 +91,20 @@ func assertSucceed(t *testing.T, gid string) { func genMsg(gid string) *dtmcli.Msg { logrus.Printf("beginning a msg test ---------------- %s", gid) - msg := dtmcli.NewMsg(examples.DtmServer) + msg := dtmcli.NewMsg(examples.DtmServer, gid) msg.QueryPrepared = examples.Busi + "/CanSubmit" req := examples.GenTransReq(30, false, false) msg.Add(examples.Busi+"/TransOut", &req) msg.Add(examples.Busi+"/TransIn", &req) - msg.Gid = gid return msg } func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { logrus.Printf("beginning a saga test ---------------- %s", gid) - saga := dtmcli.NewSaga(examples.DtmServer) + saga := dtmcli.NewSaga(examples.DtmServer, gid) req := examples.GenTransReq(30, outFailed, inFailed) saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req) saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req) - saga.Gid = gid return saga } @@ -335,79 +148,17 @@ func TestSqlDB(t *testing.T) { asserts.Equal(dbr.RowsAffected, int64(1)) dbr = db.Model(&dtmcli.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtmcli.BarrierModel{}) asserts.Equal(dbr.RowsAffected, int64(0)) + gid2Res := common.M{"result": "first"} _, err = dtmcli.ThroughBarrierCall(db.ToSQLDB(), transInfo, func(db *sql.DB) (interface{}, error) { logrus.Printf("submit gid2") - return nil, nil + return gid2Res, nil }) asserts.Nil(err) dbr = db.Model(&dtmcli.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtmcli.BarrierModel{}) asserts.Equal(dbr.RowsAffected, int64(1)) -} - -func tccBarrierDisorder(t *testing.T) { - timeoutChan := make(chan string, 2) - finishedChan := make(chan string, 2) - gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { - body := &examples.TransReq{Amount: 30} - tryURL := Busi + "/TccBTransOutTry" - confirmURL := Busi + "/TccBTransOutConfirm" - cancelURL := Busi + "/TccBSleepCancel" - // 请参见子事务屏障里的时序图,这里为了模拟该时序图,手动拆解了callbranch - branchID := tcc.NewBranchID() - sleeped := false - app.POST(examples.BusiAPI+"/TccBSleepCancel", common.WrapHandler(func(c *gin.Context) (interface{}, error) { - res, err := examples.TccBarrierTransOutCancel(c) - if !sleeped { - sleeped = true - logrus.Printf("sleep before cancel return") - <-timeoutChan - finishedChan <- "1" - } - return res, err - })) - // 注册子事务 - r, err := common.RestyClient.R(). - SetBody(&M{ - "gid": tcc.Gid, - "branch_id": branchID, - "trans_type": "tcc", - "status": "prepared", - "data": string(common.MustMarshal(body)), - "try": tryURL, - "confirm": confirmURL, - "cancel": cancelURL, - }). - Post(tcc.Dtm + "/registerTccBranch") - e2p(err) - assert.True(t, strings.Contains(r.String(), "SUCCESS")) - go func() { - logrus.Printf("sleeping to wait for tcc try timeout") - <-timeoutChan - _, _ = common.RestyClient.R(). - SetBody(body). - SetQueryParams(common.MS{ - "dtm": tcc.Dtm, - "gid": tcc.Gid, - "branch_id": branchID, - "trans_type": "tcc", - "branch_type": "try", - }). - Post(tryURL) - finishedChan <- "1" - }() - logrus.Printf("cron to timeout and then call cancel") - go CronTransOnce(60 * time.Second) - time.Sleep(100 * time.Millisecond) - logrus.Printf("cron to timeout and then call cancelled twice") - CronTransOnce(60 * time.Second) - timeoutChan <- "wake" - timeoutChan <- "wake" - <-finishedChan - <-finishedChan - time.Sleep(100 * time.Millisecond) - return fmt.Errorf("a cancelled tcc") + newResult, err := dtmcli.ThroughBarrierCall(db.ToSQLDB(), transInfo, func(db *sql.DB) (interface{}, error) { + logrus.Printf("submit gid2") + return common.MS{"result": "ignored"}, nil }) - assert.Error(t, err, fmt.Errorf("a cancelled tcc")) - assert.Equal(t, []string{"succeed", "prepared", "prepared"}, getBranchesStatus(gid)) - assert.Equal(t, "failed", getTransStatus(gid)) + asserts.Equal(newResult, gid2Res) } diff --git a/dtmsvr/examples_test.go b/dtmsvr/examples_test.go new file mode 100644 index 0000000..53990a7 --- /dev/null +++ b/dtmsvr/examples_test.go @@ -0,0 +1,19 @@ +package dtmsvr + +import ( + "testing" + + "github.com/yedf/dtm/examples" +) + +func TestExamples(t *testing.T) { + // for coverage + examples.QsStartSvr() + assertSucceed(t, examples.QsFireRequest()) + assertSucceed(t, examples.MsgFireRequest()) + assertSucceed(t, examples.SagaBarrierFireRequest()) + assertSucceed(t, examples.SagaFireRequest()) + assertSucceed(t, examples.TccBarrierFireRequest()) + assertSucceed(t, examples.TccFireRequest()) + assertSucceed(t, examples.XaFireRequest()) +} diff --git a/dtmsvr/trans_msg_test.go b/dtmsvr/trans_msg_test.go new file mode 100644 index 0000000..fd0c1bb --- /dev/null +++ b/dtmsvr/trans_msg_test.go @@ -0,0 +1,39 @@ +package dtmsvr + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/examples" +) + +func TestMsg(t *testing.T) { + + msgNormal(t) + msgPending(t) +} + +func msgNormal(t *testing.T) { + msg := genMsg("gid-msg-normal") + msg.Submit() + assert.Equal(t, "submitted", getTransStatus(msg.Gid)) + WaitTransProcessed(msg.Gid) + assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) + assert.Equal(t, "succeed", getTransStatus(msg.Gid)) +} + +func msgPending(t *testing.T) { + msg := genMsg("gid-msg-normal-pending") + msg.Prepare("") + assert.Equal(t, "prepared", getTransStatus(msg.Gid)) + examples.MainSwitch.CanSubmitResult.SetOnce("PENDING") + CronTransOnce(60 * time.Second) + assert.Equal(t, "prepared", getTransStatus(msg.Gid)) + examples.MainSwitch.TransInResult.SetOnce("PENDING") + CronTransOnce(60 * time.Second) + assert.Equal(t, "submitted", getTransStatus(msg.Gid)) + CronTransOnce(60 * time.Second) + assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) + assert.Equal(t, "succeed", getTransStatus(msg.Gid)) +} diff --git a/dtmsvr/trans_saga_barrier_test.go b/dtmsvr/trans_saga_barrier_test.go new file mode 100644 index 0000000..1749c4e --- /dev/null +++ b/dtmsvr/trans_saga_barrier_test.go @@ -0,0 +1,39 @@ +package dtmsvr + +import ( + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/examples" +) + +func TestSagaBarrier(t *testing.T) { + + sagaBarrierNormal(t) + sagaBarrierRollback(t) +} + +func sagaBarrierNormal(t *testing.T) { + req := &examples.TransReq{Amount: 30} + saga := dtmcli.NewSaga(DtmServer, "sagaBarrierNormal"). + Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req). + Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) + logrus.Printf("busi trans submit") + err := saga.Submit() + e2p(err) + WaitTransProcessed(saga.Gid) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) +} + +func sagaBarrierRollback(t *testing.T) { + saga := dtmcli.NewSaga(DtmServer, "sagaBarrierRollback"). + Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", &examples.TransReq{Amount: 30}). + Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}) + logrus.Printf("busi trans submit") + err := saga.Submit() + e2p(err) + WaitTransProcessed(saga.Gid) + assert.Equal(t, "failed", getTransStatus(saga.Gid)) +} diff --git a/dtmsvr/trans_saga_test.go b/dtmsvr/trans_saga_test.go new file mode 100644 index 0000000..0868820 --- /dev/null +++ b/dtmsvr/trans_saga_test.go @@ -0,0 +1,44 @@ +package dtmsvr + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/examples" +) + +func TestSaga(t *testing.T) { + + sagaNormal(t) + sagaCommittedPending(t) + sagaRollback(t) +} + +func sagaNormal(t *testing.T) { + saga := genSaga("gid-noramlSaga", false, false) + saga.Submit() + WaitTransProcessed(saga.Gid) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) + assert.Equal(t, "succeed", getTransStatus(saga.Gid)) + transQuery(t, saga.Gid) +} + +func sagaCommittedPending(t *testing.T) { + saga := genSaga("gid-committedPending", false, false) + examples.MainSwitch.TransInResult.SetOnce("PENDING") + saga.Submit() + WaitTransProcessed(saga.Gid) + assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) + CronTransOnce(60 * time.Second) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) + assert.Equal(t, "succeed", getTransStatus(saga.Gid)) +} + +func sagaRollback(t *testing.T) { + saga := genSaga("gid-rollbackSaga2", false, true) + saga.Submit() + WaitTransProcessed(saga.Gid) + assert.Equal(t, "failed", getTransStatus(saga.Gid)) + assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid)) +} diff --git a/dtmsvr/trans_tcc_barrier_test.go b/dtmsvr/trans_tcc_barrier_test.go new file mode 100644 index 0000000..8d1a3bd --- /dev/null +++ b/dtmsvr/trans_tcc_barrier_test.go @@ -0,0 +1,127 @@ +package dtmsvr + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/examples" +) + +func TestTccBarrier(t *testing.T) { + tccBarrierDisorder(t) + tccBarrierNormal(t) + tccBarrierRollback(t) + +} + +func tccBarrierRollback(t *testing.T) { + gid := "tccBarrierRollback" + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { + res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") + assert.Contains(t, res1.String(), "SUCCESS") + _, rerr = tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") + assert.Error(t, rerr) + return + }) + assert.Error(t, err) + WaitTransProcessed(gid) + assert.Equal(t, "failed", getTransStatus(gid)) +} + +func tccBarrierNormal(t *testing.T) { + gid := "tccBarrierNormal" + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { + res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") + e2p(rerr) + if res1.StatusCode() != 200 { + return fmt.Errorf("bad status code: %d", res1.StatusCode()) + } + res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") + e2p(rerr) + if res2.StatusCode() != 200 { + return fmt.Errorf("bad status code: %d", res2.StatusCode()) + } + logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) + return + }) + e2p(err) + WaitTransProcessed(gid) + assert.Equal(t, "succeed", getTransStatus(gid)) +} + +func tccBarrierDisorder(t *testing.T) { + timeoutChan := make(chan string, 2) + finishedChan := make(chan string, 2) + gid := "tccBarrierDisorder" + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { + body := &examples.TransReq{Amount: 30} + tryURL := Busi + "/TccBTransOutTry" + confirmURL := Busi + "/TccBTransOutConfirm" + cancelURL := Busi + "/TccBSleepCancel" + // 请参见子事务屏障里的时序图,这里为了模拟该时序图,手动拆解了callbranch + branchID := tcc.NewBranchID() + sleeped := false + app.POST(examples.BusiAPI+"/TccBSleepCancel", common.WrapHandler(func(c *gin.Context) (interface{}, error) { + res, err := examples.TccBarrierTransOutCancel(c) + if !sleeped { + sleeped = true + logrus.Printf("sleep before cancel return") + <-timeoutChan + finishedChan <- "1" + } + return res, err + })) + // 注册子事务 + r, err := common.RestyClient.R(). + SetBody(&M{ + "gid": tcc.Gid, + "branch_id": branchID, + "trans_type": "tcc", + "status": "prepared", + "data": string(common.MustMarshal(body)), + "try": tryURL, + "confirm": confirmURL, + "cancel": cancelURL, + }). + Post(tcc.Dtm + "/registerTccBranch") + e2p(err) + assert.True(t, strings.Contains(r.String(), "SUCCESS")) + go func() { + logrus.Printf("sleeping to wait for tcc try timeout") + <-timeoutChan + r, _ = common.RestyClient.R(). + SetBody(body). + SetQueryParams(common.MS{ + "dtm": tcc.Dtm, + "gid": tcc.Gid, + "branch_id": branchID, + "trans_type": "tcc", + "branch_type": "try", + }). + Post(tryURL) + assert.True(t, strings.Contains(r.String(), "FAILURE")) + finishedChan <- "1" + }() + logrus.Printf("cron to timeout and then call cancel") + go CronTransOnce(60 * time.Second) + time.Sleep(100 * time.Millisecond) + logrus.Printf("cron to timeout and then call cancelled twice") + CronTransOnce(60 * time.Second) + timeoutChan <- "wake" + timeoutChan <- "wake" + <-finishedChan + <-finishedChan + time.Sleep(100 * time.Millisecond) + return fmt.Errorf("a cancelled tcc") + }) + assert.Error(t, err, fmt.Errorf("a cancelled tcc")) + assert.Equal(t, []string{"succeed", "prepared", "prepared"}, getBranchesStatus(gid)) + assert.Equal(t, "failed", getTransStatus(gid)) +} diff --git a/dtmsvr/trans_tcc_test.go b/dtmsvr/trans_tcc_test.go new file mode 100644 index 0000000..741026c --- /dev/null +++ b/dtmsvr/trans_tcc_test.go @@ -0,0 +1,41 @@ +package dtmsvr + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/examples" +) + +func TestTcc(t *testing.T) { + tccNormal(t) + tccRollback(t) + +} + +func tccNormal(t *testing.T) { + data := &examples.TransReq{Amount: 30} + gid := "tccNormal" + err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { + _, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") + e2p(rerr) + _, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") + e2p(rerr) + return + }) + e2p(err) +} + +func tccRollback(t *testing.T) { + gid := "tccRollback" + data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} + err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { + resp, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") + assert.Contains(t, resp.String(), "SUCCESS") + _, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") + assert.Error(t, rerr) + return + }) + assert.Error(t, err) +} diff --git a/dtmsvr/trans_xa_test.go b/dtmsvr/trans_xa_test.go new file mode 100644 index 0000000..851f9c6 --- /dev/null +++ b/dtmsvr/trans_xa_test.go @@ -0,0 +1,59 @@ +package dtmsvr + +import ( + "fmt" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/examples" +) + +func TestXa(t *testing.T) { + xaLocalError(t) + xaNormal(t) + xaRollback(t) +} + +func xaLocalError(t *testing.T) { + err := examples.XaClient.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) error { + return fmt.Errorf("an error") + }) + assert.Error(t, err, fmt.Errorf("an error")) +} +func xaNormal(t *testing.T) { + xc := examples.XaClient + gid := "xaNormal" + err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) error { + req := examples.GenTransReq(30, false, false) + resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") + common.CheckRestySuccess(resp, err) + resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa") + common.CheckRestySuccess(resp, err) + return nil + }) + e2p(err) + WaitTransProcessed(gid) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid)) +} + +func xaRollback(t *testing.T) { + xc := examples.XaClient + gid := "xaRollback" + err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) error { + req := examples.GenTransReq(30, false, true) + resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") + common.CheckRestySuccess(resp, err) + resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa") + common.CheckRestySuccess(resp, err) + return nil + }) + if err != nil { + logrus.Errorf("global transaction failed, so rollback") + } + WaitTransProcessed(gid) + assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid)) + assert.Equal(t, "failed", getTransStatus(gid)) +} diff --git a/examples/main_msg.go b/examples/main_msg.go index ae635b3..81c2ee5 100644 --- a/examples/main_msg.go +++ b/examples/main_msg.go @@ -18,7 +18,7 @@ func MsgFireRequest() string { TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } - msg := dtmcli.NewMsg(DtmServer). + msg := dtmcli.NewMsg(DtmServer, dtmcli.MustGenGid(DtmServer)). Add(Busi+"/TransOut", req). Add(Busi+"/TransIn", req) err := msg.Prepare(Busi + "/TransQuery") diff --git a/examples/main_saga.go b/examples/main_saga.go index b6fc8d8..36c349e 100644 --- a/examples/main_saga.go +++ b/examples/main_saga.go @@ -18,7 +18,7 @@ func SagaFireRequest() string { TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } - saga := dtmcli.NewSaga(DtmServer). + saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). Add(Busi+"/TransIn", Busi+"/TransInRevert", req) logrus.Printf("saga busi trans submit") diff --git a/examples/main_saga_barrier.go b/examples/main_saga_barrier.go index 90e9725..8fd8e0e 100644 --- a/examples/main_saga_barrier.go +++ b/examples/main_saga_barrier.go @@ -14,7 +14,7 @@ import ( func SagaBarrierFireRequest() string { logrus.Printf("a busi transaction begin") req := &TransReq{Amount: 30} - saga := dtmcli.NewSaga(DtmServer). + saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)). Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req). Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) logrus.Printf("busi trans submit") @@ -44,13 +44,13 @@ func sagaBarrierTransIn(c *gin.Context) (interface{}, error) { if req.TransInResult != "" { return req.TransInResult, nil } - return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { + return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) { return sagaBarrierAdjustBalance(sdb, 1, req.Amount) }) } func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) { - return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { + return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) { return sagaBarrierAdjustBalance(sdb, 1, -reqFrom(c).Amount) }) } @@ -60,13 +60,13 @@ func sagaBarrierTransOut(c *gin.Context) (interface{}, error) { if req.TransInResult != "" { return req.TransInResult, nil } - return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { + return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) { return sagaBarrierAdjustBalance(sdb, 2, -req.Amount) }) } func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) { - return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { + return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) { return sagaBarrierAdjustBalance(sdb, 2, reqFrom(c).Amount) }) } diff --git a/examples/main_tcc.go b/examples/main_tcc.go index f13368f..fa2e310 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -11,16 +11,11 @@ import ( func TccSetup(app *gin.Engine) { app.POST(BusiAPI+"/TransInTcc", common.WrapHandler(func(c *gin.Context) (interface{}, error) { tcc, err := dtmcli.TccFromReq(c) - if err != nil { - return nil, err - } + e2p(err) req := reqFrom(c) logrus.Printf("Trans in %d here, and Trans in another %d in call2 ", req.Amount/2, req.Amount/2) _, rerr := tcc.CallBranch(&TransReq{Amount: req.Amount / 2}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") - if rerr != nil { - return nil, rerr - } - + e2p(rerr) return M{"dtm_result": "SUCCESS"}, nil })) @@ -29,15 +24,12 @@ func TccSetup(app *gin.Engine) { // TccFireRequest 1 func TccFireRequest() string { logrus.Printf("tcc transaction begin") - gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { + gid := dtmcli.MustGenGid(DtmServer) + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - if rerr != nil { - return - } + e2p(rerr) res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTcc", Busi+"/TransInConfirm", Busi+"/TransInRevert") - if rerr != nil { - return - } + e2p(rerr) logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) return }) diff --git a/examples/main_tcc_barrier.go b/examples/main_tcc_barrier.go index 22df1d5..189d9b8 100644 --- a/examples/main_tcc_barrier.go +++ b/examples/main_tcc_barrier.go @@ -13,21 +13,12 @@ import ( // TccBarrierFireRequest 1 func TccBarrierFireRequest() string { logrus.Printf("tcc transaction begin") - gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { + gid := dtmcli.MustGenGid(DtmServer) + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - if rerr != nil { - return - } - if res1.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res1.StatusCode()) - } + common.CheckRestySuccess(res1, rerr) res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") - if rerr != nil { - return - } - if res2.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res2.StatusCode()) - } + common.CheckRestySuccess(res1, rerr) logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) return }) @@ -79,19 +70,19 @@ func tccBarrierTransInTry(c *gin.Context) (interface{}, error) { if req.TransInResult != "" { return req.TransInResult, nil } - return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { + return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) { return adjustTrading(sdb, transInUID, req.Amount) }) } func tccBarrierTransInConfirm(c *gin.Context) (interface{}, error) { - return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { + return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) { return adjustBalance(sdb, transInUID, reqFrom(c).Amount) }) } func tccBarrierTransInCancel(c *gin.Context) (interface{}, error) { - return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { + return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) { return adjustTrading(sdb, transInUID, -reqFrom(c).Amount) }) } @@ -101,19 +92,20 @@ func tccBarrierTransOutTry(c *gin.Context) (interface{}, error) { if req.TransInResult != "" { return req.TransInResult, nil } - return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { + return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) { return adjustTrading(sdb, transOutUID, -req.Amount) }) } func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) { - return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { + return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) { return adjustBalance(sdb, transOutUID, -reqFrom(c).Amount) }) } +// TccBarrierTransOutCancel will be use in test func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) { - return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { + return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) { return adjustTrading(sdb, transOutUID, reqFrom(c).Amount) }) } diff --git a/examples/main_xa.go b/examples/main_xa.go index 30e62a6..b55450b 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -2,6 +2,7 @@ package examples import ( "fmt" + "strings" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" @@ -38,7 +39,8 @@ func dbGet() *common.DB { // XaFireRequest 1 func XaFireRequest() string { - gid, err := XaClient.XaGlobalTransaction(func(xa *dtmcli.Xa) (rerr error) { + gid := dtmcli.MustGenGid(DtmServer) + err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (rerr error) { defer common.P2E(&rerr) req := GenTransReq(30, false, false) resp, err := xa.CallBranch(req, Busi+"/TransOutXa") @@ -56,18 +58,23 @@ func XaSetup(app *gin.Engine) { app.POST(BusiAPI+"/TransInXa", common.WrapHandler(xaTransIn)) app.POST(BusiAPI+"/TransOutXa", common.WrapHandler(xaTransOut)) config.Mysql["database"] = "dtm_busi" - XaClient = dtmcli.NewXaClient(DtmServer, config.Mysql, app, Busi+"/xa") + var err error + XaClient, err = dtmcli.NewXaClient(DtmServer, config.Mysql, app, Busi+"/xa") + e2p(err) } func xaTransIn(c *gin.Context) (interface{}, error) { err := XaClient.XaLocalTransaction(c, func(db *common.DB, xa *dtmcli.Xa) (rerr error) { req := reqFrom(c) if req.TransInResult != "SUCCESS" { - return fmt.Errorf("tranIn failed") + return fmt.Errorf("tranIn FAILURE") } dbr := db.Exec("update user_account set balance=balance+? where user_id=?", req.Amount, 2) return dbr.Error }) + if err != nil && strings.Contains(err.Error(), "FAILURE") { + return M{"dtm_result": "FAILURE"}, nil + } e2p(err) return M{"dtm_result": "SUCCESS"}, nil } diff --git a/examples/quick_start.go b/examples/quick_start.go index 5bf3815..a2f268a 100644 --- a/examples/quick_start.go +++ b/examples/quick_start.go @@ -31,7 +31,7 @@ func QsStartSvr() { func QsFireRequest() string { req := &gin.H{"amount": 30} // 微服务的载荷 // DtmServer为DTM服务的地址 - saga := dtmcli.NewSaga(DtmServer). + saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)). // 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransOutCompensate" Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req). // 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransOut", 逆向操作为url: qsBusi+"/TransInCompensate"