saga seems ok

This commit is contained in:
yedongfu 2021-05-18 21:18:39 +08:00
parent 20367db2cf
commit d20a0f91ec
7 changed files with 95 additions and 22 deletions

View File

@ -4,6 +4,8 @@ import (
"encoding/json" "encoding/json"
"github.com/bwmarrin/snowflake" "github.com/bwmarrin/snowflake"
"github.com/go-resty/resty/v2"
"github.com/sirupsen/logrus"
) )
var gNode *snowflake.Node = nil var gNode *snowflake.Node = nil
@ -50,3 +52,17 @@ func Map2Obj(m map[string]interface{}, obj interface{}) error {
} }
return json.Unmarshal(b, obj) return json.Unmarshal(b, obj)
} }
var RestyClient = resty.New()
func init() {
RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error {
logrus.Printf("requesting: %s %s %v", r.Method, r.URL, r.Body)
return nil
})
RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error {
r := resp.Request
logrus.Printf("requested: %s %s %s", r.Method, r.URL, resp.String())
return nil
})
}

View File

@ -4,12 +4,10 @@ import (
"fmt" "fmt"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
) )
var client *resty.Client = resty.New()
type SagaData struct { type SagaData struct {
Gid string `json:"gid"` Gid string `json:"gid"`
Steps []SagaStep `json:"steps"` Steps []SagaStep `json:"steps"`
@ -52,7 +50,7 @@ func (s *Saga) getBody() *SagaData {
func (s *Saga) Prepare(url string) error { func (s *Saga) Prepare(url string) error {
s.TransQuery = url s.TransQuery = url
logrus.Printf("preparing %s body: %v", s.Gid, s.getBody()) logrus.Printf("preparing %s body: %v", s.Gid, s.getBody())
resp, err := client.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/prepare", s.Server)) resp, err := common.RestyClient.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/prepare", s.Server))
if err != nil { if err != nil {
return err return err
} }
@ -64,7 +62,7 @@ func (s *Saga) Prepare(url string) error {
func (s *Saga) Commit() error { func (s *Saga) Commit() error {
logrus.Printf("committing %s body: %v", s.Gid, s.getBody()) logrus.Printf("committing %s body: %v", s.Gid, s.getBody())
resp, err := client.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/commit", s.Server)) resp, err := common.RestyClient.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/commit", s.Server))
if err != nil { if err != nil {
return err return err
} }

View File

@ -126,7 +126,6 @@ func ProcessCommitedSaga(gid string) (rerr error) {
if db1.Error != nil { if db1.Error != nil {
return db1.Error return db1.Error
} }
current := 0 // 当前正在处理的步骤
tx := []*gorm.DB{db.Begin()} tx := []*gorm.DB{db.Begin()}
defer func() { // 如果直接return出去则rollback当前的事务 defer func() { // 如果直接return出去则rollback当前的事务
tx[0].Rollback() tx[0].Rollback()
@ -144,13 +143,14 @@ func ProcessCommitedSaga(gid string) (rerr error) {
tx[0] = db.Begin() tx[0] = db.Begin()
common.PanicIfError(tx[0].Error) common.PanicIfError(tx[0].Error)
} }
current := 0 // 当前正在处理的步骤
for ; current < len(steps); current++ { for ; current < len(steps); current++ {
step := steps[current] step := steps[current]
if step.Type == "compensate" && step.Status == "pending" || step.Type == "action" && step.Status == "finished" { if step.Type == "compensate" && step.Status == "pending" || step.Type == "action" && step.Status == "finished" {
continue continue
} }
if step.Type == "action" && step.Status == "pending" { if step.Type == "action" && step.Status == "pending" {
resp, err := client.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
if err != nil { if err != nil {
return err return err
} }
@ -167,6 +167,10 @@ func ProcessCommitedSaga(gid string) (rerr error) {
"rollback_time": time.Now(), "rollback_time": time.Now(),
}) })
checkAndCommit(dbr) checkAndCommit(dbr)
break
} else {
logrus.Errorf("unknown response: %s, will be retried", body)
break
} }
} }
} }
@ -178,12 +182,12 @@ func ProcessCommitedSaga(gid string) (rerr error) {
checkAndCommit(dbr) checkAndCommit(dbr)
return nil return nil
} }
for current = len(steps) - 1; current >= 0; current-- { for current = current - 1; current >= 0; current-- {
step := steps[current] step := steps[current]
if step.Type != "compensate" || step.Status != "pending" { if step.Type != "compensate" || step.Status != "pending" {
continue continue
} }
resp, err := client.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
if err != nil { if err != nil {
return err return err
} }

View File

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/go-resty/resty/v2"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
@ -40,5 +39,3 @@ func DbGet() *gorm.DB {
} }
return db return db
} }
var client *resty.Client = resty.New()

View File

