From c2b2b3b26c2d97d1c5d59c0ca39154c2e83471a8 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Wed, 19 May 2021 17:00:46 +0800 Subject: [PATCH] all test case passed --- common/utils.go | 25 +++++++++---- dtm_test.go | 48 ++++++++++++++++--------- dtmsvr/consumer.go | 89 +++++++++++++++++++++++----------------------- dtmsvr/cron.go | 67 ++++++++++++++++++++++++++++------ dtmsvr/objects.go | 15 ++++++++ dtmsvr/rabbitmq.go | 9 +++++ examples/saga.go | 38 +++++++++++--------- 7 files changed, 197 insertions(+), 94 deletions(-) diff --git a/common/utils.go b/common/utils.go index 1ccedde..9fe483d 100644 --- a/common/utils.go +++ b/common/utils.go @@ -12,6 +12,15 @@ import ( "github.com/sirupsen/logrus" ) +func OrString(ss ...string) string { + for _, s := range ss { + if s != "" { + return s + } + } + return "" +} + var gNode *snowflake.Node = nil func GenGid() string { @@ -49,12 +58,16 @@ func MustMarshalString(v interface{}) string { return string(MustMarshal(v)) } -func Map2Obj(m map[string]interface{}, obj interface{}) error { - b, err := json.Marshal(m) - if err != nil { - return err - } - return json.Unmarshal(b, obj) +func MustUnmarshalString(s string, obj interface{}) { + err := json.Unmarshal([]byte(s), obj) + PanicIfError(err) +} + +func MustRemarshal(from interface{}, to interface{}) { + b, err := json.Marshal(from) + PanicIfError(err) + err = json.Unmarshal(b, to) + PanicIfError(err) } var RestyClient = resty.New() diff --git a/dtm_test.go b/dtm_test.go index bcee0f5..a3be236 100644 --- a/dtm_test.go +++ b/dtm_test.go @@ -51,7 +51,7 @@ func getSagaStepStatus(gid string) []string { } func noramlSaga(t *testing.T) { - saga := genSaga("gid-normal", false, false) + saga := genSaga("gid-noramlSaga", false, false) saga.Prepare() queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) @@ -62,7 +62,7 @@ func noramlSaga(t *testing.T) { } func rollbackSaga2(t *testing.T) { - saga := genSaga("gid-rollback2", false, true) + saga := genSaga("gid-rollbackSaga2", false, true) saga.Commit() quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg) saga.Prepare() @@ -72,43 +72,56 @@ func rollbackSaga2(t *testing.T) { } func prepareCancel(t *testing.T) { - saga := genSaga("gid1-trans-cancel", false, true) + saga := genSaga("gid1-prepareCancel", false, true) saga.Prepare() queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) - dtmsvr.CronPreparedOne(-1 * time.Second) + examples.TransQueryResult = "FAIL" + dtmsvr.CronPreparedOnce(-10 * time.Second) + examples.TransQueryResult = "" assert.Equal(t, "canceled", getSagaModel(saga.Gid).Status) } func preparePending(t *testing.T) { - saga := genSaga("gid1-trans-pending", false, true) + saga := genSaga("gid1-preparePending", false, false) saga.Prepare() queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) - dtmsvr.CronPreparedOne(-1 * time.Second) + examples.TransQueryResult = "PENDING" + dtmsvr.CronPreparedOnce(-10 * time.Second) + examples.TransQueryResult = "" assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) + dtmsvr.CronPreparedOnce(-10 * time.Second) + quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg) + assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) +} + +func commitedPending(t *testing.T) { + saga := genSaga("gid-commitedPending", false, false) + saga.Prepare() + queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) + saga.Commit() + examples.TransOutResult = "PENDING" + quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg) + assert.Equal(t, []string{"pending", "finished", "pending", "pending"}, getSagaStepStatus(saga.Gid)) + examples.TransOutResult = "" + dtmsvr.CronCommitedOnce(-10 * time.Second) + assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid)) + assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) } func TestDtmSvr(t *testing.T) { // 清理数据 - for i := 0; i < queprepared.Queue.Messages; i++ { - queprepared.WaitAndHandleOne(func(data M) { - logrus.Printf("ignoring prepared queue data before test") - }) - } - for i := 0; i < quecommited.Queue.Messages; i++ { - quecommited.WaitAndHandleOne(func(data M) { - logrus.Printf("ignoring commited queue data before test") - }) - } common.PanicIfError(db.Exec("truncate test1.a_saga").Error) common.PanicIfError(db.Exec("truncate test1.a_saga_step").Error) + common.PanicIfError(db.Exec("truncate test1.a_dtrans_log").Error) // 启动组件 go dtmsvr.StartSvr() go examples.StartSvr() time.Sleep(time.Duration(100 * 1000 * 1000)) - prepareCancel(t) preparePending(t) + prepareCancel(t) + commitedPending(t) noramlSaga(t) rollbackSaga2(t) // assert.Equal(t, 1, 0) @@ -120,6 +133,7 @@ func TestDtmSvr(t *testing.T) { } func genSaga(gid string, inFailed bool, outFailed bool) *dtm.Saga { + logrus.Printf("beginning a saga test ---------------- %s", gid) saga := dtm.SagaNew(examples.TcServer, gid, examples.Busi+"/TransQuery") req := examples.TransReq{ Amount: 30, diff --git a/dtmsvr/consumer.go b/dtmsvr/consumer.go index eb1bd1e..790cd19 100644 --- a/dtmsvr/consumer.go +++ b/dtmsvr/consumer.go @@ -19,10 +19,10 @@ type ModelBase struct { } type SagaModel struct { ModelBase - Gid string - Steps string + Gid string `json:"gid"` + Steps string `json:"steps"` TransQuery string `json:"trans_query"` - Status string + Status string `json:"status"` FinishTime time.Time RollbackTime time.Time } @@ -52,25 +52,41 @@ func HandlePreparedMsg(data M) { logrus.Printf("creating saga model in prepare") data["steps"] = common.MustMarshalString(data["steps"]) m := SagaModel{} - err := common.Map2Obj(data, &m) - common.PanicIfError(err) + common.MustRemarshal(data, &m) m.Status = "prepared" - db.Clauses(clause.OnConflict{ + writeTransLog(m.Gid, "save prepared", m.Status, -1, m.Steps) + db1 := db.Clauses(clause.OnConflict{ DoNothing: true, }).Create(&m) + common.PanicIfError(db1.Error) } -func handleCommitedSagaModel(m *SagaModel) { +func HandleCommitedMsg(data M) { + logrus.Printf("creating saga model in commited") + data["steps"] = common.MustMarshalString(data["steps"]) + m := SagaModel{} + common.MustRemarshal(data, &m) + saveCommitedSagaModel(&m) + err := ProcessCommitedSaga(m.Gid) + if err != nil { + logrus.Printf("---------------handle commited msmg error: %s", err.Error()) + } +} + +func saveCommitedSagaModel(m *SagaModel) { db := DbGet() - m.Status = "processing" + m.Status = "commited" stepInserted := false err := db.Transaction(func(db *gorm.DB) error { - db.Clauses(clause.OnConflict{ + writeTransLog(m.Gid, "save commited", m.Status, -1, m.Steps) + dbr := db.Clauses(clause.OnConflict{ DoNothing: true, }).Create(&m) - if db.Error == nil && db.RowsAffected == 0 { - db.Model(&m).Where("status=?", "prepared").Update("status", "processing") + if dbr.Error == nil && dbr.RowsAffected == 0 { + writeTransLog(m.Gid, "change status", m.Status, -1, "") + dbr = db.Model(&m).Where("status=?", "prepared").Update("status", "commited") } + common.PanicIfError(dbr.Error) nsteps := []SagaStepModel{} steps := []M{} err := json.Unmarshal([]byte(m.Steps), &steps) @@ -93,6 +109,7 @@ func handleCommitedSagaModel(m *SagaModel) { Status: "pending", }) } + writeTransLog(m.Gid, "save steps", m.Status, -1, common.MustMarshalString(nsteps)) r := db.Clauses(clause.OnConflict{ DoNothing: true, }).Create(&nsteps) @@ -109,18 +126,6 @@ func handleCommitedSagaModel(m *SagaModel) { if !stepInserted { return } - err = ProcessCommitedSaga(m.Gid) - if err != nil { - logrus.Printf("---------------handle commited msmg error: %s", err.Error()) - } -} -func HandleCommitedMsg(data M) { - logrus.Printf("creating saga model in commited") - data["steps"] = common.MustMarshalString(data["steps"]) - m := SagaModel{} - err := common.Map2Obj(data, &m) - common.PanicIfError(err) - handleCommitedSagaModel(&m) } func ProcessCommitedSaga(gid string) (rerr error) { @@ -130,22 +135,11 @@ func ProcessCommitedSaga(gid string) (rerr error) { if db1.Error != nil { return db1.Error } - tx := []*gorm.DB{db.Begin()} - defer func() { // 如果直接return出去,则rollback当前的事务 - tx[0].Rollback() - if err := recover(); err != nil { - rerr = err.(error) - } - }() - checkAndCommit := func(db1 *gorm.DB) { + checkAffected := func(db1 *gorm.DB) { common.PanicIfError(db1.Error) if db1.RowsAffected == 0 { panic(fmt.Errorf("duplicate updating")) } - dbr := tx[0].Commit() - common.PanicIfError(dbr.Error) - tx[0] = db.Begin() - common.PanicIfError(tx[0].Error) } current := 0 // 当前正在处理的步骤 for ; current < len(steps); current++ { @@ -160,17 +154,19 @@ func ProcessCommitedSaga(gid string) (rerr error) { } body := resp.String() if strings.Contains(body, "SUCCESS") { - dbr := tx[0].Model(&step).Where("status=?", "pending").Updates(M{ + writeTransLog(gid, "step finished", "finished", step.Step, "") + dbr := db.Model(&step).Where("status=?", "pending").Updates(M{ "status": "finished", "finish_time": time.Now(), }) - checkAndCommit(dbr) + checkAffected(dbr) } else if strings.Contains(body, "FAIL") { - dbr := tx[0].Model(&step).Where("status=?", "pending").Updates(M{ + writeTransLog(gid, "step rollbacked", "rollbacked", step.Step, "") + dbr := db.Model(&step).Where("status=?", "pending").Updates(M{ "status": "rollbacked", "rollback_time": time.Now(), }) - checkAndCommit(dbr) + checkAffected(dbr) break } else { return fmt.Errorf("unknown response: %s, will be retried", body) @@ -178,11 +174,12 @@ func ProcessCommitedSaga(gid string) (rerr error) { } } if current == len(steps) { // saga 事务完成 - dbr := tx[0].Model(&SagaModel{}).Where("gid=? and status=?", gid, "processing").Updates(M{ + writeTransLog(gid, "saga finished", "finished", -1, "") + dbr := db.Model(&SagaModel{}).Where("gid=? and status=?", gid, "commited").Updates(M{ "status": "finished", "finish_time": time.Now(), }) - checkAndCommit(dbr) + checkAffected(dbr) return nil } for current = current - 1; current >= 0; current-- { @@ -196,11 +193,12 @@ func ProcessCommitedSaga(gid string) (rerr error) { } body := resp.String() if strings.Contains(body, "SUCCESS") { - dbr := tx[0].Model(&step).Where("status=?", step.Status).Updates(M{ + writeTransLog(gid, "step rollbacked", "rollbacked", step.Step, "") + dbr := db.Model(&step).Where("status=?", step.Status).Updates(M{ "status": "rollbacked", "rollback_time": time.Now(), }) - checkAndCommit(dbr) + checkAffected(dbr) } else { return fmt.Errorf("expect compensate return SUCCESS") } @@ -208,11 +206,12 @@ func ProcessCommitedSaga(gid string) (rerr error) { if current != -1 { return fmt.Errorf("saga current not -1") } - dbr := tx[0].Model(&SagaModel{}).Where("status=?", "processing").Updates(M{ + writeTransLog(gid, "saga rollbacked", "rollbacked", -1, "") + dbr := db.Model(&SagaModel{}).Where("status=? and gid=?", "commited", gid).Updates(M{ "status": "rollbacked", "rollback_time": time.Now(), }) - checkAndCommit(dbr) + checkAffected(dbr) return nil } diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index f99912d..89f5ca5 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -1,24 +1,71 @@ package dtmsvr import ( + "fmt" "strings" "time" "github.com/yedf/dtm/common" ) -func CronPreparedOne(expire time.Duration) { +func CronPreparedOnce(expire time.Duration) { db := DbGet() - sm := SagaModel{} - dbr := db.Model(&sm).Where("update_time > date_add(now(), interval ? second)", int(expire/time.Second)).Where("status = ?", "prepared").First(&sm) + ss := []SagaModel{} + dbr := db.Model(&SagaModel{}).Where("update_time < date_sub(now(), interval ? second)", int(expire/time.Second)).Where("status = ?", "prepared").Find(&ss) common.PanicIfError(dbr.Error) - resp, err := common.RestyClient.R().SetQueryParam("gid", sm.Gid).Get(sm.TransQuery) - common.PanicIfError(err) - body := resp.String() - if strings.Contains(body, "FAIL") { - dbr = db.Model(&sm).Where("status = ?", "prepared").Update("status", "canceled") + writeTransLog("", "saga fetch prepared", fmt.Sprint(len(ss)), -1, "") + if len(ss) == 0 { + return + } + for _, sm := range ss { + writeTransLog(sm.Gid, "saga touch prepared", "", -1, "") + dbr = db.Model(&sm).Update("id", sm.ID) common.PanicIfError(dbr.Error) - } else if strings.Contains(body, "SUCESS") { - dbr = db.Model(&sm).Where("status = ?", "") + resp, err := common.RestyClient.R().SetQueryParam("gid", sm.Gid).Get(sm.TransQuery) + common.PanicIfError(err) + body := resp.String() + if strings.Contains(body, "FAIL") { + writeTransLog(sm.Gid, "saga canceled", "canceled", -1, "") + dbr = db.Model(&sm).Where("status = ?", "prepared").Update("status", "canceled") + common.PanicIfError(dbr.Error) + } else if strings.Contains(body, "SUCCESS") { + m := M{} + steps := []M{} + common.MustRemarshal(sm, &m) + common.PanicIfError(err) + common.MustUnmarshalString(m["steps"].(string), &steps) + m["steps"] = steps + err = rabbit.SendAndConfirm(RabbitmqConstCommited, m) + common.PanicIfError(err) + } + } +} + +func CronPrepared() { + for { + CronPreparedOnce(10 * time.Second) + } +} + +func CronCommitedOnce(expire time.Duration) { + db := DbGet() + ss := []SagaModel{} + dbr := db.Model(&SagaModel{}).Where("update_time < date_sub(now(), interval ? second)", int(expire/time.Second)).Where("status = ?", "commited").Find(&ss) + common.PanicIfError(dbr.Error) + writeTransLog("", "saga fetch commited", fmt.Sprint(len(ss)), -1, "") + if len(ss) == 0 { + return + } + for _, sm := range ss { + writeTransLog(sm.Gid, "saga touch commited", "", -1, "") + dbr = db.Model(&sm).Update("id", sm.ID) + common.PanicIfError(dbr.Error) + ProcessCommitedSaga(sm.Gid) + } +} + +func CronCommited() { + for { + CronCommitedOnce(10 * time.Second) } } diff --git a/dtmsvr/objects.go b/dtmsvr/objects.go index 8c3d75a..201ace7 100644 --- a/dtmsvr/objects.go +++ b/dtmsvr/objects.go @@ -39,3 +39,18 @@ func DbGet() *gorm.DB { } return db } + +func writeTransLog(gid string, action string, status string, step int, detail string) { + db := DbGet() + if detail == "" { + detail = "{}" + } + dbr := db.Table("test1.a_dtrans_log").Create(M{ + "gid": gid, + "action": action, + "status": status, + "step": step, + "detail": detail, + }) + common.PanicIfError(dbr.Error) +} diff --git a/dtmsvr/rabbitmq.go b/dtmsvr/rabbitmq.go index 56d4569..cff6b63 100644 --- a/dtmsvr/rabbitmq.go +++ b/dtmsvr/rabbitmq.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "sync" + "time" "github.com/sirupsen/logrus" "github.com/streadway/amqp" @@ -39,6 +40,8 @@ const ( RabbitmqConstCommited RabbitmqConst = "dtm_commited" ) +var IgnoreMsgBefore = time.Now().Add(-3 * time.Second) // 忽略3秒前的消息 + func RabbitmqNew(conf *RabbitmqConfig) *Rabbitmq { return &Rabbitmq{ Config: *conf, @@ -92,6 +95,7 @@ func (r *Rabbitmq) SendAndConfirm(key RabbitmqConst, data map[string]interface{} ContentType: "application/json", DeliveryMode: amqp.Persistent, Body: body, + Timestamp: time.Now(), }, ) common.PanicIfError(err) @@ -125,6 +129,11 @@ func (q *RabbitmqQueue) WaitAndHandle(handler func(data M)) { func (q *RabbitmqQueue) WaitAndHandleOne(handler func(data M)) { logrus.Printf("%s reading message", q.Name) msg := <-q.Deliveries + for msg.Timestamp.Before(IgnoreMsgBefore) { + logrus.Printf("%s discarding a message %v before %v", q.Name, msg.Timestamp, IgnoreMsgBefore) + msg.Ack(false) + msg = <-q.Deliveries + } data := map[string]interface{}{} err := json.Unmarshal(msg.Body, &data) logrus.Printf("%s handling one message: %v", q.Name, data) diff --git a/examples/saga.go b/examples/saga.go index dc1e5a9..8a2e715 100644 --- a/examples/saga.go +++ b/examples/saga.go @@ -2,14 +2,21 @@ package examples import ( "fmt" - "strings" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtm" ) type M = map[string]interface{} + +var TransInResult = "" +var TransOutResult = "" +var TransInCompensateResult = "" +var TransOutCompensateResult = "" +var TransQueryResult = "" + type TransReq struct { Amount int `json:"amount"` TransInFailed bool `json:"transInFailed"` @@ -22,13 +29,14 @@ func TransIn(c *gin.Context) { if err := c.BindJSON(&req); err != nil { return } - logrus.Printf("%s TransIn: %v", gid, req) if req.TransInFailed { logrus.Printf("%s TransIn %v failed", req) c.Error(fmt.Errorf("TransIn failed for gid: %s", gid)) return } - c.JSON(200, M{"result": "SUCCESS"}) + res := common.OrString(TransInResult, "SUCCESS") + logrus.Printf("%s TransIn: %v result: %s", gid, req, res) + c.JSON(200, M{"result": res}) } func TransInCompensate(c *gin.Context) { @@ -37,8 +45,9 @@ func TransInCompensate(c *gin.Context) { if err := c.BindJSON(&req); err != nil { return } - logrus.Printf("%s TransInCompensate: %v", gid, req) - c.JSON(200, M{"result": "SUCCESS"}) + res := common.OrString(TransInCompensateResult, "SUCCESS") + logrus.Printf("%s TransInCompensate: %v result: %s", gid, req, res) + c.JSON(200, M{"result": res}) } func TransOut(c *gin.Context) { @@ -47,13 +56,14 @@ func TransOut(c *gin.Context) { if err := c.BindJSON(&req); err != nil { return } - logrus.Printf("%s TransOut: %v", gid, req) if req.TransOutFailed { logrus.Printf("%s TransOut %v failed", gid, req) c.JSON(500, M{"result": "FAIL"}) return } - c.JSON(200, M{"result": "SUCCESS"}) + res := common.OrString(TransOutResult, "SUCCESS") + logrus.Printf("%s TransOut: %v result: %s", gid, req, res) + c.JSON(200, M{"result": res}) } func TransOutCompensate(c *gin.Context) { @@ -62,20 +72,16 @@ func TransOutCompensate(c *gin.Context) { if err := c.BindJSON(&req); err != nil { return } - logrus.Printf("%s TransOutCompensate: %v", gid, req) - c.JSON(200, M{"result": "SUCCESS"}) + res := common.OrString(TransOutCompensateResult, "SUCCESS") + logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res) + c.JSON(200, M{"result": res}) } func TransQuery(c *gin.Context) { gid := c.Query("gid") logrus.Printf("%s TransQuery", gid) - if strings.Contains(gid, "cancel") { - c.JSON(200, M{"result": "FAIL"}) - } else if strings.Contains(gid, "pending") { - c.JSON(200, M{"result": "PENDING"}) - } else { - c.JSON(200, M{"result": "SUCCESS"}) - } + res := common.OrString(TransQueryResult, "SUCCESS") + c.JSON(200, M{"result": res}) } func trans(req *TransReq) {