From 20367db2cf238bfc962427c390b6492d1c62b71a Mon Sep 17 00:00:00 2001 From: yedongfu Date: Tue, 18 May 2021 17:21:31 +0800 Subject: [PATCH] saga normal state ok --- common/utils.go | 12 +++ dtm/saga.go | 23 ++++-- dtmsvr/consumer.go | 190 +++++++++++++++++++++++++++++++++++++++++---- dtmsvr/objects.go | 11 ++- dtmsvr/svr.go | 2 +- examples/saga.go | 9 +-- main.go | 13 ++++ 7 files changed, 227 insertions(+), 33 deletions(-) diff --git a/common/utils.go b/common/utils.go index 0adf1ff..07ec8b4 100644 --- a/common/utils.go +++ b/common/utils.go @@ -38,3 +38,15 @@ func MustMarshal(v interface{}) []byte { PanicIfError(err) return b } + +func MustMarshalString(v interface{}) string { + return string(MustMarshal(v)) +} + +func Map2Obj(m map[string]interface{}, obj interface{}) error { + b, err := json.Marshal(m) + if err != nil { + return err + } + return json.Unmarshal(b, obj) +} diff --git a/dtm/saga.go b/dtm/saga.go index c6b7f40..1a37c87 100644 --- a/dtm/saga.go +++ b/dtm/saga.go @@ -10,18 +10,29 @@ import ( var client *resty.Client = resty.New() -type Saga struct { - Server string `json:"server"` +type SagaData struct { Gid string `json:"gid"` Steps []SagaStep `json:"steps"` TransQuery string `json:"trans_query"` } +type Saga struct { + SagaData + Server string +} type SagaStep struct { Action string `json:"action"` Compensate string `json:"compensate"` PostData gin.H `json:"post_data"` } +func SagaNew(server string, gid string) *Saga { + return &Saga{ + SagaData: SagaData{ + Gid: gid, + }, + Server: server, + } +} func (s *Saga) Add(action string, compensate string, postData gin.H) error { logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) step := SagaStep{ @@ -34,12 +45,8 @@ func (s *Saga) Add(action string, compensate string, postData gin.H) error { return nil } -func (s *Saga) getBody() gin.H { - return gin.H{ - "gid": s.Gid, - "trans_query": s.TransQuery, - "steps": s.Steps, - } +func (s *Saga) getBody() *SagaData { + return &s.SagaData } func (s *Saga) Prepare(url string) error { diff --git a/dtmsvr/consumer.go b/dtmsvr/consumer.go index 807cc59..3986c9a 100644 --- a/dtmsvr/consumer.go +++ b/dtmsvr/consumer.go @@ -1,11 +1,14 @@ package dtmsvr import ( + "fmt" + "strings" "time" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" + "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -20,34 +23,190 @@ type SagaModel struct { Steps string TransQuery string Status string - FinishTime string - RollbackTime string + FinishTime time.Time + RollbackTime time.Time } func (*SagaModel) TableName() string { return "test1.a_saga" } -func handlePreparedMsg(data gin.H) { - data["gid"] = "4eHhkCxVsQ1" - db := DbGet() - // db.Model(&SagaModel{}).Clauses(clause.OnConflict{ - // DoNothing: true, - // }).Create(data) +type SagaStepModel struct { + ModelBase + Gid string + Data string + Step int + Url string + Type string + Status string + FinishTime string + RollbackTime string +} - logrus.Printf("creating saga model") +func (*SagaStepModel) TableName() string { + return "test1.a_saga_step" +} + +func handlePreparedMsg(data gin.H) { + db := DbGet() + logrus.Printf("creating saga model in prepare") + data["steps"] = common.MustMarshalString(data["steps"]) + m := SagaModel{} + err := common.Map2Obj(data, &m) + common.PanicIfError(err) + m.Status = "prepared" db.Clauses(clause.OnConflict{ DoNothing: true, - }).Create(&SagaModel{ - Gid: data["gid"].(string), - Steps: string(common.MustMarshal(data["steps"])), - TransQuery: data["trans_query"].(string), - Status: "prepared", - }) + }).Create(&m) } func handleCommitedMsg(data gin.H) { + db := DbGet() + logrus.Printf("creating saga model in commited") + steps := data["steps"].([]interface{}) + data["steps"] = common.MustMarshalString(data["steps"]) + m := SagaModel{} + err := common.Map2Obj(data, &m) + common.PanicIfError(err) + m.Status = "processing" + stepInserted := false + err = db.Transaction(func(db *gorm.DB) error { + db.Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(&m) + if db.Error == nil && db.RowsAffected == 0 { + db.Model(&m).Where("status=?", "prepared").Update("status", "processing") + } + nsteps := []SagaStepModel{} + for _, step1 := range steps { + step := step1.(map[string]interface{}) + nsteps = append(nsteps, SagaStepModel{ + Gid: m.Gid, + Step: len(nsteps) + 1, + Data: common.MustMarshalString(step["post_data"]), + Url: step["compensate"].(string), + Type: "compensate", + Status: "pending", + }) + nsteps = append(nsteps, SagaStepModel{ + Gid: m.Gid, + Step: len(nsteps) + 1, + Data: common.MustMarshalString(step["post_data"]), + Url: step["action"].(string), + Type: "action", + Status: "pending", + }) + } + r := db.Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(&nsteps) + if db.Error != nil { + return db.Error + } + if r.RowsAffected == int64(len(nsteps)) { + stepInserted = true + } + logrus.Printf("rows affected: %d nsteps length: %d, stepInersted: %t", r.RowsAffected, int64(len(nsteps)), stepInserted) + return db.Error + }) + common.PanicIfError(err) + if !stepInserted { + return + } + err = ProcessCommitedSaga(m.Gid) + if err != nil { + logrus.Printf("---------------handle commited msmg error: %s", err.Error()) + } +} +func ProcessCommitedSaga(gid string) (rerr error) { + steps := []SagaStepModel{} + db := DbGet() + db1 := db.Order("id asc").Find(&steps) + if db1.Error != nil { + return db1.Error + } + current := 0 // 当前正在处理的步骤 + tx := []*gorm.DB{db.Begin()} + defer func() { // 如果直接return出去,则rollback当前的事务 + tx[0].Rollback() + if err := recover(); err != nil { + rerr = err.(error) + } + }() + checkAndCommit := func(db1 *gorm.DB) { + common.PanicIfError(db1.Error) + if db1.RowsAffected == 0 { + panic(fmt.Errorf("duplicate updating")) + } + dbr := tx[0].Commit() + common.PanicIfError(dbr.Error) + tx[0] = db.Begin() + common.PanicIfError(tx[0].Error) + } + for ; current < len(steps); current++ { + step := steps[current] + if step.Type == "compensate" && step.Status == "pending" || step.Type == "action" && step.Status == "finished" { + continue + } + if step.Type == "action" && step.Status == "pending" { + resp, err := client.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) + if err != nil { + return err + } + body := resp.String() + if strings.Contains(body, "SUCCESS") { + dbr := tx[0].Model(&step).Where("status=?", "pending").Updates(M{ + "status": "finished", + "finish_time": time.Now(), + }) + checkAndCommit(dbr) + } else if strings.Contains(body, "FAIL") { + dbr := tx[0].Model(&step).Where("status=?", "pending").Updates(M{ + "status": "rollbacked", + "rollback_time": time.Now(), + }) + checkAndCommit(dbr) + } + } + } + if current == len(steps) { // saga 事务完成 + dbr := tx[0].Model(&SagaModel{}).Where("gid=? and status=?", gid, "processing").Updates(M{ + "status": "finished", + "finish_time": time.Now(), + }) + checkAndCommit(dbr) + return nil + } + for current = len(steps) - 1; current >= 0; current-- { + step := steps[current] + if step.Type != "compensate" || step.Status != "pending" { + continue + } + resp, err := client.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) + if err != nil { + return err + } + body := resp.String() + if strings.Contains(body, "SUCCESS") { + dbr := tx[0].Model(&step).Where("status=?", step.Status).Updates(M{ + "status": "rollbacked", + "rollback_time": time.Now(), + }) + checkAndCommit(dbr) + } else { + return fmt.Errorf("expect compensate return SUCCESS") + } + } + if current != -1 { + return fmt.Errorf("saga current not -1") + } + dbr := tx[0].Model(&SagaModel{}).Where("status=?", "processing").Updates(M{ + "status": "rollbacked", + "rollback_time": time.Now(), + }) + checkAndCommit(dbr) + return nil } func StartConsumePreparedMsg(consumers int) { @@ -70,5 +229,4 @@ func StartConsumeCommitedMsg(consumers int) { que.WaitAndHandle(handleCommitedMsg) }() } - } diff --git a/dtmsvr/objects.go b/dtmsvr/objects.go index a2e1f43..e7749cf 100644 --- a/dtmsvr/objects.go +++ b/dtmsvr/objects.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" + "github.com/go-resty/resty/v2" "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/yedf/dtm/common" @@ -11,6 +12,8 @@ import ( "gorm.io/gorm" ) +type M = map[string]interface{} + var rabbit *Rabbitmq = nil func RabbitmqGet() *Rabbitmq { @@ -27,11 +30,15 @@ func DbGet() *gorm.DB { LoadConfig() if db == nil { conf := viper.GetStringMapString("mysql") - dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4", conf["user"], conf["password"], conf["host"], conf["port"], conf["database"]) + dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true", conf["user"], conf["password"], conf["host"], conf["port"], conf["database"]) logrus.Printf("connecting %s", strings.Replace(dsn, conf["password"], "****", 1)) - db1, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) + db1, err := gorm.Open(mysql.Open(dsn), &gorm.Config{ + SkipDefaultTransaction: true, + }) common.PanicIfError(err) db = db1.Debug() } return db } + +var client *resty.Client = resty.New() diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index 1929d43..bb9842f 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -10,7 +10,7 @@ func Main() { gin.SetMode(gin.ReleaseMode) app := gin.Default() AddRoute(app) - StartConsumePreparedMsg(1) + // StartConsumePreparedMsg(1) StartConsumeCommitedMsg(1) logrus.Printf("dtmsvr listen at: 8080") go app.Run() diff --git a/examples/saga.go b/examples/saga.go index c45dd24..952f76e 100644 --- a/examples/saga.go +++ b/examples/saga.go @@ -5,7 +5,6 @@ import ( "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" - "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtm" ) @@ -50,12 +49,10 @@ func TransQuery(c *gin.Context) { } func trans(req *TransReq) { - gid := common.GenGid() + // gid := common.GenGid() + gid := "4eHhkCxVsQ1" logrus.Printf("busi transaction begin: %s", gid) - saga := dtm.Saga{ - Server: TcServer, - Gid: gid, - } + saga := dtm.SagaNew(TcServer, gid) saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", gin.H{ "amount": req.amount, diff --git a/main.go b/main.go index 5184b81..db0d611 100644 --- a/main.go +++ b/main.go @@ -3,12 +3,25 @@ package main import ( "time" + "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmsvr" "github.com/yedf/dtm/examples" ) func main() { dtmsvr.LoadConfig() + db := dtmsvr.DbGet() + tx := db.Begin() + common.PanicIfError(tx.Error) + dbr := tx.Commit() + common.PanicIfError(dbr.Error) + + tx = db.Begin() + common.PanicIfError(tx.Error) + dbr = tx.Commit() + common.PanicIfError(dbr.Error) + db.Exec("truncate test1.a_saga") + db.Exec("truncate test1.a_saga_step") // logrus.SetFormatter(&logrus.JSONFormatter{}) // dtmsvr.LoadConfig() // rb := dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq)