From 0c42bdeeb9e8bee5413d08382e80c73a599e5145 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Mon, 24 May 2021 14:42:17 +0800 Subject: [PATCH] ready to add xa --- app/main.go | 2 +- dtmsvr/dtmsvr.sql | 17 +++++- dtmsvr/dtmsvr_test.go | 18 +++---- examples/api.go | 89 ------------------------------- examples/config.go | 8 --- examples/main.go | 41 --------------- examples/saga_main.go | 120 ++++++++++++++++++++++++++++++++++++++++++ examples/xa_main.go | 34 ++++++++++++ 8 files changed, 179 insertions(+), 150 deletions(-) delete mode 100644 examples/api.go delete mode 100644 examples/main.go create mode 100644 examples/saga_main.go create mode 100644 examples/xa_main.go diff --git a/app/main.go b/app/main.go index f7efb1e..f4f92bf 100644 --- a/app/main.go +++ b/app/main.go @@ -16,7 +16,7 @@ func main() { dtmsvr.LoadConfig() if cmd == "" { // 所有服务都启动 go dtmsvr.StartSvr() - go examples.StartSvr() + go examples.SagaStartSvr() } else if cmd == "dtmsvr" { go dtmsvr.StartSvr() } diff --git a/dtmsvr/dtmsvr.sql b/dtmsvr/dtmsvr.sql index b03a8dc..ff0997b 100644 --- a/dtmsvr/dtmsvr.sql +++ b/dtmsvr/dtmsvr.sql @@ -1,6 +1,6 @@ -CREATE DATABASE `dtm` /*!40100 DEFAULT CHARACTER SET utf8mb4 */; +-- CREATE DATABASE `dtm` /*!40100 DEFAULT CHARACTER SET utf8mb4 */; -use dtm; +-- use dtm; drop table IF EXISTS saga; CREATE TABLE `saga` ( @@ -51,3 +51,16 @@ CREATE TABLE `trans_log` ( KEY `gid` (`gid`), KEY `create_time` (`create_time`) ) ENGINE=InnoDB AUTO_INCREMENT=48 DEFAULT CHARSET=utf8mb4; + +drop table if EXISTS user_account; +CREATE TABLE `user_account` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `user_id` int(11) DEFAULT NULL, + `balance` decimal(10,2) NOT NULL DEFAULT '0.00', + `create_time` datetime DEFAULT CURRENT_TIMESTAMP, + `update_time` datetime DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE KEY `user_id` (`user_id`), + KEY `create_time` (`create_time`), + KEY `update_time` (`update_time`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index fdebf34..0423e0c 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -31,7 +31,7 @@ func TestDtmSvr(t *testing.T) { // 启动组件 go StartSvr() - go examples.StartSvr() + go examples.SagaStartSvr() time.Sleep(time.Duration(100 * 1000 * 1000)) preparePending(t) @@ -94,7 +94,7 @@ func prepareCancel(t *testing.T) { saga := genSaga("gid1-prepareCancel", false, true) saga.Prepare() examples.TransQueryResult = "FAIL" - Config.PreparedExpire = 0 + Config.PreparedExpire = -10 CronPreparedOnce(-10 * time.Second) examples.TransQueryResult = "" Config.PreparedExpire = 60 @@ -116,11 +116,11 @@ func preparePending(t *testing.T) { func commitedPending(t *testing.T) { saga := genSaga("gid-commitedPending", false, false) saga.Prepare() - saga.Commit() examples.TransOutResult = "PENDING" + saga.Commit() WaitCommitedSaga(saga.Gid) - assert.Equal(t, []string{"pending", "finished", "pending", "pending"}, getSagaStepStatus(saga.Gid)) examples.TransOutResult = "" + assert.Equal(t, []string{"pending", "finished", "pending", "pending"}, getSagaStepStatus(saga.Gid)) CronCommitedOnce(-10 * time.Second) WaitCommitedSaga(saga.Gid) assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid)) @@ -129,13 +129,13 @@ func commitedPending(t *testing.T) { func genSaga(gid string, inFailed bool, outFailed bool) *dtm.Saga { logrus.Printf("beginning a saga test ---------------- %s", gid) - saga := dtm.SagaNew(examples.DtmServer, gid, examples.Busi+"/TransQuery") + saga := dtm.SagaNew(examples.DtmServer, gid, examples.SagaBusi+"/TransQuery") req := examples.TransReq{ Amount: 30, - TransInFailed: inFailed, - TransOutFailed: outFailed, + TransInResult: common.If(inFailed, "FAIL", "SUCCESS").(string), + TransOutResult: common.If(outFailed, "FAIL", "SUCCESS").(string), } - saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInCompensate", &req) - saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutCompensate", &req) + saga.Add(examples.SagaBusi+"/TransIn", examples.SagaBusi+"/TransInCompensate", &req) + saga.Add(examples.SagaBusi+"/TransOut", examples.SagaBusi+"/TransOutCompensate", &req) return saga } diff --git a/examples/api.go b/examples/api.go deleted file mode 100644 index a8fbc41..0000000 --- a/examples/api.go +++ /dev/null @@ -1,89 +0,0 @@ -package examples - -import ( - "github.com/gin-gonic/gin" - "github.com/sirupsen/logrus" - "github.com/yedf/dtm/common" -) - -func AddRoute(app *gin.Engine) { - app.POST(BusiApi+"/TransIn", common.WrapHandler(TransIn)) - app.POST(BusiApi+"/TransInCompensate", common.WrapHandler(TransInCompensate)) - app.POST(BusiApi+"/TransOut", common.WrapHandler(TransOut)) - app.POST(BusiApi+"/TransOutCompensate", common.WrapHandler(TransOutCompensate)) - app.GET(BusiApi+"/TransQuery", common.WrapHandler(TransQuery)) - logrus.Printf("examples istening at %d", BusiPort) -} - -type M = map[string]interface{} - -var TransInResult = "" -var TransOutResult = "" -var TransInCompensateResult = "" -var TransOutCompensateResult = "" -var TransQueryResult = "" - -type TransReq struct { - Amount int `json:"amount"` - TransInFailed bool `json:"transInFailed"` - TransOutFailed bool `json:"transOutFailed"` -} - -func TransIn(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := TransReq{} - if err := c.BindJSON(&req); err != nil { - return nil, err - } - if req.TransInFailed { - logrus.Printf("%s TransIn %v failed", gid, req) - return M{"result": "FAIL"}, nil - } - res := common.OrString(TransInResult, "SUCCESS") - logrus.Printf("%s TransIn: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func TransInCompensate(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := TransReq{} - if err := c.BindJSON(&req); err != nil { - return nil, err - } - res := common.OrString(TransInCompensateResult, "SUCCESS") - logrus.Printf("%s TransInCompensate: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func TransOut(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := TransReq{} - if err := c.BindJSON(&req); err != nil { - return nil, err - } - if req.TransOutFailed { - logrus.Printf("%s TransOut %v failed", gid, req) - return M{"result": "FAIL"}, nil - } - res := common.OrString(TransOutResult, "SUCCESS") - logrus.Printf("%s TransOut: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func TransOutCompensate(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - req := TransReq{} - if err := c.BindJSON(&req); err != nil { - return nil, err - } - res := common.OrString(TransOutCompensateResult, "SUCCESS") - logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res) - return M{"result": res}, nil -} - -func TransQuery(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - logrus.Printf("%s TransQuery", gid) - res := common.OrString(TransQueryResult, "SUCCESS") - return M{"result": res}, nil -} diff --git a/examples/config.go b/examples/config.go index a676691..502fb27 100644 --- a/examples/config.go +++ b/examples/config.go @@ -1,12 +1,4 @@ package examples -import "fmt" - // 指定dtm服务地址 const DtmServer = "http://localhost:8080/api/dtmsvr" - -// 事务参与制的服务地址 -const BusiPort = 8081 -const BusiApi = "/api/busi" - -var Busi = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiApi) diff --git a/examples/main.go b/examples/main.go deleted file mode 100644 index 584cab5..0000000 --- a/examples/main.go +++ /dev/null @@ -1,41 +0,0 @@ -package examples - -import ( - "time" - - "github.com/sirupsen/logrus" - "github.com/yedf/dtm" - "github.com/yedf/dtm/common" -) - -func Main() { - go StartSvr() - FireRequest() - time.Sleep(1000 * time.Second) -} - -func FireRequest() { - gid := common.GenGid() - logrus.Printf("busi transaction begin: %s", gid) - req := &TransReq{ - Amount: 30, - TransInFailed: false, - TransOutFailed: false, - } - saga := dtm.SagaNew(DtmServer, gid, Busi+"/TransQuery") - - saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", req) - saga.Add(Busi+"/TransOut", Busi+"/TransOutCompensate", req) - err := saga.Prepare() - common.PanicIfError(err) - logrus.Printf("busi trans commit") - err = saga.Commit() - common.PanicIfError(err) -} - -func StartSvr() { - logrus.Printf("examples starting") - app := common.GetGinApp() - AddRoute(app) - app.Run(":8081") -} diff --git a/examples/saga_main.go b/examples/saga_main.go new file mode 100644 index 0000000..fa18441 --- /dev/null +++ b/examples/saga_main.go @@ -0,0 +1,120 @@ +package examples + +import ( + "fmt" + "time" + + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm" + "github.com/yedf/dtm/common" +) + +// 事务参与者的服务地址 +const SagaBusiPort = 8081 +const SagaBusiApi = "/api/busi_saga" + +var SagaBusi = fmt.Sprintf("http://localhost:%d%s", SagaBusiPort, SagaBusiApi) + +func SagaMain() { + go SagaStartSvr() + sagaFireRequest() + time.Sleep(1000 * time.Second) +} + +func SagaStartSvr() { + logrus.Printf("saga examples starting") + app := common.GetGinApp() + AddRoute(app) + app.Run(":8081") +} + +func sagaFireRequest() { + gid := common.GenGid() + logrus.Printf("busi transaction begin: %s", gid) + req := &TransReq{ + Amount: 30, + TransInResult: "SUCCESS", + TransOutResult: "SUCCESS", + } + saga := dtm.SagaNew(DtmServer, gid, SagaBusi+"/TransQuery") + + saga.Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req) + saga.Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req) + err := saga.Prepare() + common.PanicIfError(err) + logrus.Printf("busi trans commit") + err = saga.Commit() + common.PanicIfError(err) +} + +// api + +func AddRoute(app *gin.Engine) { + app.POST(SagaBusiApi+"/TransIn", common.WrapHandler(TransIn)) + app.POST(SagaBusiApi+"/TransInCompensate", common.WrapHandler(TransInCompensate)) + app.POST(SagaBusiApi+"/TransOut", common.WrapHandler(TransOut)) + app.POST(SagaBusiApi+"/TransOutCompensate", common.WrapHandler(TransOutCompensate)) + app.GET(SagaBusiApi+"/TransQuery", common.WrapHandler(TransQuery)) + logrus.Printf("examples listening at %d", SagaBusiPort) +} + +type M = map[string]interface{} + +var TransInResult = "" +var TransOutResult = "" +var TransInCompensateResult = "" +var TransOutCompensateResult = "" +var TransQueryResult = "" + +type TransReq struct { + Amount int `json:"amount"` + TransInResult string `json:"transInResult"` + TransOutResult string `json:"transOutResult"` +} + +func transReqFromContext(c *gin.Context) *TransReq { + req := TransReq{} + err := c.BindJSON(&req) + common.PanicIfError(err) + return &req +} + +func TransIn(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(TransInResult, req.TransInResult, "SUCCESS") + logrus.Printf("%s TransIn: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func TransInCompensate(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(TransInCompensateResult, "SUCCESS") + logrus.Printf("%s TransInCompensate: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func TransOut(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(TransOutResult, req.TransOutResult, "SUCCESS") + logrus.Printf("%s TransOut: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func TransOutCompensate(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(TransOutCompensateResult, "SUCCESS") + logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func TransQuery(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + logrus.Printf("%s TransQuery", gid) + res := common.OrString(TransQueryResult, "SUCCESS") + return M{"result": res}, nil +} diff --git a/examples/xa_main.go b/examples/xa_main.go new file mode 100644 index 0000000..b223e51 --- /dev/null +++ b/examples/xa_main.go @@ -0,0 +1,34 @@ +package examples + +import ( + "fmt" + "time" + + "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" +) + +// 事务参与者的服务地址 +const XaBusiPort = 8082 +const XaBusiApi = "/api/busi_xa" + +var XaBusi = fmt.Sprintf("http://localhost:%d%s", XaBusiPort, XaBusiApi) + +func XaMain() { + go XaStartSvr() + xaFireRequest() + time.Sleep(1000 * time.Second) +} + +func XaStartSvr() { + logrus.Printf("xa examples starting") + app := common.GetGinApp() + AddRoute(app) + app.Run(":8081") +} + +func xaFireRequest() { + +} + +// api