From 60d8272cd471a4e34b48a9791673293ed5947242 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Wed, 19 May 2021 11:29:38 +0800 Subject: [PATCH] add prepare cancel case --- dtm_test.go | 103 +++++++++++++++++++++++++++++++-------------- dtmsvr/consumer.go | 27 +++++++----- dtmsvr/objects.go | 2 +- examples/cli.go | 2 +- examples/saga.go | 13 +++--- 5 files changed, 97 insertions(+), 50 deletions(-) diff --git a/dtm_test.go b/dtm_test.go index 62e8c17..bcee0f5 100644 --- a/dtm_test.go +++ b/dtm_test.go @@ -21,22 +21,84 @@ func TestViper(t *testing.T) { assert.Equal(t, "test_val", viper.GetString("test")) } +var myinit int = func() int { + dtmsvr.LoadConfig() + return 0 +}() + +// 测试使用的全局对象 +var rabbit = dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq) +var queprepared = rabbit.QueueNew(dtmsvr.RabbitmqConstPrepared) +var quecommited = rabbit.QueueNew(dtmsvr.RabbitmqConstCommited) +var db = dtmsvr.DbGet() + +func getSagaModel(gid string) *dtmsvr.SagaModel { + sm := dtmsvr.SagaModel{} + dbr := db.Model(&sm).Where("gid=?", gid).First(&sm) + common.PanicIfError(dbr.Error) + return &sm +} + +func getSagaStepStatus(gid string) []string { + steps := []dtmsvr.SagaStepModel{} + dbr := db.Model(&dtmsvr.SagaStepModel{}).Where("gid=?", gid).Find(&steps) + common.PanicIfError(dbr.Error) + status := []string{} + for _, step := range steps { + status = append(status, step.Status) + } + return status +} + +func noramlSaga(t *testing.T) { + saga := genSaga("gid-normal", false, false) + saga.Prepare() + queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) + assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) + saga.Commit() + quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg) + assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) + assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid)) +} + +func rollbackSaga2(t *testing.T) { + saga := genSaga("gid-rollback2", false, true) + saga.Commit() + quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg) + saga.Prepare() + queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) + assert.Equal(t, "rollbacked", getSagaModel(saga.Gid).Status) + assert.Equal(t, []string{"rollbacked", "finished", "rollbacked", "rollbacked"}, getSagaStepStatus(saga.Gid)) +} + +func prepareCancel(t *testing.T) { + saga := genSaga("gid1-trans-cancel", false, true) + saga.Prepare() + queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) + dtmsvr.CronPreparedOne(-1 * time.Second) + assert.Equal(t, "canceled", getSagaModel(saga.Gid).Status) +} + +func preparePending(t *testing.T) { + saga := genSaga("gid1-trans-pending", false, true) + saga.Prepare() + queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) + dtmsvr.CronPreparedOne(-1 * time.Second) + assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) +} + func TestDtmSvr(t *testing.T) { // 清理数据 - rabbit := dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq) - queprepared := rabbit.QueueNew(dtmsvr.RabbitmqConstPrepared) for i := 0; i < queprepared.Queue.Messages; i++ { queprepared.WaitAndHandleOne(func(data M) { logrus.Printf("ignoring prepared queue data before test") }) } - quecommited := rabbit.QueueNew(dtmsvr.RabbitmqConstCommited) for i := 0; i < quecommited.Queue.Messages; i++ { quecommited.WaitAndHandleOne(func(data M) { logrus.Printf("ignoring commited queue data before test") }) } - db := dtmsvr.DbGet() common.PanicIfError(db.Exec("truncate test1.a_saga").Error) common.PanicIfError(db.Exec("truncate test1.a_saga_step").Error) @@ -45,33 +107,10 @@ func TestDtmSvr(t *testing.T) { go examples.StartSvr() time.Sleep(time.Duration(100 * 1000 * 1000)) - // 开始第一个正常流程的测试 - saga := genSaga("gid-1", false, false) - saga.Prepare() - queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) - sm := dtmsvr.SagaModel{} - db.Model(&sm).Where("gid=?", saga.Gid).First(&sm) - assert.Equal(t, "prepared", sm.Status) - saga.Commit() - quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg) - db.Model(&dtmsvr.SagaModel{}).Where("gid=?", saga.Gid).First(&sm) - assert.Equal(t, "finished", sm.Status) - steps := []dtmsvr.SagaStepModel{} - db.Model(&dtmsvr.SagaStepModel{}).Where("gid=?", saga.Gid).Find(&steps) - assert.Equal(t, true, steps[0].Status == "pending" && steps[2].Status == "pending" && steps[1].Status == "finished" && steps[3].Status == "finished") - - saga = genSaga("gid-2", false, true) - saga.Commit() - quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg) - saga.Prepare() - queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) - sm = dtmsvr.SagaModel{} - db.Model(&dtmsvr.SagaModel{}).Where("gid=?", saga.Gid).First(&sm) - assert.Equal(t, "rollbacked", sm.Status) - steps = []dtmsvr.SagaStepModel{} - db.Model(&dtmsvr.SagaStepModel{}).Where("gid=?", saga.Gid).Find(&steps) - assert.Equal(t, true, steps[0].Status == "rollbacked" && steps[2].Status == "rollbacked" && steps[1].Status == "finished" && steps[3].Status == "rollbacked") - + prepareCancel(t) + preparePending(t) + noramlSaga(t) + rollbackSaga2(t) // assert.Equal(t, 1, 0) // 开始测试 @@ -81,7 +120,7 @@ func TestDtmSvr(t *testing.T) { } func genSaga(gid string, inFailed bool, outFailed bool) *dtm.Saga { - saga := dtm.SagaNew(examples.TcServer, gid, examples.BusiApi+"/TransQuery") + saga := dtm.SagaNew(examples.TcServer, gid, examples.Busi+"/TransQuery") req := examples.TransReq{ Amount: 30, TransInFailed: inFailed, diff --git a/dtmsvr/consumer.go b/dtmsvr/consumer.go index 2e35aea..eb1bd1e 100644 --- a/dtmsvr/consumer.go +++ b/dtmsvr/consumer.go @@ -1,6 +1,7 @@ package dtmsvr import ( + "encoding/json" "fmt" "strings" "time" @@ -20,7 +21,7 @@ type SagaModel struct { ModelBase Gid string Steps string - TransQuery string + TransQuery string `json:"trans_query"` Status string FinishTime time.Time RollbackTime time.Time @@ -59,17 +60,11 @@ func HandlePreparedMsg(data M) { }).Create(&m) } -func HandleCommitedMsg(data M) { +func handleCommitedSagaModel(m *SagaModel) { db := DbGet() - logrus.Printf("creating saga model in commited") - steps := data["steps"].([]interface{}) - data["steps"] = common.MustMarshalString(data["steps"]) - m := SagaModel{} - err := common.Map2Obj(data, &m) - common.PanicIfError(err) m.Status = "processing" stepInserted := false - err = db.Transaction(func(db *gorm.DB) error { + err := db.Transaction(func(db *gorm.DB) error { db.Clauses(clause.OnConflict{ DoNothing: true, }).Create(&m) @@ -77,8 +72,10 @@ func HandleCommitedMsg(data M) { db.Model(&m).Where("status=?", "prepared").Update("status", "processing") } nsteps := []SagaStepModel{} - for _, step1 := range steps { - step := step1.(map[string]interface{}) + steps := []M{} + err := json.Unmarshal([]byte(m.Steps), &steps) + common.PanicIfError(err) + for _, step := range steps { nsteps = append(nsteps, SagaStepModel{ Gid: m.Gid, Step: len(nsteps) + 1, @@ -117,6 +114,14 @@ func HandleCommitedMsg(data M) { logrus.Printf("---------------handle commited msmg error: %s", err.Error()) } } +func HandleCommitedMsg(data M) { + logrus.Printf("creating saga model in commited") + data["steps"] = common.MustMarshalString(data["steps"]) + m := SagaModel{} + err := common.Map2Obj(data, &m) + common.PanicIfError(err) + handleCommitedSagaModel(&m) +} func ProcessCommitedSaga(gid string) (rerr error) { steps := []SagaStepModel{} diff --git a/dtmsvr/objects.go b/dtmsvr/objects.go index 0f93d81..8c3d75a 100644 --- a/dtmsvr/objects.go +++ b/dtmsvr/objects.go @@ -29,7 +29,7 @@ func DbGet() *gorm.DB { LoadConfig() if db == nil { conf := viper.GetStringMapString("mysql") - dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true", conf["user"], conf["password"], conf["host"], conf["port"], conf["database"]) + dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", conf["user"], conf["password"], conf["host"], conf["port"], conf["database"]) logrus.Printf("connecting %s", strings.Replace(dsn, conf["password"], "****", 1)) db1, err := gorm.Open(mysql.Open(dsn), &gorm.Config{ SkipDefaultTransaction: true, diff --git a/examples/cli.go b/examples/cli.go index 91a4f5b..4d4b3f7 100644 --- a/examples/cli.go +++ b/examples/cli.go @@ -21,7 +21,7 @@ func StartSvr() { app.POST(BusiApi+"/TransInCompensate", TransInCompensate) app.POST(BusiApi+"/TransOut", TransOut) app.POST(BusiApi+"/TransOutCompensate", TransOutCompensate) - app.POST(BusiApi+"/TransQuery", TransQuery) + app.GET(BusiApi+"/TransQuery", TransQuery) logrus.Printf("examples istening at %d", BusiPort) app.Run(":8081") } diff --git a/examples/saga.go b/examples/saga.go index 9963648..dc1e5a9 100644 --- a/examples/saga.go +++ b/examples/saga.go @@ -2,6 +2,7 @@ package examples import ( "fmt" + "strings" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" @@ -67,12 +68,14 @@ func TransOutCompensate(c *gin.Context) { func TransQuery(c *gin.Context) { gid := c.Query("gid") - req := TransReq{} - if err := c.BindJSON(&req); err != nil { - return + logrus.Printf("%s TransQuery", gid) + if strings.Contains(gid, "cancel") { + c.JSON(200, M{"result": "FAIL"}) + } else if strings.Contains(gid, "pending") { + c.JSON(200, M{"result": "PENDING"}) + } else { + c.JSON(200, M{"result": "SUCCESS"}) } - logrus.Printf("%s TransQuery: %v", gid, req) - c.JSON(200, M{"result": "SUCCESS"}) } func trans(req *TransReq) {