@ -1,6 +1,10 @@
package dtmsvr package dtmsvr
import ( import (
"bytes"
"io/ioutil"
"time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -9,6 +13,23 @@ func Main() {
logrus.Printf("start dtmsvr") logrus.Printf("start dtmsvr")
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
app := gin.Default() app := gin.Default()
app.Use(func(c *gin.Context) {
body := ""
if c.Request.Method == "POST" {
rb, err := c.GetRawData()
if err != nil {
logrus.Printf("GetRawData error: %s", err.Error())
} else {
body = string(rb)
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(rb))
}
}
began := time.Now()
logrus.Printf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body)
c.Next()
logrus.Printf("used %d ms %s %s query: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery)
})
AddRoute(app) AddRoute(app)
// StartConsumePreparedMsg(1) // StartConsumePreparedMsg(1)
StartConsumeCommitedMsg(1) StartConsumeCommitedMsg(1)

View File

@ -11,13 +11,15 @@ func Main() {
app := gin.Default() app := gin.Default()
app.POST(BusiApi+"/TransIn", TransIn) app.POST(BusiApi+"/TransIn", TransIn)
app.POST(BusiApi+"/TransInCompensate", TransInCompensate) app.POST(BusiApi+"/TransInCompensate", TransInCompensate)
app.POST(BusiApi+"/TransOut", TransOut)
app.POST(BusiApi+"/TransOutCompensate", TransOutCompensate)
app.POST(BusiApi+"/TransQuery", TransQuery) app.POST(BusiApi+"/TransQuery", TransQuery)
go app.Run(":8081") go app.Run(":8081")
logrus.Printf("examples istening at %d", BusiPort) logrus.Printf("examples istening at %d", BusiPort)
trans(&TransReq{ trans(&TransReq{
amount: 30, Amount: 30,
transInFailed: false, TransInFailed: false,
transOutFailed: false, TransOutFailed: true,
}) })
} }

View File

@ -4,14 +4,15 @@ import (
"fmt" "fmt"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/yedf/dtm/dtm" "github.com/yedf/dtm/dtm"
) )
type TransReq struct { type TransReq struct {
amount int Amount int `json:"amount"`
transInFailed bool TransInFailed bool `json:"transInFailed"`
transOutFailed bool TransOutFailed bool `json:"transOutFailed"`
} }
func TransIn(c *gin.Context) { func TransIn(c *gin.Context) {
@ -21,9 +22,10 @@ func TransIn(c *gin.Context) {
return return
} }
logrus.Printf("%s TransIn: %v", gid, req) logrus.Printf("%s TransIn: %v", gid, req)
if req.transInFailed { if req.TransInFailed {
logrus.Printf("%s TransIn %v failed", req) logrus.Printf("%s TransIn %v failed", req)
c.Error(fmt.Errorf("TransIn failed for gid: %s", gid)) c.Error(fmt.Errorf("TransIn failed for gid: %s", gid))
return
} }
c.JSON(200, gin.H{"result": "SUCCESS"}) c.JSON(200, gin.H{"result": "SUCCESS"})
} }
@ -38,6 +40,34 @@ func TransInCompensate(c *gin.Context) {
c.JSON(200, gin.H{"result": "SUCCESS"}) c.JSON(200, gin.H{"result": "SUCCESS"})
} }
func TransOut(c *gin.Context) {
gid := c.Query("gid")
req := TransReq{}
req2 := TransReq{}
c.ShouldBindBodyWith(&req2, binding.JSON)
c.ShouldBindBodyWith(&req, binding.JSON)
if err := c.BindJSON(&req); err != nil {
return
}
logrus.Printf("%s TransOut: %v", gid, req)
if req.TransOutFailed {
logrus.Printf("%s TransOut %v failed", gid, req)
c.JSON(500, gin.H{"result": "FAIL"})
return
}
c.JSON(200, gin.H{"result": "SUCCESS"})
}
func TransOutCompensate(c *gin.Context) {
gid := c.Query("gid")
req := TransReq{}
if err := c.BindJSON(&req); err != nil {
return
}
logrus.Printf("%s TransOutCompensate: %v", gid, req)
c.JSON(200, gin.H{"result": "SUCCESS"})
}
func TransQuery(c *gin.Context) { func TransQuery(c *gin.Context) {
gid := c.Query("gid") gid := c.Query("gid")
req := TransReq{} req := TransReq{}
@ -55,9 +85,14 @@ func trans(req *TransReq) {
saga := dtm.SagaNew(TcServer, gid) saga := dtm.SagaNew(TcServer, gid)
saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", gin.H{ saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", gin.H{
"amount": req.amount, "amount": req.Amount,
"transInFailed": req.transInFailed, "transInFailed": req.TransInFailed,
"transOutFailed": req.transOutFailed, "transOutFailed": req.TransOutFailed,
})
saga.Add(Busi+"/TransOut", Busi+"/TransOutCompensate", gin.H{
"amount": req.Amount,
"transInFailed": req.TransInFailed,
"transOutFailed": req.TransOutFailed,
}) })
saga.Prepare(Busi + "/TransQuery") saga.Prepare(Busi + "/TransQuery")
logrus.Printf("busi trans commit") logrus.Printf("busi trans commit")