From 3aa70932d3d180091ec51bade113a519cfec6739 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Wed, 21 Jul 2021 20:10:48 +0800 Subject: [PATCH] split the big test file --- .travis.yml | 1 + dtmsvr/dtmsvr_test.go | 273 +----------------------------- dtmsvr/examples_test.go | 19 +++ dtmsvr/trans_msg_test.go | 39 +++++ dtmsvr/trans_saga_barrier_test.go | 39 +++++ dtmsvr/trans_saga_test.go | 44 +++++ dtmsvr/trans_tcc_barrier_test.go | 132 +++++++++++++++ dtmsvr/trans_tcc_test.go | 38 +++++ dtmsvr/trans_xa_test.go | 50 ++++++ 9 files changed, 364 insertions(+), 271 deletions(-) create mode 100644 dtmsvr/examples_test.go create mode 100644 dtmsvr/trans_msg_test.go create mode 100644 dtmsvr/trans_saga_barrier_test.go create mode 100644 dtmsvr/trans_saga_test.go create mode 100644 dtmsvr/trans_tcc_barrier_test.go create mode 100644 dtmsvr/trans_tcc_test.go create mode 100644 dtmsvr/trans_xa_test.go diff --git a/.travis.yml b/.travis.yml index 8b4cb5c..944887b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,7 @@ branches: only: - master - main + - test services: - mysql before_install: diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index ecb3a38..2ca0329 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -3,9 +3,7 @@ package dtmsvr import ( "database/sql" "fmt" - "strings" "testing" - "time" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" @@ -19,7 +17,7 @@ var DtmServer = examples.DtmServer var Busi = examples.Busi var app *gin.Engine -func init() { +func TestMain(m *testing.M) { TransProcessedTestChan = make(chan string, 1) common.InitApp(common.GetProjectDir(), &config) config.Mysql["database"] = dbName @@ -40,34 +38,7 @@ func init() { e2p(dbGet().Exec("truncate trans_branch").Error) e2p(dbGet().Exec("truncate trans_log").Error) examples.ResetXaData() -} - -func TestDtmSvr(t *testing.T) { - - tccBarrierDisorder(t) - tccBarrierNormal(t) - tccBarrierRollback(t) - sagaBarrierNormal(t) - sagaBarrierRollback(t) - msgNormal(t) - msgPending(t) - tccNormal(t) - tccRollback(t) - sagaNormal(t) - xaNormal(t) - xaRollback(t) - sagaCommittedPending(t) - sagaRollback(t) - - // for coverage - examples.QsStartSvr() - assertSucceed(t, examples.QsFireRequest()) - assertSucceed(t, examples.MsgFireRequest()) - assertSucceed(t, examples.SagaBarrierFireRequest()) - assertSucceed(t, examples.SagaFireRequest()) - assertSucceed(t, examples.TccBarrierFireRequest()) - assertSucceed(t, examples.TccFireRequest()) - assertSucceed(t, examples.XaFireRequest()) + m.Run() } func TestCover(t *testing.T) { @@ -98,177 +69,6 @@ func getBranchesStatus(gid string) []string { return status } -func xaNormal(t *testing.T) { - xc := examples.XaClient - gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error { - req := examples.GenTransReq(30, false, false) - resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") - common.CheckRestySuccess(resp, err) - resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa") - common.CheckRestySuccess(resp, err) - return nil - }) - e2p(err) - WaitTransProcessed(gid) - assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid)) -} - -func xaRollback(t *testing.T) { - xc := examples.XaClient - gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error { - req := examples.GenTransReq(30, false, true) - resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") - common.CheckRestySuccess(resp, err) - resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa") - common.CheckRestySuccess(resp, err) - return nil - }) - if err != nil { - logrus.Errorf("global transaction failed, so rollback") - } - WaitTransProcessed(gid) - assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid)) - assert.Equal(t, "failed", getTransStatus(gid)) -} - -func tccNormal(t *testing.T) { - data := &examples.TransReq{Amount: 30} - _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { - _, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - e2p(rerr) - _, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") - e2p(rerr) - return - }) - e2p(err) -} -func tccBarrierNormal(t *testing.T) { - _, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { - res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - e2p(rerr) - if res1.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res1.StatusCode()) - } - res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") - e2p(rerr) - if res2.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res2.StatusCode()) - } - logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) - return - }) - e2p(err) -} - -func tccBarrierRollback(t *testing.T) { - gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { - res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") - e2p(rerr) - if res1.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res1.StatusCode()) - } - res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") - e2p(rerr) - if res2.StatusCode() != 200 { - return fmt.Errorf("bad status code: %d", res2.StatusCode()) - } - if strings.Contains(res2.String(), "FAILURE") { - return fmt.Errorf("branch trans in fail") - } - logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) - return - }) - assert.Equal(t, err, fmt.Errorf("branch trans in fail")) - WaitTransProcessed(gid) - assert.Equal(t, "failed", getTransStatus(gid)) -} - -func tccRollback(t *testing.T) { - data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} - _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { - _, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") - e2p(rerr) - _, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") - e2p(rerr) - return - }) - e2p(err) -} -func msgNormal(t *testing.T) { - msg := genMsg("gid-msg-normal") - msg.Submit() - assert.Equal(t, "submitted", getTransStatus(msg.Gid)) - WaitTransProcessed(msg.Gid) - assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) - assert.Equal(t, "succeed", getTransStatus(msg.Gid)) -} - -func msgPending(t *testing.T) { - msg := genMsg("gid-msg-normal-pending") - msg.Prepare("") - assert.Equal(t, "prepared", getTransStatus(msg.Gid)) - examples.MainSwitch.CanSubmitResult.SetOnce("PENDING") - CronTransOnce(60 * time.Second) - assert.Equal(t, "prepared", getTransStatus(msg.Gid)) - examples.MainSwitch.TransInResult.SetOnce("PENDING") - CronTransOnce(60 * time.Second) - assert.Equal(t, "submitted", getTransStatus(msg.Gid)) - CronTransOnce(60 * time.Second) - assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) - assert.Equal(t, "succeed", getTransStatus(msg.Gid)) -} - -func sagaNormal(t *testing.T) { - saga := genSaga("gid-noramlSaga", false, false) - saga.Submit() - WaitTransProcessed(saga.Gid) - assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) - assert.Equal(t, "succeed", getTransStatus(saga.Gid)) - transQuery(t, saga.Gid) -} - -func sagaBarrierNormal(t *testing.T) { - req := &examples.TransReq{Amount: 30} - saga := dtmcli.NewSaga(DtmServer). - Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req). - Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) - logrus.Printf("busi trans submit") - err := saga.Submit() - e2p(err) - WaitTransProcessed(saga.Gid) - assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) -} - -func sagaRollback(t *testing.T) { - saga := genSaga("gid-rollbackSaga2", false, true) - saga.Submit() - WaitTransProcessed(saga.Gid) - assert.Equal(t, "failed", getTransStatus(saga.Gid)) - assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid)) -} - -func sagaBarrierRollback(t *testing.T) { - saga := dtmcli.NewSaga(DtmServer). - Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", &examples.TransReq{Amount: 30}). - Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}) - logrus.Printf("busi trans submit") - err := saga.Submit() - e2p(err) - WaitTransProcessed(saga.Gid) - assert.Equal(t, "failed", getTransStatus(saga.Gid)) -} - -func sagaCommittedPending(t *testing.T) { - saga := genSaga("gid-committedPending", false, false) - examples.MainSwitch.TransInResult.SetOnce("PENDING") - saga.Submit() - WaitTransProcessed(saga.Gid) - assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) - CronTransOnce(60 * time.Second) - assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) - assert.Equal(t, "succeed", getTransStatus(saga.Gid)) -} - func assertSucceed(t *testing.T, gid string) { WaitTransProcessed(gid) assert.Equal(t, "succeed", getTransStatus(gid)) @@ -343,72 +143,3 @@ func TestSqlDB(t *testing.T) { dbr = db.Model(&dtmcli.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtmcli.BarrierModel{}) asserts.Equal(dbr.RowsAffected, int64(1)) } - -func tccBarrierDisorder(t *testing.T) { - timeoutChan := make(chan string, 2) - finishedChan := make(chan string, 2) - gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { - body := &examples.TransReq{Amount: 30} - tryURL := Busi + "/TccBTransOutTry" - confirmURL := Busi + "/TccBTransOutConfirm" - cancelURL := Busi + "/TccBSleepCancel" - // 请参见子事务屏障里的时序图,这里为了模拟该时序图,手动拆解了callbranch - branchID := tcc.NewBranchID() - sleeped := false - app.POST(examples.BusiAPI+"/TccBSleepCancel", common.WrapHandler(func(c *gin.Context) (interface{}, error) { - res, err := examples.TccBarrierTransOutCancel(c) - if !sleeped { - sleeped = true - logrus.Printf("sleep before cancel return") - <-timeoutChan - finishedChan <- "1" - } - return res, err - })) - // 注册子事务 - r, err := common.RestyClient.R(). - SetBody(&M{ - "gid": tcc.Gid, - "branch_id": branchID, - "trans_type": "tcc", - "status": "prepared", - "data": string(common.MustMarshal(body)), - "try": tryURL, - "confirm": confirmURL, - "cancel": cancelURL, - }). - Post(tcc.Dtm + "/registerTccBranch") - e2p(err) - assert.True(t, strings.Contains(r.String(), "SUCCESS")) - go func() { - logrus.Printf("sleeping to wait for tcc try timeout") - <-timeoutChan - r, _ = common.RestyClient.R(). - SetBody(body). - SetQueryParams(common.MS{ - "dtm": tcc.Dtm, - "gid": tcc.Gid, - "branch_id": branchID, - "trans_type": "tcc", - "branch_type": "try", - }). - Post(tryURL) - assert.True(t, strings.Contains(r.String(), "FAILURE")) - finishedChan <- "1" - }() - logrus.Printf("cron to timeout and then call cancel") - go CronTransOnce(60 * time.Second) - time.Sleep(100 * time.Millisecond) - logrus.Printf("cron to timeout and then call cancelled twice") - CronTransOnce(60 * time.Second) - timeoutChan <- "wake" - timeoutChan <- "wake" - <-finishedChan - <-finishedChan - time.Sleep(100 * time.Millisecond) - return fmt.Errorf("a cancelled tcc") - }) - assert.Error(t, err, fmt.Errorf("a cancelled tcc")) - assert.Equal(t, []string{"succeed", "prepared", "prepared"}, getBranchesStatus(gid)) - assert.Equal(t, "failed", getTransStatus(gid)) -} diff --git a/dtmsvr/examples_test.go b/dtmsvr/examples_test.go new file mode 100644 index 0000000..53990a7 --- /dev/null +++ b/dtmsvr/examples_test.go @@ -0,0 +1,19 @@ +package dtmsvr + +import ( + "testing" + + "github.com/yedf/dtm/examples" +) + +func TestExamples(t *testing.T) { + // for coverage + examples.QsStartSvr() + assertSucceed(t, examples.QsFireRequest()) + assertSucceed(t, examples.MsgFireRequest()) + assertSucceed(t, examples.SagaBarrierFireRequest()) + assertSucceed(t, examples.SagaFireRequest()) + assertSucceed(t, examples.TccBarrierFireRequest()) + assertSucceed(t, examples.TccFireRequest()) + assertSucceed(t, examples.XaFireRequest()) +} diff --git a/dtmsvr/trans_msg_test.go b/dtmsvr/trans_msg_test.go new file mode 100644 index 0000000..fd0c1bb --- /dev/null +++ b/dtmsvr/trans_msg_test.go @@ -0,0 +1,39 @@ +package dtmsvr + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/examples" +) + +func TestMsg(t *testing.T) { + + msgNormal(t) + msgPending(t) +} + +func msgNormal(t *testing.T) { + msg := genMsg("gid-msg-normal") + msg.Submit() + assert.Equal(t, "submitted", getTransStatus(msg.Gid)) + WaitTransProcessed(msg.Gid) + assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) + assert.Equal(t, "succeed", getTransStatus(msg.Gid)) +} + +func msgPending(t *testing.T) { + msg := genMsg("gid-msg-normal-pending") + msg.Prepare("") + assert.Equal(t, "prepared", getTransStatus(msg.Gid)) + examples.MainSwitch.CanSubmitResult.SetOnce("PENDING") + CronTransOnce(60 * time.Second) + assert.Equal(t, "prepared", getTransStatus(msg.Gid)) + examples.MainSwitch.TransInResult.SetOnce("PENDING") + CronTransOnce(60 * time.Second) + assert.Equal(t, "submitted", getTransStatus(msg.Gid)) + CronTransOnce(60 * time.Second) + assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) + assert.Equal(t, "succeed", getTransStatus(msg.Gid)) +} diff --git a/dtmsvr/trans_saga_barrier_test.go b/dtmsvr/trans_saga_barrier_test.go new file mode 100644 index 0000000..3239136 --- /dev/null +++ b/dtmsvr/trans_saga_barrier_test.go @@ -0,0 +1,39 @@ +package dtmsvr + +import ( + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/examples" +) + +func TestSagaBarrier(t *testing.T) { + + sagaBarrierNormal(t) + sagaBarrierRollback(t) +} + +func sagaBarrierNormal(t *testing.T) { + req := &examples.TransReq{Amount: 30} + saga := dtmcli.NewSaga(DtmServer). + Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req). + Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) + logrus.Printf("busi trans submit") + err := saga.Submit() + e2p(err) + WaitTransProcessed(saga.Gid) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) +} + +func sagaBarrierRollback(t *testing.T) { + saga := dtmcli.NewSaga(DtmServer). + Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", &examples.TransReq{Amount: 30}). + Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}) + logrus.Printf("busi trans submit") + err := saga.Submit() + e2p(err) + WaitTransProcessed(saga.Gid) + assert.Equal(t, "failed", getTransStatus(saga.Gid)) +} diff --git a/dtmsvr/trans_saga_test.go b/dtmsvr/trans_saga_test.go new file mode 100644 index 0000000..0868820 --- /dev/null +++ b/dtmsvr/trans_saga_test.go @@ -0,0 +1,44 @@ +package dtmsvr + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/examples" +) + +func TestSaga(t *testing.T) { + + sagaNormal(t) + sagaCommittedPending(t) + sagaRollback(t) +} + +func sagaNormal(t *testing.T) { + saga := genSaga("gid-noramlSaga", false, false) + saga.Submit() + WaitTransProcessed(saga.Gid) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) + assert.Equal(t, "succeed", getTransStatus(saga.Gid)) + transQuery(t, saga.Gid) +} + +func sagaCommittedPending(t *testing.T) { + saga := genSaga("gid-committedPending", false, false) + examples.MainSwitch.TransInResult.SetOnce("PENDING") + saga.Submit() + WaitTransProcessed(saga.Gid) + assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) + CronTransOnce(60 * time.Second) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) + assert.Equal(t, "succeed", getTransStatus(saga.Gid)) +} + +func sagaRollback(t *testing.T) { + saga := genSaga("gid-rollbackSaga2", false, true) + saga.Submit() + WaitTransProcessed(saga.Gid) + assert.Equal(t, "failed", getTransStatus(saga.Gid)) + assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid)) +} diff --git a/dtmsvr/trans_tcc_barrier_test.go b/dtmsvr/trans_tcc_barrier_test.go new file mode 100644 index 0000000..f4b12ac --- /dev/null +++ b/dtmsvr/trans_tcc_barrier_test.go @@ -0,0 +1,132 @@ +package dtmsvr + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/examples" +) + +func TestTccBarrier(t *testing.T) { + tccBarrierDisorder(t) + tccBarrierNormal(t) + tccBarrierRollback(t) + +} + +func tccBarrierRollback(t *testing.T) { + gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { + res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") + e2p(rerr) + if res1.StatusCode() != 200 { + return fmt.Errorf("bad status code: %d", res1.StatusCode()) + } + res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") + e2p(rerr) + if res2.StatusCode() != 200 { + return fmt.Errorf("bad status code: %d", res2.StatusCode()) + } + if strings.Contains(res2.String(), "FAILURE") { + return fmt.Errorf("branch trans in fail") + } + logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) + return + }) + assert.Equal(t, err, fmt.Errorf("branch trans in fail")) + WaitTransProcessed(gid) + assert.Equal(t, "failed", getTransStatus(gid)) +} + +func tccBarrierNormal(t *testing.T) { + _, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { + res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") + e2p(rerr) + if res1.StatusCode() != 200 { + return fmt.Errorf("bad status code: %d", res1.StatusCode()) + } + res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") + e2p(rerr) + if res2.StatusCode() != 200 { + return fmt.Errorf("bad status code: %d", res2.StatusCode()) + } + logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) + return + }) + e2p(err) +} + +func tccBarrierDisorder(t *testing.T) { + timeoutChan := make(chan string, 2) + finishedChan := make(chan string, 2) + gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { + body := &examples.TransReq{Amount: 30} + tryURL := Busi + "/TccBTransOutTry" + confirmURL := Busi + "/TccBTransOutConfirm" + cancelURL := Busi + "/TccBSleepCancel" + // 请参见子事务屏障里的时序图,这里为了模拟该时序图,手动拆解了callbranch + branchID := tcc.NewBranchID() + sleeped := false + app.POST(examples.BusiAPI+"/TccBSleepCancel", common.WrapHandler(func(c *gin.Context) (interface{}, error) { + res, err := examples.TccBarrierTransOutCancel(c) + if !sleeped { + sleeped = true + logrus.Printf("sleep before cancel return") + <-timeoutChan + finishedChan <- "1" + } + return res, err + })) + // 注册子事务 + r, err := common.RestyClient.R(). + SetBody(&M{ + "gid": tcc.Gid, + "branch_id": branchID, + "trans_type": "tcc", + "status": "prepared", + "data": string(common.MustMarshal(body)), + "try": tryURL, + "confirm": confirmURL, + "cancel": cancelURL, + }). + Post(tcc.Dtm + "/registerTccBranch") + e2p(err) + assert.True(t, strings.Contains(r.String(), "SUCCESS")) + go func() { + logrus.Printf("sleeping to wait for tcc try timeout") + <-timeoutChan + r, _ = common.RestyClient.R(). + SetBody(body). + SetQueryParams(common.MS{ + "dtm": tcc.Dtm, + "gid": tcc.Gid, + "branch_id": branchID, + "trans_type": "tcc", + "branch_type": "try", + }). + Post(tryURL) + assert.True(t, strings.Contains(r.String(), "FAILURE")) + finishedChan <- "1" + }() + logrus.Printf("cron to timeout and then call cancel") + go CronTransOnce(60 * time.Second) + time.Sleep(100 * time.Millisecond) + logrus.Printf("cron to timeout and then call cancelled twice") + CronTransOnce(60 * time.Second) + timeoutChan <- "wake" + timeoutChan <- "wake" + <-finishedChan + <-finishedChan + time.Sleep(100 * time.Millisecond) + return fmt.Errorf("a cancelled tcc") + }) + assert.Error(t, err, fmt.Errorf("a cancelled tcc")) + assert.Equal(t, []string{"succeed", "prepared", "prepared"}, getBranchesStatus(gid)) + assert.Equal(t, "failed", getTransStatus(gid)) +} diff --git a/dtmsvr/trans_tcc_test.go b/dtmsvr/trans_tcc_test.go new file mode 100644 index 0000000..18eff79 --- /dev/null +++ b/dtmsvr/trans_tcc_test.go @@ -0,0 +1,38 @@ +package dtmsvr + +import ( + "testing" + + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/examples" +) + +func TestTcc(t *testing.T) { + tccNormal(t) + tccRollback(t) + +} + +func tccNormal(t *testing.T) { + data := &examples.TransReq{Amount: 30} + _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { + _, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") + e2p(rerr) + _, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") + e2p(rerr) + return + }) + e2p(err) +} + +func tccRollback(t *testing.T) { + data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} + _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { + _, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") + e2p(rerr) + _, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") + e2p(rerr) + return + }) + e2p(err) +} diff --git a/dtmsvr/trans_xa_test.go b/dtmsvr/trans_xa_test.go new file mode 100644 index 0000000..f3b5636 --- /dev/null +++ b/dtmsvr/trans_xa_test.go @@ -0,0 +1,50 @@ +package dtmsvr + +import ( + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/examples" +) + +func TestXa(t *testing.T) { + + xaNormal(t) + xaRollback(t) +} + +func xaNormal(t *testing.T) { + xc := examples.XaClient + gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error { + req := examples.GenTransReq(30, false, false) + resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") + common.CheckRestySuccess(resp, err) + resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa") + common.CheckRestySuccess(resp, err) + return nil + }) + e2p(err) + WaitTransProcessed(gid) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid)) +} + +func xaRollback(t *testing.T) { + xc := examples.XaClient + gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error { + req := examples.GenTransReq(30, false, true) + resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") + common.CheckRestySuccess(resp, err) + resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa") + common.CheckRestySuccess(resp, err) + return nil + }) + if err != nil { + logrus.Errorf("global transaction failed, so rollback") + } + WaitTransProcessed(gid) + assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid)) + assert.Equal(t, "failed", getTransStatus(gid)) +}