diff --git a/common/utils.go b/common/utils.go index 75cbe54..1ccedde 100644 --- a/common/utils.go +++ b/common/utils.go @@ -1,9 +1,13 @@ package common import ( + "bytes" "encoding/json" + "io/ioutil" + "time" "github.com/bwmarrin/snowflake" + "github.com/gin-gonic/gin" "github.com/go-resty/resty/v2" "github.com/sirupsen/logrus" ) @@ -66,3 +70,26 @@ func init() { return nil }) } + +func GetGinApp() *gin.Engine { + gin.SetMode(gin.ReleaseMode) + app := gin.Default() + app.Use(func(c *gin.Context) { + body := "" + if c.Request.Method == "POST" { + rb, err := c.GetRawData() + if err != nil { + logrus.Printf("GetRawData error: %s", err.Error()) + } else { + body = string(rb) + c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(rb)) + } + } + began := time.Now() + logrus.Printf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) + c.Next() + logrus.Printf("used %d ms %s %s query: %s body: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) + + }) + return app +} diff --git a/dtm/saga.go b/dtm/saga.go index b095a0d..4d4a1e1 100644 --- a/dtm/saga.go +++ b/dtm/saga.go @@ -3,42 +3,42 @@ package dtm import ( "fmt" - "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) +type Saga struct { + SagaData + Server string +} + type SagaData struct { Gid string `json:"gid"` Steps []SagaStep `json:"steps"` TransQuery string `json:"trans_query"` } -type Saga struct { - SagaData - Server string -} type SagaStep struct { Action string `json:"action"` Compensate string `json:"compensate"` - PostData gin.H `json:"post_data"` + PostData string `json:"post_data"` } -func SagaNew(server string, gid string) *Saga { +func SagaNew(server string, gid string, transQuery string) *Saga { return &Saga{ SagaData: SagaData{ - Gid: gid, + Gid: gid, + TransQuery: transQuery, }, Server: server, } } -func (s *Saga) Add(action string, compensate string, postData gin.H) error { +func (s *Saga) Add(action string, compensate string, postData interface{}) error { logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) step := SagaStep{ Action: action, Compensate: compensate, - PostData: postData, + PostData: common.MustMarshalString(postData), } - step.PostData = postData s.Steps = append(s.Steps, step) return nil } @@ -47,8 +47,7 @@ func (s *Saga) getBody() *SagaData { return &s.SagaData } -func (s *Saga) Prepare(url string) error { - s.TransQuery = url +func (s *Saga) Prepare() error { logrus.Printf("preparing %s body: %v", s.Gid, s.getBody()) resp, err := common.RestyClient.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/prepare", s.Server)) if err != nil { diff --git a/dtm_test.go b/dtm_test.go index f241c9d..62e8c17 100644 --- a/dtm_test.go +++ b/dtm_test.go @@ -2,10 +2,15 @@ package main import ( "testing" + "time" "github.com/go-playground/assert/v2" + "github.com/sirupsen/logrus" "github.com/spf13/viper" + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtm" "github.com/yedf/dtm/dtmsvr" + "github.com/yedf/dtm/examples" ) func init() { @@ -16,8 +21,73 @@ func TestViper(t *testing.T) { assert.Equal(t, "test_val", viper.GetString("test")) } -func TTestDtmSvr(t *testing.T) { +func TestDtmSvr(t *testing.T) { + // 清理数据 + rabbit := dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq) + queprepared := rabbit.QueueNew(dtmsvr.RabbitmqConstPrepared) + for i := 0; i < queprepared.Queue.Messages; i++ { + queprepared.WaitAndHandleOne(func(data M) { + logrus.Printf("ignoring prepared queue data before test") + }) + } + quecommited := rabbit.QueueNew(dtmsvr.RabbitmqConstCommited) + for i := 0; i < quecommited.Queue.Messages; i++ { + quecommited.WaitAndHandleOne(func(data M) { + logrus.Printf("ignoring commited queue data before test") + }) + } + db := dtmsvr.DbGet() + common.PanicIfError(db.Exec("truncate test1.a_saga").Error) + common.PanicIfError(db.Exec("truncate test1.a_saga_step").Error) + + // 启动组件 + go dtmsvr.StartSvr() + go examples.StartSvr() + time.Sleep(time.Duration(100 * 1000 * 1000)) + + // 开始第一个正常流程的测试 + saga := genSaga("gid-1", false, false) + saga.Prepare() + queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) + sm := dtmsvr.SagaModel{} + db.Model(&sm).Where("gid=?", saga.Gid).First(&sm) + assert.Equal(t, "prepared", sm.Status) + saga.Commit() + quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg) + db.Model(&dtmsvr.SagaModel{}).Where("gid=?", saga.Gid).First(&sm) + assert.Equal(t, "finished", sm.Status) + steps := []dtmsvr.SagaStepModel{} + db.Model(&dtmsvr.SagaStepModel{}).Where("gid=?", saga.Gid).Find(&steps) + assert.Equal(t, true, steps[0].Status == "pending" && steps[2].Status == "pending" && steps[1].Status == "finished" && steps[3].Status == "finished") + + saga = genSaga("gid-2", false, true) + saga.Commit() + quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg) + saga.Prepare() + queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) + sm = dtmsvr.SagaModel{} + db.Model(&dtmsvr.SagaModel{}).Where("gid=?", saga.Gid).First(&sm) + assert.Equal(t, "rollbacked", sm.Status) + steps = []dtmsvr.SagaStepModel{} + db.Model(&dtmsvr.SagaStepModel{}).Where("gid=?", saga.Gid).Find(&steps) + assert.Equal(t, true, steps[0].Status == "rollbacked" && steps[2].Status == "rollbacked" && steps[1].Status == "finished" && steps[3].Status == "rollbacked") + + // assert.Equal(t, 1, 0) + // 开始测试 + // 发送Prepare请求后,验证数据库 // ConsumeHalfMsg 验证数据库 // ConsumeMsg 验证数据库 } + +func genSaga(gid string, inFailed bool, outFailed bool) *dtm.Saga { + saga := dtm.SagaNew(examples.TcServer, gid, examples.BusiApi+"/TransQuery") + req := examples.TransReq{ + Amount: 30, + TransInFailed: inFailed, + TransOutFailed: outFailed, + } + saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInCompensate", &req) + saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutCompensate", &req) + return saga +} diff --git a/dtmsvr/consumer.go b/dtmsvr/consumer.go index 761890e..2e35aea 100644 --- a/dtmsvr/consumer.go +++ b/dtmsvr/consumer.go @@ -5,7 +5,6 @@ import ( "strings" "time" - "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" "gorm.io/gorm" @@ -47,7 +46,7 @@ func (*SagaStepModel) TableName() string { return "test1.a_saga_step" } -func handlePreparedMsg(data gin.H) { +func HandlePreparedMsg(data M) { db := DbGet() logrus.Printf("creating saga model in prepare") data["steps"] = common.MustMarshalString(data["steps"]) @@ -60,7 +59,7 @@ func handlePreparedMsg(data gin.H) { }).Create(&m) } -func handleCommitedMsg(data gin.H) { +func HandleCommitedMsg(data M) { db := DbGet() logrus.Printf("creating saga model in commited") steps := data["steps"].([]interface{}) @@ -83,7 +82,7 @@ func handleCommitedMsg(data gin.H) { nsteps = append(nsteps, SagaStepModel{ Gid: m.Gid, Step: len(nsteps) + 1, - Data: common.MustMarshalString(step["post_data"]), + Data: step["post_data"].(string), Url: step["compensate"].(string), Type: "compensate", Status: "pending", @@ -91,7 +90,7 @@ func handleCommitedMsg(data gin.H) { nsteps = append(nsteps, SagaStepModel{ Gid: m.Gid, Step: len(nsteps) + 1, - Data: common.MustMarshalString(step["post_data"]), + Data: step["post_data"].(string), Url: step["action"].(string), Type: "action", Status: "pending", @@ -169,8 +168,7 @@ func ProcessCommitedSaga(gid string) (rerr error) { checkAndCommit(dbr) break } else { - logrus.Errorf("unknown response: %s, will be retried", body) - break + return fmt.Errorf("unknown response: %s, will be retried", body) } } } @@ -219,7 +217,7 @@ func StartConsumePreparedMsg(consumers int) { for i := 0; i < consumers; i++ { go func() { que := r.QueueNew(RabbitmqConstPrepared) - que.WaitAndHandle(handlePreparedMsg) + que.WaitAndHandle(HandlePreparedMsg) }() } } @@ -230,7 +228,7 @@ func StartConsumeCommitedMsg(consumers int) { for i := 0; i < consumers; i++ { go func() { que := r.QueueNew(RabbitmqConstCommited) - que.WaitAndHandle(handleCommitedMsg) + que.WaitAndHandle(HandleCommitedMsg) }() } } diff --git a/dtmsvr/rabbitmq.go b/dtmsvr/rabbitmq.go index e6201eb..56d4569 100644 --- a/dtmsvr/rabbitmq.go +++ b/dtmsvr/rabbitmq.go @@ -5,7 +5,6 @@ import ( "fmt" "sync" - "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/streadway/amqp" "github.com/yedf/dtm/common" @@ -107,6 +106,7 @@ func (r *Rabbitmq) SendAndConfirm(key RabbitmqConst, data map[string]interface{} type RabbitmqQueue struct { Name string + Queue *amqp.Queue Channel *amqp.Channel Conn *amqp.Connection Deliveries <-chan amqp.Delivery @@ -117,12 +117,12 @@ func (q *RabbitmqQueue) Close() { // q.Conn.Close() } -func (q *RabbitmqQueue) WaitAndHandle(handler func(data gin.H)) { +func (q *RabbitmqQueue) WaitAndHandle(handler func(data M)) { for { q.WaitAndHandleOne(handler) } } -func (q *RabbitmqQueue) WaitAndHandleOne(handler func(data gin.H)) { +func (q *RabbitmqQueue) WaitAndHandleOne(handler func(data M)) { logrus.Printf("%s reading message", q.Name) msg := <-q.Deliveries data := map[string]interface{}{} @@ -168,6 +168,7 @@ func (r *Rabbitmq) QueueNew(queueType RabbitmqConst) *RabbitmqQueue { ) common.PanicIfError(err) return &RabbitmqQueue{ + Queue: &queue, Name: queueName, Channel: channel, Deliveries: deliveries, diff --git a/dtmsvr/rabbitmq_test.go b/dtmsvr/rabbitmq_test.go index 901caef..f875e98 100644 --- a/dtmsvr/rabbitmq_test.go +++ b/dtmsvr/rabbitmq_test.go @@ -3,7 +3,6 @@ package dtmsvr import ( "testing" - "github.com/gin-gonic/gin" "github.com/magiconair/properties/assert" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" @@ -19,12 +18,12 @@ func TestRabbitConfig(t *testing.T) { func TestRabbitmq1Msg(t *testing.T) { rb := RabbitmqNew(&ServerConfig.Rabbitmq) - err := rb.SendAndConfirm(RabbitmqConstPrepared, gin.H{ + err := rb.SendAndConfirm(RabbitmqConstPrepared, M{ "gid": common.GenGid(), }) assert.Equal(t, nil, err) queue := rb.QueueNew(RabbitmqConstPrepared) - queue.WaitAndHandle(func(data gin.H) { + queue.WaitAndHandle(func(data M) { logrus.Printf("processed msg: %v in queue1", data) }) assert.Equal(t, 0, 1) diff --git a/dtmsvr/service.go b/dtmsvr/service.go index 3d4e814..8ebb38b 100644 --- a/dtmsvr/service.go +++ b/dtmsvr/service.go @@ -11,7 +11,7 @@ func AddRoute(engine *gin.Engine) { } func Prepare(c *gin.Context) { - data := gin.H{} + data := M{} err := c.BindJSON(&data) if err != nil { return @@ -19,11 +19,11 @@ func Prepare(c *gin.Context) { rabbit := RabbitmqGet() err = rabbit.SendAndConfirm(RabbitmqConstPrepared, data) common.PanicIfError(err) - c.JSON(200, gin.H{"message": "SUCCESS"}) + c.JSON(200, M{"message": "SUCCESS"}) } func Commit(c *gin.Context) { - data := gin.H{} + data := M{} err := c.BindJSON(&data) if err != nil { return @@ -31,5 +31,5 @@ func Commit(c *gin.Context) { rabbit := RabbitmqGet() err = rabbit.SendAndConfirm(RabbitmqConstCommited, data) common.PanicIfError(err) - c.JSON(200, gin.H{"message": "SUCCESS"}) + c.JSON(200, M{"message": "SUCCESS"}) } diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index cf17777..001f1f0 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -1,38 +1,21 @@ package dtmsvr import ( - "bytes" - "io/ioutil" - "time" - - "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" ) func Main() { - logrus.Printf("start dtmsvr") - gin.SetMode(gin.ReleaseMode) - app := gin.Default() - app.Use(func(c *gin.Context) { - body := "" - if c.Request.Method == "POST" { - rb, err := c.GetRawData() - if err != nil { - logrus.Printf("GetRawData error: %s", err.Error()) - } else { - body = string(rb) - c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(rb)) - } - } - began := time.Now() - logrus.Printf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) - c.Next() - logrus.Printf("used %d ms %s %s query: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery) - - }) - AddRoute(app) - // StartConsumePreparedMsg(1) + StartConsumePreparedMsg(1) StartConsumeCommitedMsg(1) logrus.Printf("dtmsvr listen at: 8080") - go app.Run() + go StartSvr() +} + +func StartSvr() { + logrus.Printf("start dtmsvr") + app := common.GetGinApp() + AddRoute(app) + logrus.Printf("dtmsvr listen at: 8080") + app.Run() } diff --git a/examples/cli.go b/examples/cli.go index 026b3ef..91a4f5b 100644 --- a/examples/cli.go +++ b/examples/cli.go @@ -1,25 +1,27 @@ package examples import ( - "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" ) func Main() { - logrus.Printf("examples starting") - gin.SetMode(gin.ReleaseMode) - app := gin.Default() - app.POST(BusiApi+"/TransIn", TransIn) - app.POST(BusiApi+"/TransInCompensate", TransInCompensate) - app.POST(BusiApi+"/TransOut", TransOut) - app.POST(BusiApi+"/TransOutCompensate", TransOutCompensate) - app.POST(BusiApi+"/TransQuery", TransQuery) - - go app.Run(":8081") - logrus.Printf("examples istening at %d", BusiPort) + go StartSvr() trans(&TransReq{ Amount: 30, TransInFailed: false, TransOutFailed: true, }) } + +func StartSvr() { + logrus.Printf("examples starting") + app := common.GetGinApp() + app.POST(BusiApi+"/TransIn", TransIn) + app.POST(BusiApi+"/TransInCompensate", TransInCompensate) + app.POST(BusiApi+"/TransOut", TransOut) + app.POST(BusiApi+"/TransOutCompensate", TransOutCompensate) + app.POST(BusiApi+"/TransQuery", TransQuery) + logrus.Printf("examples istening at %d", BusiPort) + app.Run(":8081") +} diff --git a/examples/saga.go b/examples/saga.go index 11560d2..9963648 100644 --- a/examples/saga.go +++ b/examples/saga.go @@ -4,11 +4,11 @@ import ( "fmt" "github.com/gin-gonic/gin" - "github.com/gin-gonic/gin/binding" "github.com/sirupsen/logrus" "github.com/yedf/dtm/dtm" ) +type M = map[string]interface{} type TransReq struct { Amount int `json:"amount"` TransInFailed bool `json:"transInFailed"` @@ -27,7 +27,7 @@ func TransIn(c *gin.Context) { c.Error(fmt.Errorf("TransIn failed for gid: %s", gid)) return } - c.JSON(200, gin.H{"result": "SUCCESS"}) + c.JSON(200, M{"result": "SUCCESS"}) } func TransInCompensate(c *gin.Context) { @@ -37,25 +37,22 @@ func TransInCompensate(c *gin.Context) { return } logrus.Printf("%s TransInCompensate: %v", gid, req) - c.JSON(200, gin.H{"result": "SUCCESS"}) + c.JSON(200, M{"result": "SUCCESS"}) } func TransOut(c *gin.Context) { gid := c.Query("gid") req := TransReq{} - req2 := TransReq{} - c.ShouldBindBodyWith(&req2, binding.JSON) - c.ShouldBindBodyWith(&req, binding.JSON) if err := c.BindJSON(&req); err != nil { return } logrus.Printf("%s TransOut: %v", gid, req) if req.TransOutFailed { logrus.Printf("%s TransOut %v failed", gid, req) - c.JSON(500, gin.H{"result": "FAIL"}) + c.JSON(500, M{"result": "FAIL"}) return } - c.JSON(200, gin.H{"result": "SUCCESS"}) + c.JSON(200, M{"result": "SUCCESS"}) } func TransOutCompensate(c *gin.Context) { @@ -65,7 +62,7 @@ func TransOutCompensate(c *gin.Context) { return } logrus.Printf("%s TransOutCompensate: %v", gid, req) - c.JSON(200, gin.H{"result": "SUCCESS"}) + c.JSON(200, M{"result": "SUCCESS"}) } func TransQuery(c *gin.Context) { @@ -75,26 +72,26 @@ func TransQuery(c *gin.Context) { return } logrus.Printf("%s TransQuery: %v", gid, req) - c.JSON(200, gin.H{"result": "SUCCESS"}) + c.JSON(200, M{"result": "SUCCESS"}) } func trans(req *TransReq) { // gid := common.GenGid() gid := "4eHhkCxVsQ1" logrus.Printf("busi transaction begin: %s", gid) - saga := dtm.SagaNew(TcServer, gid) + saga := dtm.SagaNew(TcServer, gid, Busi+"/TransQuery") - saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", gin.H{ + saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", M{ "amount": req.Amount, "transInFailed": req.TransInFailed, "transOutFailed": req.TransOutFailed, }) - saga.Add(Busi+"/TransOut", Busi+"/TransOutCompensate", gin.H{ + saga.Add(Busi+"/TransOut", Busi+"/TransOutCompensate", M{ "amount": req.Amount, "transInFailed": req.TransInFailed, "transOutFailed": req.TransOutFailed, }) - saga.Prepare(Busi + "/TransQuery") + saga.Prepare() logrus.Printf("busi trans commit") saga.Commit() } diff --git a/main.go b/main.go index db0d611..597a761 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "time" "github.com/yedf/dtm/common" @@ -8,8 +9,17 @@ import ( "github.com/yedf/dtm/examples" ) +type M = map[string]interface{} + func main() { dtmsvr.LoadConfig() + + s := common.MustMarshalString(M{ + "a": 1, + "b": "str", + }) + var obj interface{} + json.Unmarshal([]byte(s), &obj) db := dtmsvr.DbGet() tx := db.Begin() common.PanicIfError(tx.Error) @@ -25,7 +35,7 @@ func main() { // logrus.SetFormatter(&logrus.JSONFormatter{}) // dtmsvr.LoadConfig() // rb := dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq) - // err := rb.SendAndConfirm(dtmsvr.RabbitmqConstPrepared, gin.H{ + // err := rb.SendAndConfirm(dtmsvr.RabbitmqConstPrepared, M{ // "gid": common.GenGid(), // }) // common.PanicIfError(err)