From faca20d75638e69c510e9a3427728ad45bfb539d Mon Sep 17 00:00:00 2001 From: yedongfu Date: Fri, 28 May 2021 20:29:42 +0800 Subject: [PATCH] saga should refactor next --- examples/main_saga.go | 105 +++++++++++++++++++++++++++++++++++ examples/main_tcc.go | 125 ++++++++++++++++++++++++++++++++++++++++++ examples/main_xa.go | 116 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 346 insertions(+) create mode 100644 examples/main_saga.go create mode 100644 examples/main_tcc.go create mode 100644 examples/main_xa.go diff --git a/examples/main_saga.go b/examples/main_saga.go new file mode 100644 index 0000000..3558503 --- /dev/null +++ b/examples/main_saga.go @@ -0,0 +1,105 @@ +package examples + +import ( + "fmt" + "time" + + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm" + "github.com/yedf/dtm/common" +) + +// 事务参与者的服务地址 +const SagaBusiPort = 8081 +const SagaBusiApi = "/api/busi_saga" + +var SagaBusi = fmt.Sprintf("http://localhost:%d%s", SagaBusiPort, SagaBusiApi) + +func SagaMain() { + go SagaStartSvr() + sagaFireRequest() + time.Sleep(1000 * time.Second) +} + +func SagaStartSvr() { + logrus.Printf("saga examples starting") + app := common.GetGinApp() + SagaAddRoute(app) + app.Run(fmt.Sprintf(":%d", SagaBusiPort)) +} + +func sagaFireRequest() { + gid := common.GenGid() + logrus.Printf("busi transaction begin: %s", gid) + req := &TransReq{ + Amount: 30, + TransInResult: "SUCCESS", + TransOutResult: "SUCCESS", + } + saga := dtm.SagaNew(DtmServer, gid) + + saga.Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req) + saga.Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req) + err := saga.Prepare(SagaBusi + "/TransQuery") + e2p(err) + logrus.Printf("busi trans commit") + err = saga.Commit() + e2p(err) +} + +// api + +func SagaAddRoute(app *gin.Engine) { + app.POST(SagaBusiApi+"/TransIn", common.WrapHandler(sagaTransIn)) + app.POST(SagaBusiApi+"/TransInCompensate", common.WrapHandler(sagaTransInCompensate)) + app.POST(SagaBusiApi+"/TransOut", common.WrapHandler(SagaTransOut)) + app.POST(SagaBusiApi+"/TransOutCompensate", common.WrapHandler(sagaTransOutCompensate)) + app.GET(SagaBusiApi+"/TransQuery", common.WrapHandler(sagaTransQuery)) + logrus.Printf("examples listening at %d", SagaBusiPort) +} + +var SagaTransInResult = "" +var SagaTransOutResult = "" +var SagaTransInCompensateResult = "" +var SagaTransOutCompensateResult = "" +var SagaTransQueryResult = "" + +func sagaTransIn(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(SagaTransInResult, req.TransInResult, "SUCCESS") + logrus.Printf("%s TransIn: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func sagaTransInCompensate(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(SagaTransInCompensateResult, "SUCCESS") + logrus.Printf("%s TransInCompensate: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func SagaTransOut(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(SagaTransOutResult, req.TransOutResult, "SUCCESS") + logrus.Printf("%s TransOut: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func sagaTransOutCompensate(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(SagaTransOutCompensateResult, "SUCCESS") + logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func sagaTransQuery(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + logrus.Printf("%s TransQuery", gid) + res := common.OrString(SagaTransQueryResult, "SUCCESS") + return M{"result": res}, nil +} diff --git a/examples/main_tcc.go b/examples/main_tcc.go new file mode 100644 index 0000000..035e8a5 --- /dev/null +++ b/examples/main_tcc.go @@ -0,0 +1,125 @@ +package examples + +import ( + "fmt" + "time" + + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm" + "github.com/yedf/dtm/common" +) + +// 事务参与者的服务地址 +const TccBusiPort = 8083 +const TccBusiApi = "/api/busi_tcc" + +var TccBusi = fmt.Sprintf("http://localhost:%d%s", TccBusiPort, TccBusiApi) + +func TccMain() { + go TccStartSvr() + tccFireRequest() + time.Sleep(1000 * time.Second) +} + +func TccStartSvr() { + logrus.Printf("tcc examples starting") + app := common.GetGinApp() + TccAddRoute(app) + app.Run(fmt.Sprintf(":%d", TccBusiPort)) +} + +func tccFireRequest() { + gid := common.GenGid() + logrus.Printf("busi transaction begin: %s", gid) + req := &TransReq{ + Amount: 30, + TransInResult: "SUCCESS", + TransOutResult: "SUCCESS", + } + tcc := dtm.TccNew(DtmServer, gid) + + tcc.Add(TccBusi+"/TransOutTry", TccBusi+"/TransOutConfirm", TccBusi+"/TransOutCancel", req) + tcc.Add(TccBusi+"/TransInTry", TccBusi+"/TransInConfirm", TccBusi+"/TransOutCancel", req) + err := tcc.Prepare(TccBusi + "/TransQuery") + e2p(err) + logrus.Printf("busi trans commit") + err = tcc.Commit() + e2p(err) +} + +// api + +func TccAddRoute(app *gin.Engine) { + app.POST(TccBusiApi+"/TransInTry", common.WrapHandler(tccTransInTry)) + app.POST(TccBusiApi+"/TransInConfirm", common.WrapHandler(tccTransInConfirm)) + app.POST(TccBusiApi+"/TransInCancel", common.WrapHandler(tccTransCancel)) + app.POST(TccBusiApi+"/TransOutTry", common.WrapHandler(tccTransOutTry)) + app.POST(TccBusiApi+"/TransOutConfirm", common.WrapHandler(tccTransOutConfirm)) + app.POST(TccBusiApi+"/TransOutCancel", common.WrapHandler(tccTransOutCancel)) + app.GET(TccBusiApi+"/TransQuery", common.WrapHandler(tccTransQuery)) + logrus.Printf("examples listening at %d", TccBusiPort) +} + +var TccTransInTryResult = "" +var TccTransOutTryResult = "" +var TccTransInCancelResult = "" +var TccTransOutCancelResult = "" +var TccTransInConfirmResult = "" +var TccTransOutConfirmResult = "" +var TccTransQueryResult = "" + +func tccTransInTry(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(TccTransInTryResult, req.TransInResult, "SUCCESS") + logrus.Printf("%s TransInTry: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func tccTransInConfirm(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(TccTransInConfirmResult, "SUCCESS") + logrus.Printf("%s tccTransInConfirm: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func tccTransCancel(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(TccTransInCancelResult, "SUCCESS") + logrus.Printf("%s tccTransCancel: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func tccTransOutTry(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(TccTransOutTryResult, req.TransOutResult, "SUCCESS") + logrus.Printf("%s TransOut: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func tccTransOutConfirm(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(TccTransOutConfirmResult, "SUCCESS") + logrus.Printf("%s TransOutConfirm: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func tccTransOutCancel(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(TccTransOutCancelResult, "SUCCESS") + logrus.Printf("%s tccTransOutCancel: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func tccTransQuery(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + logrus.Printf("%s TransQuery", gid) + res := common.OrString(TccTransQueryResult, "SUCCESS") + return M{"result": res}, nil +} diff --git a/examples/main_xa.go b/examples/main_xa.go new file mode 100644 index 0000000..5709c9c --- /dev/null +++ b/examples/main_xa.go @@ -0,0 +1,116 @@ +package examples + +import ( + "fmt" + "time" + + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm" + "github.com/yedf/dtm/common" + "gorm.io/gorm" +) + +// 事务参与者的服务地址 +const XaBusiPort = 8082 +const XaBusiApi = "/api/busi_xa" + +var XaBusi = fmt.Sprintf("http://localhost:%d%s", XaBusiPort, XaBusiApi) + +var XaClient *dtm.XaClient = nil + +type UserAccount struct { + common.ModelBase + UserId int + Balance string +} + +func (u *UserAccount) TableName() string { return "user_account" } + +func dbGet() *common.MyDb { + return common.DbGet(Config.Mysql) +} + +func XaMain() { + go XaStartSvr() + time.Sleep(100 * time.Millisecond) + xaFireRequest() + time.Sleep(1000 * time.Second) +} + +func XaStartSvr() { + common.InitApp(&Config) + logrus.Printf("xa examples starting") + app := common.GetGinApp() + XaClient = dtm.XaClientNew(DtmServer, Config.Mysql, app, XaBusi+"/xa") + XaAddRoute(app) + app.Run(fmt.Sprintf(":%d", XaBusiPort)) +} + +func xaFireRequest() { + gid := common.GenGid() + err := XaClient.XaGlobalTransaction(gid, func() (rerr error) { + defer common.P2E(&rerr) + req := GenTransReq(30, false, false) + resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ + "gid": gid, + "user_id": "1", + }).Post(XaBusi + "/TransOut") + common.CheckRestySuccess(resp, err) + resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ + "gid": gid, + "user_id": "2", + }).Post(XaBusi + "/TransOut") + common.CheckRestySuccess(resp, err) + return nil + }) + e2p(err) +} + +// api +func XaAddRoute(app *gin.Engine) { + app.POST(XaBusiApi+"/TransIn", common.WrapHandler(xaTransIn)) + app.POST(XaBusiApi+"/TransOut", common.WrapHandler(xaTransOut)) +} + +func xaTransIn(c *gin.Context) (interface{}, error) { + err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.MyDb) (rerr error) { + req := transReqFromContext(c) + if req.TransInResult != "SUCCESS" { + return fmt.Errorf("tranIn failed") + } + dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")). + Update("balance", gorm.Expr("balance - ?", req.Amount)) + return dbr.Error + }) + e2p(err) + return M{"result": "SUCCESS"}, nil +} + +func xaTransOut(c *gin.Context) (interface{}, error) { + err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.MyDb) (rerr error) { + req := transReqFromContext(c) + if req.TransOutResult != "SUCCESS" { + return fmt.Errorf("tranOut failed") + } + dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")). + Update("balance", gorm.Expr("balance + ?", req.Amount)) + return dbr.Error + }) + e2p(err) + return M{"result": "SUCCESS"}, nil +} + +func ResetXaData() { + db := dbGet() + db.Must().Exec("truncate user_account") + db.Must().Exec("insert into user_account (user_id, balance) values (1, 10000), (2, 10000)") + type XaRow struct { + Data string + } + xas := []XaRow{} + db.Must().Raw("xa recover").Scan(&xas) + for _, xa := range xas { + db.Must().Exec(fmt.Sprintf("xa rollback '%s'", xa.Data)) + } +}