From 75ba3e30cca353f78690cb79a4424bfe94e77207 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Wed, 2 Jun 2021 17:22:11 +0800 Subject: [PATCH] add query --- README-cn.md | 45 ++++++++++++++++-------- common/utils.go | 2 +- dtmsvr/api.go | 19 +++++++++++ dtmsvr/dtmsvr_test.go | 28 ++++++++++++++- examples/main_saga.go | 7 ++-- examples/main_tcc.go | 7 ++-- examples/quick_start.go | 76 +++++++++++++++++++++++++++++++++++++++++ saga.go | 33 ++++++++---------- tcc.go | 33 ++++++++---------- xa.go | 1 + 10 files changed, 189 insertions(+), 62 deletions(-) create mode 100644 examples/quick_start.go diff --git a/README-cn.md b/README-cn.md index 571df85..f3630fc 100644 --- a/README-cn.md +++ b/README-cn.md @@ -1,17 +1,34 @@ ### 轻量级分布式事务管理服务 + 跨语言--语言无关,基于http协议 + 支持xa、tcc、saga +## 快速开始 + 场景描述: + 假设您实现了一个转账功能,分为两个微服务:转入、转出 + 转出:服务地址为 http://example.com/api/busi_saga/transOut?gid=xxx POST 参数为 {"uid": 2, "amount":30} + 转入:服务地址为 http://example.com/api/busi_saga/transIn?gid=xxx POST 参数为 {"uid": 1, "amount":30} + 在saga模式下,有对应的补偿微服务 + 转出:服务地址为 http://example.com/api/busi_saga/transOutCompensate?gid=xxx POST 参数为 {"uid": 2, "amount":30} + 转入:服务地址为 http://example.com/api/busi_saga/transInCompensate?gid=xxx POST 参数为 {"uid": 1, "amount":30} + HTTP协议方式 + curl -d '{"gid":"xxx","trans_type":"saga","steps":[{"action":"http://example.com/api/busi_saga/TransOut","compensate":"http://example.com/api/busi_saga/TransOutCompensate","data":"{\"amount\":30}"},{"action":"http://localhost:8081/api/busi_saga/TransIn","compensate":"http://localhost:8081/api/busi_saga/TransInCompensate","data":"{\"amount\":30}"}]}' 8.140.124.252/api/dtm/commit + 此请求向dtm提交了一个saga事务,dtm会按照saga模式,请求transIn/transOut,并且在出错情况下,保证抵用相关的补偿api + go客户端方式 + // 事务参与者的服务地址 + const startBusiPort = 8084 + const startBusiApi = "/api/busi_start" -## 配置rabbitmq和mysql + var startBusi = fmt.Sprintf("http://localhost:%d%s", startBusiPort, startBusiApi) + err := dtm.SagaNew(DtmServer, gid).Add(startBusi+"/TransOut", startBusi+"/TransOutCompensate", &gin.H{ + "amount": 30, + "uid": 2, + }).Add(startBusi+"/TransIn", startBusi+"/TransInCompensate", &gin.H{ + "amount": 30, + "uid": 1 + }).Commit() + + 本地启动方式 + 需要安装docker,和docker-compose + curl localhost:8080/api/initMysql + go run examples/app/main saga -dtm依赖于rabbitmq和mysql,请搭建好rabbitmq和mysql,并修改dtm.yml - -## 启动tc - -```go run dtm-svr/svr``` - -## 启动例子saga的tm+rm - -```go run example/saga``` - -## 或者启动例子tcc的tm+rm - -```go run example/tcc``` \ No newline at end of file + 其他 diff --git a/common/utils.go b/common/utils.go index 90c6189..6eab953 100644 --- a/common/utils.go +++ b/common/utils.go @@ -152,7 +152,7 @@ func init() { // RestyClient.SetRetryCount(2) // RestyClient.SetRetryWaitTime(1 * time.Second) RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { - logrus.Printf("requesting: %s %s %v", r.Method, r.URL, r.Body) + logrus.Printf("requesting: %s %s %v %v", r.Method, r.URL, r.Body, r.QueryParam) return nil }) RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 14dea34..5a9d030 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -1,10 +1,12 @@ package dtmsvr import ( + "errors" "fmt" "github.com/gin-gonic/gin" "github.com/yedf/dtm/common" + "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -13,6 +15,7 @@ func AddRoute(engine *gin.Engine) { engine.POST("/api/dtmsvr/commit", common.WrapHandler(Commit)) engine.POST("/api/dtmsvr/branch", common.WrapHandler(Branch)) engine.POST("/api/dtmsvr/rollback", common.WrapHandler(Rollback)) + engine.GET("/api/dtmsvr/query", common.WrapHandler(Query)) } func Prepare(c *gin.Context) (interface{}, error) { @@ -57,3 +60,19 @@ func Branch(c *gin.Context) (interface{}, error) { e2p(err) return M{"message": "SUCCESS"}, nil } + +func Query(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + if gid == "" { + return nil, errors.New("no gid specified") + } + trans := TransGlobal{} + db := dbGet() + dbr := db.Must().Where("gid", gid).First(&trans) + if dbr.Error == gorm.ErrRecordNotFound { + return M{"transaction": nil, "branches": [0]int{}}, nil + } + branches := []TransBranch{} + db.Must().Where("gid", gid).Find(&branches) + return M{"transaction": trans, "branches": branches}, nil +} diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index d71ecb1..7f57bea 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -38,6 +38,11 @@ func TestDtmSvr(t *testing.T) { e2p(dbGet().Exec("truncate trans_log").Error) examples.ResetXaData() + sagaNormal(t) + + // 需要放到前面的用例之后,才有真实的数据 + transQuery(t) + tccNormal(t) tccRollback(t) tccRollbackPending(t) @@ -46,8 +51,8 @@ func TestDtmSvr(t *testing.T) { sagaCommittedPending(t) sagaPreparePending(t) sagaPrepareCancel(t) - sagaNormal(t) sagaRollback(t) + } func TestCover(t *testing.T) { @@ -225,3 +230,24 @@ func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc { tcc.Add(examples.TccBusi+"/TransInTry", examples.TccBusi+"/TransInConfirm", examples.TccBusi+"/TransInCancel", &req) return tcc } + +func transQuery(t *testing.T) { + resp, err := common.RestyClient.R().SetQueryParam("gid", "gid-noramlSaga").Get(examples.DtmServer + "/query") + e2p(err) + m := M{} + assert.Equal(t, resp.StatusCode(), 200) + common.MustUnmarshalString(resp.String(), &m) + assert.NotEqual(t, nil, m["transaction"]) + assert.Equal(t, 4, len(m["branches"].([]interface{}))) + + resp, err = common.RestyClient.R().SetQueryParam("gid", "").Get(examples.DtmServer + "/query") + e2p(err) + assert.Equal(t, resp.StatusCode(), 500) + + resp, err = common.RestyClient.R().SetQueryParam("gid", "1").Get(examples.DtmServer + "/query") + e2p(err) + assert.Equal(t, resp.StatusCode(), 200) + common.MustUnmarshalString(resp.String(), &m) + assert.Equal(t, nil, m["transaction"]) + assert.Equal(t, 0, len(m["branches"].([]interface{}))) +} diff --git a/examples/main_saga.go b/examples/main_saga.go index 3558503..7a6fc79 100644 --- a/examples/main_saga.go +++ b/examples/main_saga.go @@ -37,10 +37,9 @@ func sagaFireRequest() { TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } - saga := dtm.SagaNew(DtmServer, gid) - - saga.Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req) - saga.Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req) + saga := dtm.SagaNew(DtmServer, gid). + Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req). + Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req) err := saga.Prepare(SagaBusi + "/TransQuery") e2p(err) logrus.Printf("busi trans commit") diff --git a/examples/main_tcc.go b/examples/main_tcc.go index 035e8a5..a81dc39 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -37,10 +37,9 @@ func tccFireRequest() { TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } - tcc := dtm.TccNew(DtmServer, gid) - - tcc.Add(TccBusi+"/TransOutTry", TccBusi+"/TransOutConfirm", TccBusi+"/TransOutCancel", req) - tcc.Add(TccBusi+"/TransInTry", TccBusi+"/TransInConfirm", TccBusi+"/TransOutCancel", req) + tcc := dtm.TccNew(DtmServer, gid). + Add(TccBusi+"/TransOutTry", TccBusi+"/TransOutConfirm", TccBusi+"/TransOutCancel", req). + Add(TccBusi+"/TransInTry", TccBusi+"/TransInConfirm", TccBusi+"/TransOutCancel", req) err := tcc.Prepare(TccBusi + "/TransQuery") e2p(err) logrus.Printf("busi trans commit") diff --git a/examples/quick_start.go b/examples/quick_start.go new file mode 100644 index 0000000..888c3af --- /dev/null +++ b/examples/quick_start.go @@ -0,0 +1,76 @@ +package examples + +import ( + "fmt" + "time" + + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm" + "github.com/yedf/dtm/common" +) + +// 事务参与者的服务地址 +const startBusiPort = 8084 +const startBusiApi = "/api/busi_start" + +var startBusi = fmt.Sprintf("http://localhost:%d%s", startBusiPort, startBusiApi) + +func startMain() { + go startStartSvr() + startFireRequest() + time.Sleep(1000 * time.Second) +} + +func startStartSvr() { + logrus.Printf("saga examples starting") + app := common.GetGinApp() + startAddRoute(app) + app.Run(fmt.Sprintf(":%d", SagaBusiPort)) +} + +func startFireRequest() { + gid := common.GenGid() + logrus.Printf("busi transaction begin: %s", gid) + req := &TransReq{ + Amount: 30, + TransInResult: "SUCCESS", + TransOutResult: "SUCCESS", + } + saga := dtm.SagaNew(DtmServer, gid). + Add(startBusi+"/TransOut", startBusi+"/TransOutCompensate", req). + Add(startBusi+"/TransIn", startBusi+"/TransInCompensate", req) + logrus.Printf("busi trans commit") + err := saga.Commit() + e2p(err) +} + +func startAddRoute(app *gin.Engine) { + app.POST(SagaBusiApi+"/TransIn", common.WrapHandler(startTransIn)) + app.POST(SagaBusiApi+"/TransInCompensate", common.WrapHandler(startTransInCompensate)) + app.POST(SagaBusiApi+"/TransOut", common.WrapHandler(startTransOut)) + app.POST(SagaBusiApi+"/TransOutCompensate", common.WrapHandler(startTransOutCompensate)) + logrus.Printf("examples listening at %d", startBusiPort) +} + +func startTransIn(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + logrus.Printf("%s TransIn: %v result: %s", gid, req, req.TransInResult) + return M{"result": req.TransInResult}, nil +} + +func startTransInCompensate(c *gin.Context) (interface{}, error) { + return M{"result": "SUCCESS"}, nil +} + +func startTransOut(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + logrus.Printf("%s TransOut: %v result: %s", gid, req, req.TransOutResult) + return M{"result": req.TransOutResult}, nil +} + +func startTransOutCompensate(c *gin.Context) (interface{}, error) { + return M{"result": "SUCCESS"}, nil +} diff --git a/saga.go b/saga.go index a4f91df..5f6d8a9 100644 --- a/saga.go +++ b/saga.go @@ -1,7 +1,6 @@ package dtm import ( - "encoding/json" "fmt" "github.com/sirupsen/logrus" @@ -34,18 +33,26 @@ func SagaNew(server string, gid string) *Saga { Server: server, } } -func (s *Saga) Add(action string, compensate string, postData interface{}) error { +func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga { logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) - d, err := json.Marshal(postData) - if err != nil { - return err - } step := SagaStep{ Action: action, Compensate: compensate, - Data: string(d), + Data: common.MustMarshalString(postData), } s.Steps = append(s.Steps, step) + return s +} + +func (s *Saga) Commit() error { + logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) + resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/commit", s.Server)) + if err != nil { + return err + } + if resp.StatusCode() != 200 { + return fmt.Errorf("commit failed: %v", resp.Body()) + } return nil } @@ -61,15 +68,3 @@ func (s *Saga) Prepare(queryPrepared string) error { } return nil } - -func (s *Saga) Commit() error { - logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) - resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/commit", s.Server)) - if err != nil { - return err - } - if resp.StatusCode() != 200 { - return fmt.Errorf("commit failed: %v", resp.Body()) - } - return nil -} diff --git a/tcc.go b/tcc.go index 69dc68b..11290ed 100644 --- a/tcc.go +++ b/tcc.go @@ -1,7 +1,6 @@ package dtm import ( - "encoding/json" "fmt" "github.com/sirupsen/logrus" @@ -35,19 +34,27 @@ func TccNew(server string, gid string) *Tcc { Server: server, } } -func (s *Tcc) Add(try string, confirm string, cancel string, data interface{}) error { +func (s *Tcc) Add(try string, confirm string, cancel string, data interface{}) *Tcc { logrus.Printf("tcc %s Add %s %s %s %v", s.Gid, try, confirm, cancel, data) - d, err := json.Marshal(data) - if err != nil { - return err - } step := TccStep{ Try: try, Confirm: confirm, Cancel: cancel, - Data: string(d), + Data: common.MustMarshalString(data), } s.Steps = append(s.Steps, step) + return s +} + +func (s *Tcc) Commit() error { + logrus.Printf("committing %s body: %v", s.Gid, &s.TccData) + resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/commit", s.Server)) + if err != nil { + return err + } + if resp.StatusCode() != 200 { + return fmt.Errorf("commit failed: %v", resp.Body()) + } return nil } @@ -63,15 +70,3 @@ func (s *Tcc) Prepare(queryPrepared string) error { } return nil } - -func (s *Tcc) Commit() error { - logrus.Printf("committing %s body: %v", s.Gid, &s.TccData) - resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/commit", s.Server)) - if err != nil { - return err - } - if resp.StatusCode() != 200 { - return fmt.Errorf("commit failed: %v", resp.Body()) - } - return nil -} diff --git a/xa.go b/xa.go index db19f7d..4b2bbe8 100644 --- a/xa.go +++ b/xa.go @@ -56,6 +56,7 @@ func XaClientNew(server string, mysqlConf map[string]string, app *gin.Engine, ca })) return xa } + func (xa *XaClient) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error) { defer common.P2E(&rerr) branch := common.GenGid()