diff --git a/common/utils.go b/common/utils.go index 07ec8b4..75cbe54 100644 --- a/common/utils.go +++ b/common/utils.go @@ -4,6 +4,8 @@ import ( "encoding/json" "github.com/bwmarrin/snowflake" + "github.com/go-resty/resty/v2" + "github.com/sirupsen/logrus" ) var gNode *snowflake.Node = nil @@ -50,3 +52,17 @@ func Map2Obj(m map[string]interface{}, obj interface{}) error { } return json.Unmarshal(b, obj) } + +var RestyClient = resty.New() + +func init() { + RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { + logrus.Printf("requesting: %s %s %v", r.Method, r.URL, r.Body) + return nil + }) + RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { + r := resp.Request + logrus.Printf("requested: %s %s %s", r.Method, r.URL, resp.String()) + return nil + }) +} diff --git a/dtm/saga.go b/dtm/saga.go index 1a37c87..b095a0d 100644 --- a/dtm/saga.go +++ b/dtm/saga.go @@ -4,12 +4,10 @@ import ( "fmt" "github.com/gin-gonic/gin" - "github.com/go-resty/resty/v2" "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" ) -var client *resty.Client = resty.New() - type SagaData struct { Gid string `json:"gid"` Steps []SagaStep `json:"steps"` @@ -52,7 +50,7 @@ func (s *Saga) getBody() *SagaData { func (s *Saga) Prepare(url string) error { s.TransQuery = url logrus.Printf("preparing %s body: %v", s.Gid, s.getBody()) - resp, err := client.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/prepare", s.Server)) + resp, err := common.RestyClient.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/prepare", s.Server)) if err != nil { return err } @@ -64,7 +62,7 @@ func (s *Saga) Prepare(url string) error { func (s *Saga) Commit() error { logrus.Printf("committing %s body: %v", s.Gid, s.getBody()) - resp, err := client.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/commit", s.Server)) + resp, err := common.RestyClient.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/commit", s.Server)) if err != nil { return err } diff --git a/dtmsvr/consumer.go b/dtmsvr/consumer.go index 3986c9a..761890e 100644 --- a/dtmsvr/consumer.go +++ b/dtmsvr/consumer.go @@ -126,7 +126,6 @@ func ProcessCommitedSaga(gid string) (rerr error) { if db1.Error != nil { return db1.Error } - current := 0 // 当前正在处理的步骤 tx := []*gorm.DB{db.Begin()} defer func() { // 如果直接return出去,则rollback当前的事务 tx[0].Rollback() @@ -144,13 +143,14 @@ func ProcessCommitedSaga(gid string) (rerr error) { tx[0] = db.Begin() common.PanicIfError(tx[0].Error) } + current := 0 // 当前正在处理的步骤 for ; current < len(steps); current++ { step := steps[current] if step.Type == "compensate" && step.Status == "pending" || step.Type == "action" && step.Status == "finished" { continue } if step.Type == "action" && step.Status == "pending" { - resp, err := client.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) + resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) if err != nil { return err } @@ -167,6 +167,10 @@ func ProcessCommitedSaga(gid string) (rerr error) { "rollback_time": time.Now(), }) checkAndCommit(dbr) + break + } else { + logrus.Errorf("unknown response: %s, will be retried", body) + break } } } @@ -178,12 +182,12 @@ func ProcessCommitedSaga(gid string) (rerr error) { checkAndCommit(dbr) return nil } - for current = len(steps) - 1; current >= 0; current-- { + for current = current - 1; current >= 0; current-- { step := steps[current] if step.Type != "compensate" || step.Status != "pending" { continue } - resp, err := client.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) + resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) if err != nil { return err } diff --git a/dtmsvr/objects.go b/dtmsvr/objects.go index e7749cf..0f93d81 100644 --- a/dtmsvr/objects.go +++ b/dtmsvr/objects.go @@ -4,7 +4,6 @@ import ( "fmt" "strings" - "github.com/go-resty/resty/v2" "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/yedf/dtm/common" @@ -40,5 +39,3 @@ func DbGet() *gorm.DB { } return db } - -var client *resty.Client = resty.New() diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go index bb9842f..cf17777 100644 --- a/dtmsvr/svr.go +++ b/dtmsvr/svr.go @@ -1,6 +1,10 @@ package dtmsvr import ( + "bytes" + "io/ioutil" + "time" + "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" ) @@ -9,6 +13,23 @@ func Main() { logrus.Printf("start dtmsvr") gin.SetMode(gin.ReleaseMode) app := gin.Default() + app.Use(func(c *gin.Context) { + body := "" + if c.Request.Method == "POST" { + rb, err := c.GetRawData() + if err != nil { + logrus.Printf("GetRawData error: %s", err.Error()) + } else { + body = string(rb) + c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(rb)) + } + } + began := time.Now() + logrus.Printf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body) + c.Next() + logrus.Printf("used %d ms %s %s query: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery) + + }) AddRoute(app) // StartConsumePreparedMsg(1) StartConsumeCommitedMsg(1) diff --git a/examples/cli.go b/examples/cli.go index 8860eae..026b3ef 100644 --- a/examples/cli.go +++ b/examples/cli.go @@ -11,13 +11,15 @@ func Main() { app := gin.Default() app.POST(BusiApi+"/TransIn", TransIn) app.POST(BusiApi+"/TransInCompensate", TransInCompensate) + app.POST(BusiApi+"/TransOut", TransOut) + app.POST(BusiApi+"/TransOutCompensate", TransOutCompensate) app.POST(BusiApi+"/TransQuery", TransQuery) go app.Run(":8081") logrus.Printf("examples istening at %d", BusiPort) trans(&TransReq{ - amount: 30, - transInFailed: false, - transOutFailed: false, + Amount: 30, + TransInFailed: false, + TransOutFailed: true, }) } diff --git a/examples/saga.go b/examples/saga.go index 952f76e..11560d2 100644 --- a/examples/saga.go +++ b/examples/saga.go @@ -4,14 +4,15 @@ import ( "fmt" "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" "github.com/sirupsen/logrus" "github.com/yedf/dtm/dtm" ) type TransReq struct { - amount int - transInFailed bool - transOutFailed bool + Amount int `json:"amount"` + TransInFailed bool `json:"transInFailed"` + TransOutFailed bool `json:"transOutFailed"` } func TransIn(c *gin.Context) { @@ -21,9 +22,10 @@ func TransIn(c *gin.Context) { return } logrus.Printf("%s TransIn: %v", gid, req) - if req.transInFailed { + if req.TransInFailed { logrus.Printf("%s TransIn %v failed", req) c.Error(fmt.Errorf("TransIn failed for gid: %s", gid)) + return } c.JSON(200, gin.H{"result": "SUCCESS"}) } @@ -38,6 +40,34 @@ func TransInCompensate(c *gin.Context) { c.JSON(200, gin.H{"result": "SUCCESS"}) } +func TransOut(c *gin.Context) { + gid := c.Query("gid") + req := TransReq{} + req2 := TransReq{} + c.ShouldBindBodyWith(&req2, binding.JSON) + c.ShouldBindBodyWith(&req, binding.JSON) + if err := c.BindJSON(&req); err != nil { + return + } + logrus.Printf("%s TransOut: %v", gid, req) + if req.TransOutFailed { + logrus.Printf("%s TransOut %v failed", gid, req) + c.JSON(500, gin.H{"result": "FAIL"}) + return + } + c.JSON(200, gin.H{"result": "SUCCESS"}) +} + +func TransOutCompensate(c *gin.Context) { + gid := c.Query("gid") + req := TransReq{} + if err := c.BindJSON(&req); err != nil { + return + } + logrus.Printf("%s TransOutCompensate: %v", gid, req) + c.JSON(200, gin.H{"result": "SUCCESS"}) +} + func TransQuery(c *gin.Context) { gid := c.Query("gid") req := TransReq{} @@ -55,9 +85,14 @@ func trans(req *TransReq) { saga := dtm.SagaNew(TcServer, gid) saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", gin.H{ - "amount": req.amount, - "transInFailed": req.transInFailed, - "transOutFailed": req.transOutFailed, + "amount": req.Amount, + "transInFailed": req.TransInFailed, + "transOutFailed": req.TransOutFailed, + }) + saga.Add(Busi+"/TransOut", Busi+"/TransOutCompensate", gin.H{ + "amount": req.Amount, + "transInFailed": req.TransInFailed, + "transOutFailed": req.TransOutFailed, }) saga.Prepare(Busi + "/TransQuery") logrus.Printf("busi trans commit")