saga normal state ok

This commit is contained in:
yedongfu 2021-05-18 17:21:31 +08:00
parent 41c49a1e20
commit 20367db2cf
7 changed files with 227 additions and 33 deletions

View File

@ -38,3 +38,15 @@ func MustMarshal(v interface{}) []byte {
PanicIfError(err)
return b
}
func MustMarshalString(v interface{}) string {
return string(MustMarshal(v))
}
func Map2Obj(m map[string]interface{}, obj interface{}) error {
b, err := json.Marshal(m)
if err != nil {
return err
}
return json.Unmarshal(b, obj)
}

View File

@ -10,18 +10,29 @@ import (
var client *resty.Client = resty.New()
type Saga struct {
Server string `json:"server"`
type SagaData struct {
Gid string `json:"gid"`
Steps []SagaStep `json:"steps"`
TransQuery string `json:"trans_query"`
}
type Saga struct {
SagaData
Server string
}
type SagaStep struct {
Action string `json:"action"`
Compensate string `json:"compensate"`
PostData gin.H `json:"post_data"`
}
func SagaNew(server string, gid string) *Saga {
return &Saga{
SagaData: SagaData{
Gid: gid,
},
Server: server,
}
}
func (s *Saga) Add(action string, compensate string, postData gin.H) error {
logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData)
step := SagaStep{
@ -34,12 +45,8 @@ func (s *Saga) Add(action string, compensate string, postData gin.H) error {
return nil
}
func (s *Saga) getBody() gin.H {
return gin.H{
"gid": s.Gid,
"trans_query": s.TransQuery,
"steps": s.Steps,
}
func (s *Saga) getBody() *SagaData {
return &s.SagaData
}
func (s *Saga) Prepare(url string) error {

View File

@ -1,11 +1,14 @@
package dtmsvr
import (
"fmt"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
@ -20,34 +23,190 @@ type SagaModel struct {
Steps string
TransQuery string
Status string
FinishTime string
RollbackTime string
FinishTime time.Time
RollbackTime time.Time
}
func (*SagaModel) TableName() string {
return "test1.a_saga"
}
func handlePreparedMsg(data gin.H) {
data["gid"] = "4eHhkCxVsQ1"
db := DbGet()
// db.Model(&SagaModel{}).Clauses(clause.OnConflict{
// DoNothing: true,
// }).Create(data)
type SagaStepModel struct {
ModelBase
Gid string
Data string
Step int
Url string
Type string
Status string
FinishTime string
RollbackTime string
}
logrus.Printf("creating saga model")
func (*SagaStepModel) TableName() string {
return "test1.a_saga_step"
}
func handlePreparedMsg(data gin.H) {
db := DbGet()
logrus.Printf("creating saga model in prepare")
data["steps"] = common.MustMarshalString(data["steps"])
m := SagaModel{}
err := common.Map2Obj(data, &m)
common.PanicIfError(err)
m.Status = "prepared"
db.Clauses(clause.OnConflict{
DoNothing: true,
}).Create(&SagaModel{
Gid: data["gid"].(string),
Steps: string(common.MustMarshal(data["steps"])),
TransQuery: data["trans_query"].(string),
Status: "prepared",
})
}).Create(&m)
}
func handleCommitedMsg(data gin.H) {
db := DbGet()
logrus.Printf("creating saga model in commited")
steps := data["steps"].([]interface{})
data["steps"] = common.MustMarshalString(data["steps"])
m := SagaModel{}
err := common.Map2Obj(data, &m)
common.PanicIfError(err)
m.Status = "processing"
stepInserted := false
err = db.Transaction(func(db *gorm.DB) error {
db.Clauses(clause.OnConflict{
DoNothing: true,
}).Create(&m)
if db.Error == nil && db.RowsAffected == 0 {
db.Model(&m).Where("status=?", "prepared").Update("status", "processing")
}
nsteps := []SagaStepModel{}
for _, step1 := range steps {
step := step1.(map[string]interface{})
nsteps = append(nsteps, SagaStepModel{
Gid: m.Gid,
Step: len(nsteps) + 1,
Data: common.MustMarshalString(step["post_data"]),
Url: step["compensate"].(string),
Type: "compensate",
Status: "pending",
})
nsteps = append(nsteps, SagaStepModel{
Gid: m.Gid,
Step: len(nsteps) + 1,
Data: common.MustMarshalString(step["post_data"]),
Url: step["action"].(string),
Type: "action",
Status: "pending",
})
}
r := db.Clauses(clause.OnConflict{
DoNothing: true,
}).Create(&nsteps)
if db.Error != nil {
return db.Error
}
if r.RowsAffected == int64(len(nsteps)) {
stepInserted = true
}
logrus.Printf("rows affected: %d nsteps length: %d, stepInersted: %t", r.RowsAffected, int64(len(nsteps)), stepInserted)
return db.Error
})
common.PanicIfError(err)
if !stepInserted {
return
}
err = ProcessCommitedSaga(m.Gid)
if err != nil {
logrus.Printf("---------------handle commited msmg error: %s", err.Error())
}
}
func ProcessCommitedSaga(gid string) (rerr error) {
steps := []SagaStepModel{}
db := DbGet()
db1 := db.Order("id asc").Find(&steps)
if db1.Error != nil {
return db1.Error
}
current := 0 // 当前正在处理的步骤
tx := []*gorm.DB{db.Begin()}
defer func() { // 如果直接return出去则rollback当前的事务
tx[0].Rollback()
if err := recover(); err != nil {
rerr = err.(error)
}
}()
checkAndCommit := func(db1 *gorm.DB) {
common.PanicIfError(db1.Error)
if db1.RowsAffected == 0 {
panic(fmt.Errorf("duplicate updating"))
}
dbr := tx[0].Commit()
common.PanicIfError(dbr.Error)
tx[0] = db.Begin()
common.PanicIfError(tx[0].Error)
}
for ; current < len(steps); current++ {
step := steps[current]
if step.Type == "compensate" && step.Status == "pending" || step.Type == "action" && step.Status == "finished" {
continue
}
if step.Type == "action" && step.Status == "pending" {
resp, err := client.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
if err != nil {
return err
}
body := resp.String()
if strings.Contains(body, "SUCCESS") {
dbr := tx[0].Model(&step).Where("status=?", "pending").Updates(M{
"status": "finished",
"finish_time": time.Now(),
})
checkAndCommit(dbr)
} else if strings.Contains(body, "FAIL") {
dbr := tx[0].Model(&step).Where("status=?", "pending").Updates(M{
"status": "rollbacked",
"rollback_time": time.Now(),
})
checkAndCommit(dbr)
}
}
}
if current == len(steps) { // saga 事务完成
dbr := tx[0].Model(&SagaModel{}).Where("gid=? and status=?", gid, "processing").Updates(M{
"status": "finished",
"finish_time": time.Now(),
})
checkAndCommit(dbr)
return nil
}
for current = len(steps) - 1; current >= 0; current-- {
step := steps[current]
if step.Type != "compensate" || step.Status != "pending" {
continue
}
resp, err := client.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
if err != nil {
return err
}
body := resp.String()
if strings.Contains(body, "SUCCESS") {
dbr := tx[0].Model(&step).Where("status=?", step.Status).Updates(M{
"status": "rollbacked",
"rollback_time": time.Now(),
})
checkAndCommit(dbr)
} else {
return fmt.Errorf("expect compensate return SUCCESS")
}
}
if current != -1 {
return fmt.Errorf("saga current not -1")
}
dbr := tx[0].Model(&SagaModel{}).Where("status=?", "processing").Updates(M{
"status": "rollbacked",
"rollback_time": time.Now(),
})
checkAndCommit(dbr)
return nil
}
func StartConsumePreparedMsg(consumers int) {
@ -70,5 +229,4 @@ func StartConsumeCommitedMsg(consumers int) {
que.WaitAndHandle(handleCommitedMsg)
}()
}
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"github.com/go-resty/resty/v2"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/yedf/dtm/common"
@ -11,6 +12,8 @@ import (
"gorm.io/gorm"
)
type M = map[string]interface{}
var rabbit *Rabbitmq = nil
func RabbitmqGet() *Rabbitmq {
@ -27,11 +30,15 @@ func DbGet() *gorm.DB {
LoadConfig()
if db == nil {
conf := viper.GetStringMapString("mysql")
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4", conf["user"], conf["password"], conf["host"], conf["port"], conf["database"])
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true", conf["user"], conf["password"], conf["host"], conf["port"], conf["database"])
logrus.Printf("connecting %s", strings.Replace(dsn, conf["password"], "****", 1))
db1, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
db1, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
SkipDefaultTransaction: true,
})
common.PanicIfError(err)
db = db1.Debug()
}
return db
}
var client *resty.Client = resty.New()

View File

@ -10,7 +10,7 @@ func Main() {
gin.SetMode(gin.ReleaseMode)
app := gin.Default()
AddRoute(app)
StartConsumePreparedMsg(1)
// StartConsumePreparedMsg(1)
StartConsumeCommitedMsg(1)
logrus.Printf("dtmsvr listen at: 8080")
go app.Run()

View File

@ -5,7 +5,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtm"
)
@ -50,12 +49,10 @@ func TransQuery(c *gin.Context) {
}
func trans(req *TransReq) {
gid := common.GenGid()
// gid := common.GenGid()
gid := "4eHhkCxVsQ1"
logrus.Printf("busi transaction begin: %s", gid)
saga := dtm.Saga{
Server: TcServer,
Gid: gid,
}
saga := dtm.SagaNew(TcServer, gid)
saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", gin.H{
"amount": req.amount,

13
main.go
View File

@ -3,12 +3,25 @@ package main
import (
"time"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmsvr"
"github.com/yedf/dtm/examples"
)
func main() {
dtmsvr.LoadConfig()
db := dtmsvr.DbGet()
tx := db.Begin()
common.PanicIfError(tx.Error)
dbr := tx.Commit()
common.PanicIfError(dbr.Error)
tx = db.Begin()
common.PanicIfError(tx.Error)
dbr = tx.Commit()
common.PanicIfError(dbr.Error)
db.Exec("truncate test1.a_saga")
db.Exec("truncate test1.a_saga_step")
// logrus.SetFormatter(&logrus.JSONFormatter{})
// dtmsvr.LoadConfig()
// rb := dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq)