saga should refactor next

This commit is contained in:
yedongfu 2021-05-28 20:29:42 +08:00
parent fa8abb239f
commit faca20d756
3 changed files with 346 additions and 0 deletions

105
examples/main_saga.go Normal file
View File

@ -0,0 +1,105 @@
package examples
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
)
// 事务参与者的服务地址
const SagaBusiPort = 8081
const SagaBusiApi = "/api/busi_saga"
var SagaBusi = fmt.Sprintf("http://localhost:%d%s", SagaBusiPort, SagaBusiApi)
func SagaMain() {
go SagaStartSvr()
sagaFireRequest()
time.Sleep(1000 * time.Second)
}
func SagaStartSvr() {
logrus.Printf("saga examples starting")
app := common.GetGinApp()
SagaAddRoute(app)
app.Run(fmt.Sprintf(":%d", SagaBusiPort))
}
func sagaFireRequest() {
gid := common.GenGid()
logrus.Printf("busi transaction begin: %s", gid)
req := &TransReq{
Amount: 30,
TransInResult: "SUCCESS",
TransOutResult: "SUCCESS",
}
saga := dtm.SagaNew(DtmServer, gid)
saga.Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req)
saga.Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req)
err := saga.Prepare(SagaBusi + "/TransQuery")
e2p(err)
logrus.Printf("busi trans commit")
err = saga.Commit()
e2p(err)
}
// api
func SagaAddRoute(app *gin.Engine) {
app.POST(SagaBusiApi+"/TransIn", common.WrapHandler(sagaTransIn))
app.POST(SagaBusiApi+"/TransInCompensate", common.WrapHandler(sagaTransInCompensate))
app.POST(SagaBusiApi+"/TransOut", common.WrapHandler(SagaTransOut))
app.POST(SagaBusiApi+"/TransOutCompensate", common.WrapHandler(sagaTransOutCompensate))
app.GET(SagaBusiApi+"/TransQuery", common.WrapHandler(sagaTransQuery))
logrus.Printf("examples listening at %d", SagaBusiPort)
}
var SagaTransInResult = ""
var SagaTransOutResult = ""
var SagaTransInCompensateResult = ""
var SagaTransOutCompensateResult = ""
var SagaTransQueryResult = ""
func sagaTransIn(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(SagaTransInResult, req.TransInResult, "SUCCESS")
logrus.Printf("%s TransIn: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func sagaTransInCompensate(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(SagaTransInCompensateResult, "SUCCESS")
logrus.Printf("%s TransInCompensate: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func SagaTransOut(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(SagaTransOutResult, req.TransOutResult, "SUCCESS")
logrus.Printf("%s TransOut: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func sagaTransOutCompensate(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(SagaTransOutCompensateResult, "SUCCESS")
logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func sagaTransQuery(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
logrus.Printf("%s TransQuery", gid)
res := common.OrString(SagaTransQueryResult, "SUCCESS")
return M{"result": res}, nil
}

125
examples/main_tcc.go Normal file
View File

@ -0,0 +1,125 @@
package examples
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
)
// 事务参与者的服务地址
const TccBusiPort = 8083
const TccBusiApi = "/api/busi_tcc"
var TccBusi = fmt.Sprintf("http://localhost:%d%s", TccBusiPort, TccBusiApi)
func TccMain() {
go TccStartSvr()
tccFireRequest()
time.Sleep(1000 * time.Second)
}
func TccStartSvr() {
logrus.Printf("tcc examples starting")
app := common.GetGinApp()
TccAddRoute(app)
app.Run(fmt.Sprintf(":%d", TccBusiPort))
}
func tccFireRequest() {
gid := common.GenGid()
logrus.Printf("busi transaction begin: %s", gid)
req := &TransReq{
Amount: 30,
TransInResult: "SUCCESS",
TransOutResult: "SUCCESS",
}
tcc := dtm.TccNew(DtmServer, gid)
tcc.Add(TccBusi+"/TransOutTry", TccBusi+"/TransOutConfirm", TccBusi+"/TransOutCancel", req)
tcc.Add(TccBusi+"/TransInTry", TccBusi+"/TransInConfirm", TccBusi+"/TransOutCancel", req)
err := tcc.Prepare(TccBusi + "/TransQuery")
e2p(err)
logrus.Printf("busi trans commit")
err = tcc.Commit()
e2p(err)
}
// api
func TccAddRoute(app *gin.Engine) {
app.POST(TccBusiApi+"/TransInTry", common.WrapHandler(tccTransInTry))
app.POST(TccBusiApi+"/TransInConfirm", common.WrapHandler(tccTransInConfirm))
app.POST(TccBusiApi+"/TransInCancel", common.WrapHandler(tccTransCancel))
app.POST(TccBusiApi+"/TransOutTry", common.WrapHandler(tccTransOutTry))
app.POST(TccBusiApi+"/TransOutConfirm", common.WrapHandler(tccTransOutConfirm))
app.POST(TccBusiApi+"/TransOutCancel", common.WrapHandler(tccTransOutCancel))
app.GET(TccBusiApi+"/TransQuery", common.WrapHandler(tccTransQuery))
logrus.Printf("examples listening at %d", TccBusiPort)
}
var TccTransInTryResult = ""
var TccTransOutTryResult = ""
var TccTransInCancelResult = ""
var TccTransOutCancelResult = ""
var TccTransInConfirmResult = ""
var TccTransOutConfirmResult = ""
var TccTransQueryResult = ""
func tccTransInTry(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TccTransInTryResult, req.TransInResult, "SUCCESS")
logrus.Printf("%s TransInTry: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func tccTransInConfirm(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TccTransInConfirmResult, "SUCCESS")
logrus.Printf("%s tccTransInConfirm: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func tccTransCancel(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TccTransInCancelResult, "SUCCESS")
logrus.Printf("%s tccTransCancel: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func tccTransOutTry(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TccTransOutTryResult, req.TransOutResult, "SUCCESS")
logrus.Printf("%s TransOut: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func tccTransOutConfirm(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TccTransOutConfirmResult, "SUCCESS")
logrus.Printf("%s TransOutConfirm: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func tccTransOutCancel(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TccTransOutCancelResult, "SUCCESS")
logrus.Printf("%s tccTransOutCancel: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func tccTransQuery(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
logrus.Printf("%s TransQuery", gid)
res := common.OrString(TccTransQueryResult, "SUCCESS")
return M{"result": res}, nil
}

116
examples/main_xa.go Normal file
View File

@ -0,0 +1,116 @@
package examples
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
"gorm.io/gorm"
)
// 事务参与者的服务地址
const XaBusiPort = 8082
const XaBusiApi = "/api/busi_xa"
var XaBusi = fmt.Sprintf("http://localhost:%d%s", XaBusiPort, XaBusiApi)
var XaClient *dtm.XaClient = nil
type UserAccount struct {
common.ModelBase
UserId int
Balance string
}
func (u *UserAccount) TableName() string { return "user_account" }
func dbGet() *common.MyDb {
return common.DbGet(Config.Mysql)
}
func XaMain() {
go XaStartSvr()
time.Sleep(100 * time.Millisecond)
xaFireRequest()
time.Sleep(1000 * time.Second)
}
func XaStartSvr() {
common.InitApp(&Config)
logrus.Printf("xa examples starting")
app := common.GetGinApp()
XaClient = dtm.XaClientNew(DtmServer, Config.Mysql, app, XaBusi+"/xa")
XaAddRoute(app)
app.Run(fmt.Sprintf(":%d", XaBusiPort))
}
func xaFireRequest() {
gid := common.GenGid()
err := XaClient.XaGlobalTransaction(gid, func() (rerr error) {
defer common.P2E(&rerr)
req := GenTransReq(30, false, false)
resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
"gid": gid,
"user_id": "1",
}).Post(XaBusi + "/TransOut")
common.CheckRestySuccess(resp, err)
resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
"gid": gid,
"user_id": "2",
}).Post(XaBusi + "/TransOut")
common.CheckRestySuccess(resp, err)
return nil
})
e2p(err)
}
// api
func XaAddRoute(app *gin.Engine) {
app.POST(XaBusiApi+"/TransIn", common.WrapHandler(xaTransIn))
app.POST(XaBusiApi+"/TransOut", common.WrapHandler(xaTransOut))
}
func xaTransIn(c *gin.Context) (interface{}, error) {
err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.MyDb) (rerr error) {
req := transReqFromContext(c)
if req.TransInResult != "SUCCESS" {
return fmt.Errorf("tranIn failed")
}
dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")).
Update("balance", gorm.Expr("balance - ?", req.Amount))
return dbr.Error
})
e2p(err)
return M{"result": "SUCCESS"}, nil
}
func xaTransOut(c *gin.Context) (interface{}, error) {
err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.MyDb) (rerr error) {
req := transReqFromContext(c)
if req.TransOutResult != "SUCCESS" {
return fmt.Errorf("tranOut failed")
}
dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")).
Update("balance", gorm.Expr("balance + ?", req.Amount))
return dbr.Error
})
e2p(err)
return M{"result": "SUCCESS"}, nil
}
func ResetXaData() {
db := dbGet()
db.Must().Exec("truncate user_account")
db.Must().Exec("insert into user_account (user_id, balance) values (1, 10000), (2, 10000)")
type XaRow struct {
Data string
}
xas := []XaRow{}
db.Must().Raw("xa recover").Scan(&xas)
for _, xa := range xas {
db.Must().Exec(fmt.Sprintf("xa rollback '%s'", xa.Data))
}
}