From 104ff472dc6c6446cb29956d734e971929678539 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Thu, 20 May 2021 12:41:08 +0800 Subject: [PATCH] remove queue --- common/utils.go | 7 +- dtm_test.go | 20 ++-- dtmsvr/config.go | 3 +- dtmsvr/consumer.go | 198 -------------------------------------- dtmsvr/cron.go | 12 +-- dtmsvr/objects.go | 10 -- dtmsvr/rabbitmq.go | 189 ------------------------------------ dtmsvr/rabbitmq_test.go | 30 ------ dtmsvr/scanPrepared.go | 5 - dtmsvr/service.go | 207 +++++++++++++++++++++++++++++++++++++--- dtmsvr/svr.go | 2 - 11 files changed, 208 insertions(+), 475 deletions(-) delete mode 100644 dtmsvr/rabbitmq.go delete mode 100644 dtmsvr/rabbitmq_test.go delete mode 100644 dtmsvr/scanPrepared.go diff --git a/common/utils.go b/common/utils.go index 9fe483d..483496c 100644 --- a/common/utils.go +++ b/common/utils.go @@ -58,10 +58,13 @@ func MustMarshalString(v interface{}) string { return string(MustMarshal(v)) } -func MustUnmarshalString(s string, obj interface{}) { - err := json.Unmarshal([]byte(s), obj) +func MustUnmarshal(b []byte, obj interface{}) { + err := json.Unmarshal(b, obj) PanicIfError(err) } +func MustUnmarshalString(s string, obj interface{}) { + MustUnmarshal([]byte(s), obj) +} func MustRemarshal(from interface{}, to interface{}) { b, err := json.Marshal(from) diff --git a/dtm_test.go b/dtm_test.go index a3be236..2668de4 100644 --- a/dtm_test.go +++ b/dtm_test.go @@ -27,9 +27,6 @@ var myinit int = func() int { }() // 测试使用的全局对象 -var rabbit = dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq) -var queprepared = rabbit.QueueNew(dtmsvr.RabbitmqConstPrepared) -var quecommited = rabbit.QueueNew(dtmsvr.RabbitmqConstCommited) var db = dtmsvr.DbGet() func getSagaModel(gid string) *dtmsvr.SagaModel { @@ -53,20 +50,18 @@ func getSagaStepStatus(gid string) []string { func noramlSaga(t *testing.T) { saga := genSaga("gid-noramlSaga", false, false) saga.Prepare() - queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) saga.Commit() - quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg) - assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) + assert.Equal(t, "commited", getSagaModel(saga.Gid).Status) + dtmsvr.WaitCommitedSaga(saga.Gid) assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid)) } func rollbackSaga2(t *testing.T) { saga := genSaga("gid-rollbackSaga2", false, true) saga.Commit() - quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg) + dtmsvr.WaitCommitedSaga(saga.Gid) saga.Prepare() - queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) assert.Equal(t, "rollbacked", getSagaModel(saga.Gid).Status) assert.Equal(t, []string{"rollbacked", "finished", "rollbacked", "rollbacked"}, getSagaStepStatus(saga.Gid)) } @@ -74,7 +69,6 @@ func rollbackSaga2(t *testing.T) { func prepareCancel(t *testing.T) { saga := genSaga("gid1-prepareCancel", false, true) saga.Prepare() - queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) examples.TransQueryResult = "FAIL" dtmsvr.CronPreparedOnce(-10 * time.Second) examples.TransQueryResult = "" @@ -84,31 +78,31 @@ func prepareCancel(t *testing.T) { func preparePending(t *testing.T) { saga := genSaga("gid1-preparePending", false, false) saga.Prepare() - queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) examples.TransQueryResult = "PENDING" dtmsvr.CronPreparedOnce(-10 * time.Second) examples.TransQueryResult = "" assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) dtmsvr.CronPreparedOnce(-10 * time.Second) - quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg) + dtmsvr.WaitCommitedSaga(saga.Gid) assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) } func commitedPending(t *testing.T) { saga := genSaga("gid-commitedPending", false, false) saga.Prepare() - queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) saga.Commit() examples.TransOutResult = "PENDING" - quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg) + dtmsvr.WaitCommitedSaga(saga.Gid) assert.Equal(t, []string{"pending", "finished", "pending", "pending"}, getSagaStepStatus(saga.Gid)) examples.TransOutResult = "" dtmsvr.CronCommitedOnce(-10 * time.Second) + dtmsvr.WaitCommitedSaga(saga.Gid) assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid)) assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) } func TestDtmSvr(t *testing.T) { + dtmsvr.SagaProcessedTestChan = make(chan string, 1) // 清理数据 common.PanicIfError(db.Exec("truncate test1.a_saga").Error) common.PanicIfError(db.Exec("truncate test1.a_saga_step").Error) diff --git a/dtmsvr/config.go b/dtmsvr/config.go index 686c254..e331582 100644 --- a/dtmsvr/config.go +++ b/dtmsvr/config.go @@ -14,8 +14,7 @@ import ( ) type Config struct { - Server string `json:"server"` - Rabbitmq RabbitmqConfig `json:"rabbitmq"` + Server string `json:"server"` } var ServerConfig Config = Config{} diff --git a/dtmsvr/consumer.go b/dtmsvr/consumer.go index 790cd19..dc57abb 100644 --- a/dtmsvr/consumer.go +++ b/dtmsvr/consumer.go @@ -1,15 +1,7 @@ package dtmsvr import ( - "encoding/json" - "fmt" - "strings" "time" - - "github.com/sirupsen/logrus" - "github.com/yedf/dtm/common" - "gorm.io/gorm" - "gorm.io/gorm/clause" ) type ModelBase struct { @@ -46,193 +38,3 @@ type SagaStepModel struct { func (*SagaStepModel) TableName() string { return "test1.a_saga_step" } - -func HandlePreparedMsg(data M) { - db := DbGet() - logrus.Printf("creating saga model in prepare") - data["steps"] = common.MustMarshalString(data["steps"]) - m := SagaModel{} - common.MustRemarshal(data, &m) - m.Status = "prepared" - writeTransLog(m.Gid, "save prepared", m.Status, -1, m.Steps) - db1 := db.Clauses(clause.OnConflict{ - DoNothing: true, - }).Create(&m) - common.PanicIfError(db1.Error) -} - -func HandleCommitedMsg(data M) { - logrus.Printf("creating saga model in commited") - data["steps"] = common.MustMarshalString(data["steps"]) - m := SagaModel{} - common.MustRemarshal(data, &m) - saveCommitedSagaModel(&m) - err := ProcessCommitedSaga(m.Gid) - if err != nil { - logrus.Printf("---------------handle commited msmg error: %s", err.Error()) - } -} - -func saveCommitedSagaModel(m *SagaModel) { - db := DbGet() - m.Status = "commited" - stepInserted := false - err := db.Transaction(func(db *gorm.DB) error { - writeTransLog(m.Gid, "save commited", m.Status, -1, m.Steps) - dbr := db.Clauses(clause.OnConflict{ - DoNothing: true, - }).Create(&m) - if dbr.Error == nil && dbr.RowsAffected == 0 { - writeTransLog(m.Gid, "change status", m.Status, -1, "") - dbr = db.Model(&m).Where("status=?", "prepared").Update("status", "commited") - } - common.PanicIfError(dbr.Error) - nsteps := []SagaStepModel{} - steps := []M{} - err := json.Unmarshal([]byte(m.Steps), &steps) - common.PanicIfError(err) - for _, step := range steps { - nsteps = append(nsteps, SagaStepModel{ - Gid: m.Gid, - Step: len(nsteps) + 1, - Data: step["post_data"].(string), - Url: step["compensate"].(string), - Type: "compensate", - Status: "pending", - }) - nsteps = append(nsteps, SagaStepModel{ - Gid: m.Gid, - Step: len(nsteps) + 1, - Data: step["post_data"].(string), - Url: step["action"].(string), - Type: "action", - Status: "pending", - }) - } - writeTransLog(m.Gid, "save steps", m.Status, -1, common.MustMarshalString(nsteps)) - r := db.Clauses(clause.OnConflict{ - DoNothing: true, - }).Create(&nsteps) - if db.Error != nil { - return db.Error - } - if r.RowsAffected == int64(len(nsteps)) { - stepInserted = true - } - logrus.Printf("rows affected: %d nsteps length: %d, stepInersted: %t", r.RowsAffected, int64(len(nsteps)), stepInserted) - return db.Error - }) - common.PanicIfError(err) - if !stepInserted { - return - } -} - -func ProcessCommitedSaga(gid string) (rerr error) { - steps := []SagaStepModel{} - db := DbGet() - db1 := db.Order("id asc").Find(&steps) - if db1.Error != nil { - return db1.Error - } - checkAffected := func(db1 *gorm.DB) { - common.PanicIfError(db1.Error) - if db1.RowsAffected == 0 { - panic(fmt.Errorf("duplicate updating")) - } - } - current := 0 // 当前正在处理的步骤 - for ; current < len(steps); current++ { - step := steps[current] - if step.Type == "compensate" && step.Status == "pending" || step.Type == "action" && step.Status == "finished" { - continue - } - if step.Type == "action" && step.Status == "pending" { - resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) - if err != nil { - return err - } - body := resp.String() - if strings.Contains(body, "SUCCESS") { - writeTransLog(gid, "step finished", "finished", step.Step, "") - dbr := db.Model(&step).Where("status=?", "pending").Updates(M{ - "status": "finished", - "finish_time": time.Now(), - }) - checkAffected(dbr) - } else if strings.Contains(body, "FAIL") { - writeTransLog(gid, "step rollbacked", "rollbacked", step.Step, "") - dbr := db.Model(&step).Where("status=?", "pending").Updates(M{ - "status": "rollbacked", - "rollback_time": time.Now(), - }) - checkAffected(dbr) - break - } else { - return fmt.Errorf("unknown response: %s, will be retried", body) - } - } - } - if current == len(steps) { // saga 事务完成 - writeTransLog(gid, "saga finished", "finished", -1, "") - dbr := db.Model(&SagaModel{}).Where("gid=? and status=?", gid, "commited").Updates(M{ - "status": "finished", - "finish_time": time.Now(), - }) - checkAffected(dbr) - return nil - } - for current = current - 1; current >= 0; current-- { - step := steps[current] - if step.Type != "compensate" || step.Status != "pending" { - continue - } - resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) - if err != nil { - return err - } - body := resp.String() - if strings.Contains(body, "SUCCESS") { - writeTransLog(gid, "step rollbacked", "rollbacked", step.Step, "") - dbr := db.Model(&step).Where("status=?", step.Status).Updates(M{ - "status": "rollbacked", - "rollback_time": time.Now(), - }) - checkAffected(dbr) - } else { - return fmt.Errorf("expect compensate return SUCCESS") - } - } - if current != -1 { - return fmt.Errorf("saga current not -1") - } - writeTransLog(gid, "saga rollbacked", "rollbacked", -1, "") - dbr := db.Model(&SagaModel{}).Where("status=? and gid=?", "commited", gid).Updates(M{ - "status": "rollbacked", - "rollback_time": time.Now(), - }) - checkAffected(dbr) - return nil -} - -func StartConsumePreparedMsg(consumers int) { - logrus.Printf("start to consume prepared msg") - r := RabbitmqGet() - for i := 0; i < consumers; i++ { - go func() { - que := r.QueueNew(RabbitmqConstPrepared) - que.WaitAndHandle(HandlePreparedMsg) - }() - } -} - -func StartConsumeCommitedMsg(consumers int) { - logrus.Printf("start to consume commited msg") - r := RabbitmqGet() - for i := 0; i < consumers; i++ { - go func() { - que := r.QueueNew(RabbitmqConstCommited) - que.WaitAndHandle(HandleCommitedMsg) - }() - } -} diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 89f5ca5..03cc564 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -29,14 +29,8 @@ func CronPreparedOnce(expire time.Duration) { dbr = db.Model(&sm).Where("status = ?", "prepared").Update("status", "canceled") common.PanicIfError(dbr.Error) } else if strings.Contains(body, "SUCCESS") { - m := M{} - steps := []M{} - common.MustRemarshal(sm, &m) - common.PanicIfError(err) - common.MustUnmarshalString(m["steps"].(string), &steps) - m["steps"] = steps - err = rabbit.SendAndConfirm(RabbitmqConstCommited, m) - common.PanicIfError(err) + saveCommitedSagaModel(&sm) + go ProcessCommitedSaga(sm.Gid) } } } @@ -60,7 +54,7 @@ func CronCommitedOnce(expire time.Duration) { writeTransLog(sm.Gid, "saga touch commited", "", -1, "") dbr = db.Model(&sm).Update("id", sm.ID) common.PanicIfError(dbr.Error) - ProcessCommitedSaga(sm.Gid) + go ProcessCommitedSaga(sm.Gid) } } diff --git a/dtmsvr/objects.go b/dtmsvr/objects.go index 201ace7..29d3b0d 100644 --- a/dtmsvr/objects.go +++ b/dtmsvr/objects.go @@ -13,16 +13,6 @@ import ( type M = map[string]interface{} -var rabbit *Rabbitmq = nil - -func RabbitmqGet() *Rabbitmq { - LoadConfig() - if rabbit == nil { - rabbit = RabbitmqNew(&ServerConfig.Rabbitmq) - } - return rabbit -} - var db *gorm.DB = nil func DbGet() *gorm.DB { diff --git a/dtmsvr/rabbitmq.go b/dtmsvr/rabbitmq.go deleted file mode 100644 index cff6b63..0000000 --- a/dtmsvr/rabbitmq.go +++ /dev/null @@ -1,189 +0,0 @@ -package dtmsvr - -import ( - "encoding/json" - "fmt" - "sync" - "time" - - "github.com/sirupsen/logrus" - "github.com/streadway/amqp" - "github.com/yedf/dtm/common" -) - -type Rabbitmq struct { - Config RabbitmqConfig - ChannelPool *sync.Pool -} - -type RabbitmqConfig struct { - Host string - Username string - Password string - Vhost string - Exchange string - KeyPrepared string - KeyCommited string - QueuePrepared string - QueueCommited string -} - -type RabbitmqChannel struct { - Confirms chan amqp.Confirmation - Channel *amqp.Channel -} - -type RabbitmqConst string - -const ( - RabbitmqConstPrepared RabbitmqConst = "dtm_prepared" - RabbitmqConstCommited RabbitmqConst = "dtm_commited" -) - -var IgnoreMsgBefore = time.Now().Add(-3 * time.Second) // 忽略3秒前的消息 - -func RabbitmqNew(conf *RabbitmqConfig) *Rabbitmq { - return &Rabbitmq{ - Config: *conf, - ChannelPool: &sync.Pool{ - New: func() interface{} { - channel := newChannel(conf) - err := channel.Confirm(false) - common.PanicIfError(err) - confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 2)) - return &RabbitmqChannel{ - Channel: channel, - Confirms: confirms, - } - }, - }, - } -} - -func newChannel(conf *RabbitmqConfig) *amqp.Channel { - uri := fmt.Sprintf("amqp://%s:%s@%s/%s", conf.Username, conf.Password, conf.Host, conf.Vhost) - logrus.Printf("connecting rabbitmq: %s", uri) - conn, err := amqp.Dial(uri) - common.PanicIfError(err) - channel, err := conn.Channel() - common.PanicIfError(err) - err = channel.ExchangeDeclare( - conf.Exchange, // exchange name - "direct", // exchange type - true, // durable - false, // autoDelete - false, // internal - false, // noWait - nil, // args - ) - common.PanicIfError(err) - return channel -} - -func (r *Rabbitmq) SendAndConfirm(key RabbitmqConst, data map[string]interface{}) error { - body, err := json.Marshal(data) - common.PanicIfError(err) - channel := r.ChannelPool.Get().(*RabbitmqChannel) - - logrus.Printf("publishing %s %v", key, data) - err = channel.Channel.Publish( - r.Config.Exchange, - common.If(key == RabbitmqConstPrepared, r.Config.KeyPrepared, r.Config.KeyCommited).(string), - true, - false, - amqp.Publishing{ - ContentType: "application/json", - DeliveryMode: amqp.Persistent, - Body: body, - Timestamp: time.Now(), - }, - ) - common.PanicIfError(err) - confirm := <-channel.Confirms - r.ChannelPool.Put(channel) - logrus.Printf("confirmed %t for %s", confirm.Ack, data["gid"]) - if !confirm.Ack { - return fmt.Errorf("confirm not ok for %s", data["gid"]) - } - return nil -} - -type RabbitmqQueue struct { - Name string - Queue *amqp.Queue - Channel *amqp.Channel - Conn *amqp.Connection - Deliveries <-chan amqp.Delivery -} - -func (q *RabbitmqQueue) Close() { - q.Channel.Close() - // q.Conn.Close() -} - -func (q *RabbitmqQueue) WaitAndHandle(handler func(data M)) { - for { - q.WaitAndHandleOne(handler) - } -} -func (q *RabbitmqQueue) WaitAndHandleOne(handler func(data M)) { - logrus.Printf("%s reading message", q.Name) - msg := <-q.Deliveries - for msg.Timestamp.Before(IgnoreMsgBefore) { - logrus.Printf("%s discarding a message %v before %v", q.Name, msg.Timestamp, IgnoreMsgBefore) - msg.Ack(false) - msg = <-q.Deliveries - } - data := map[string]interface{}{} - err := json.Unmarshal(msg.Body, &data) - logrus.Printf("%s handling one message: %v", q.Name, data) - common.PanicIfError(err) - handler(data) - err = msg.Ack(false) - common.PanicIfError(err) - logrus.Printf("%s acked msg: %d", q.Name, msg.DeliveryTag) -} - -func (r *Rabbitmq) QueueNew(queueType RabbitmqConst) *RabbitmqQueue { - channel := newChannel(&r.Config) - queueName := common.If(queueType == RabbitmqConstPrepared, r.Config.QueuePrepared, r.Config.QueueCommited).(string) - queue, err := channel.QueueDeclare( - queueName, // name of the queue - true, // durable - false, // delete when unused - false, // exclusive - false, // noWait - nil, // arguments - ) - common.PanicIfError(err) - logrus.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange", - queue.Name, queue.Messages, queue.Consumers) - err = channel.QueueBind( - queue.Name, // name of the queue - common.If(queueType == RabbitmqConstPrepared, r.Config.KeyPrepared, r.Config.KeyCommited).(string), // bindingKey - r.Config.Exchange, // sourceExchange - false, // noWait - nil, // arguments - ) - common.PanicIfError(err) - deliveries, err := channel.Consume( - queue.Name, // name - "simple-consumer", // consumerTag, - false, // noAck - false, // exclusive - false, // noLocal - false, // noWait - nil, // arguments - ) - common.PanicIfError(err) - return &RabbitmqQueue{ - Queue: &queue, - Name: queueName, - Channel: channel, - Deliveries: deliveries, - } -} - -func (r *Rabbitmq) HandleMsg(data interface{}) { - -} diff --git a/dtmsvr/rabbitmq_test.go b/dtmsvr/rabbitmq_test.go deleted file mode 100644 index f875e98..0000000 --- a/dtmsvr/rabbitmq_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package dtmsvr - -import ( - "testing" - - "github.com/magiconair/properties/assert" - "github.com/sirupsen/logrus" - "github.com/yedf/dtm/common" -) - -func init() { - LoadConfig() -} - -func TestRabbitConfig(t *testing.T) { - assert.Matches(t, ServerConfig.Rabbitmq.KeyCommited, "key_committed") -} - -func TestRabbitmq1Msg(t *testing.T) { - rb := RabbitmqNew(&ServerConfig.Rabbitmq) - err := rb.SendAndConfirm(RabbitmqConstPrepared, M{ - "gid": common.GenGid(), - }) - assert.Equal(t, nil, err) - queue := rb.QueueNew(RabbitmqConstPrepared) - queue.WaitAndHandle(func(data M) { - logrus.Printf("processed msg: %v in queue1", data) - }) - assert.Equal(t, 0, 1) -} diff --git a/dtmsvr/scanPrepared.go b/dtmsvr/scanPrepared.go deleted file mode 100644 index 00ee768..0000000 --- a/dtmsvr/scanPrepared.go +++ /dev/null @@ -1,5 +0,0 @@ -package dtmsvr - -func startScanPrepared() { - -} diff --git a/dtmsvr/service.go b/dtmsvr/service.go index 8ebb38b..c5ca731 100644 --- a/dtmsvr/service.go +++ b/dtmsvr/service.go @@ -1,8 +1,16 @@ package dtmsvr import ( + "encoding/json" + "fmt" + "strings" + "time" + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" + "gorm.io/gorm" + "gorm.io/gorm/clause" ) func AddRoute(engine *gin.Engine) { @@ -10,26 +18,195 @@ func AddRoute(engine *gin.Engine) { engine.POST("/api/dtmsvr/commit", Commit) } -func Prepare(c *gin.Context) { +func getSagaModelFromContext(c *gin.Context) *SagaModel { data := M{} - err := c.BindJSON(&data) - if err != nil { - return - } - rabbit := RabbitmqGet() - err = rabbit.SendAndConfirm(RabbitmqConstPrepared, data) + b, err := c.GetRawData() common.PanicIfError(err) + common.MustUnmarshal(b, &data) + logrus.Printf("creating saga model in prepare") + data["steps"] = common.MustMarshalString(data["steps"]) + m := SagaModel{} + common.MustRemarshal(data, &m) + return &m +} + +func Prepare(c *gin.Context) { + db := DbGet() + m := getSagaModelFromContext(c) + m.Status = "prepared" + writeTransLog(m.Gid, "save prepared", m.Status, -1, m.Steps) + db1 := db.Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(&m) + common.PanicIfError(db1.Error) c.JSON(200, M{"message": "SUCCESS"}) } func Commit(c *gin.Context) { - data := M{} - err := c.BindJSON(&data) - if err != nil { - return - } - rabbit := RabbitmqGet() - err = rabbit.SendAndConfirm(RabbitmqConstCommited, data) - common.PanicIfError(err) + m := getSagaModelFromContext(c) + saveCommitedSagaModel(m) + go ProcessCommitedSaga(m.Gid) c.JSON(200, M{"message": "SUCCESS"}) } + +func saveCommitedSagaModel(m *SagaModel) { + db := DbGet() + m.Status = "commited" + stepInserted := false + err := db.Transaction(func(db *gorm.DB) error { + writeTransLog(m.Gid, "save commited", m.Status, -1, m.Steps) + dbr := db.Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(&m) + if dbr.Error == nil && dbr.RowsAffected == 0 { + writeTransLog(m.Gid, "change status", m.Status, -1, "") + dbr = db.Model(&m).Where("status=?", "prepared").Update("status", "commited") + } + common.PanicIfError(dbr.Error) + nsteps := []SagaStepModel{} + steps := []M{} + err := json.Unmarshal([]byte(m.Steps), &steps) + common.PanicIfError(err) + for _, step := range steps { + nsteps = append(nsteps, SagaStepModel{ + Gid: m.Gid, + Step: len(nsteps) + 1, + Data: step["post_data"].(string), + Url: step["compensate"].(string), + Type: "compensate", + Status: "pending", + }) + nsteps = append(nsteps, SagaStepModel{ + Gid: m.Gid, + Step: len(nsteps) + 1, + Data: step["post_data"].(string), + Url: step["action"].(string), + Type: "action", + Status: "pending", + }) + } + writeTransLog(m.Gid, "save steps", m.Status, -1, common.MustMarshalString(nsteps)) + r := db.Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(&nsteps) + if db.Error != nil { + return db.Error + } + if r.RowsAffected == int64(len(nsteps)) { + stepInserted = true + } + logrus.Printf("rows affected: %d nsteps length: %d, stepInersted: %t", r.RowsAffected, int64(len(nsteps)), stepInserted) + return db.Error + }) + common.PanicIfError(err) + if !stepInserted { + return + } +} + +var SagaProcessedTestChan chan string = nil // 用于测试时,通知处理结束 + +func WaitCommitedSaga(gid string) { + id := <-SagaProcessedTestChan + for id != gid { + logrus.Errorf("-------id %s not match gid %s", id, gid) + id = <-SagaProcessedTestChan + } +} + +func ProcessCommitedSaga(gid string) { + err := innerProcessCommitedSaga(gid) + if err != nil { + logrus.Errorf("process commited saga error: %s", err.Error()) + } + if SagaProcessedTestChan != nil { + SagaProcessedTestChan <- gid + } +} + +func innerProcessCommitedSaga(gid string) (rerr error) { + steps := []SagaStepModel{} + db := DbGet() + db1 := db.Order("id asc").Find(&steps) + if db1.Error != nil { + return db1.Error + } + checkAffected := func(db1 *gorm.DB) { + common.PanicIfError(db1.Error) + if db1.RowsAffected == 0 { + panic(fmt.Errorf("duplicate updating")) + } + } + current := 0 // 当前正在处理的步骤 + for ; current < len(steps); current++ { + step := steps[current] + if step.Type == "compensate" && step.Status == "pending" || step.Type == "action" && step.Status == "finished" { + continue + } + if step.Type == "action" && step.Status == "pending" { + resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) + if err != nil { + return err + } + body := resp.String() + if strings.Contains(body, "SUCCESS") { + writeTransLog(gid, "step finished", "finished", step.Step, "") + dbr := db.Model(&step).Where("status=?", "pending").Updates(M{ + "status": "finished", + "finish_time": time.Now(), + }) + checkAffected(dbr) + } else if strings.Contains(body, "FAIL") { + writeTransLog(gid, "step rollbacked", "rollbacked", step.Step, "") + dbr := db.Model(&step).Where("status=?", "pending").Updates(M{ + "status": "rollbacked", + "rollback_time": time.Now(), + }) + checkAffected(dbr) + break + } else { + return fmt.Errorf("unknown response: %s, will be retried", body) + } + } + } + if current == len(steps) { // saga 事务完成 + writeTransLog(gid, "saga finished", "finished", -1, "") + dbr := db.Model(&SagaModel{}).Where("gid=? and status=?", gid, "commited").Updates(M{ + "status": "finished", + "finish_time": time.Now(), + }) + checkAffected(dbr) + return nil + } + for current = current - 1; current >= 0; current-- { + step := steps[current] + if step.Type != "compensate" || step.Status != "pending" { + continue + } + resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) + if err != nil { + return err + } + body := resp.String() + if strings.Contains(body, "SUCCESS") { + writeTransLog(gid, "step rollbacked", "rollbacked", step.Step, "") + dbr := db.Model(&step).Where("status=?", step.Status).Updates(M{ + "status": "rollbacked", + "rollback_time": time.Now(), + }) + checkAffected(dbr) + } else { + return fmt.Errorf("expect compensate return SUCCESS") + } + } + if current != -1 { + return fmt.Errorf("saga current not -1") + } + writeTransLog(gid, "saga rollbacked", "rollbacked", -1, "") + dbr := db.Model(&SagaModel{}).Where("status=? and gid=?", "commited", gid).Updates(M{ + "status": "rollbacked", + "rollback_time": time.Now(), + }) + checkAffected(dbr) + return nil +} diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index 001f1f0..69b772b 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -6,8 +6,6 @@ import ( ) func Main() { - StartConsumePreparedMsg(1) - StartConsumeCommitedMsg(1) logrus.Printf("dtmsvr listen at: 8080") go StartSvr() }