From 89879aba1a376b04b770d29427d1c237ca6976aa Mon Sep 17 00:00:00 2001 From: yedongfu Date: Thu, 1 Jul 2021 16:45:39 +0800 Subject: [PATCH] gen gid only in dtm --- README.md | 3 +-- dtmsvr/api.go | 4 ++-- dtmsvr/dtmsvr_test.go | 9 ++++++--- dtmsvr/trans.go | 3 +++ examples/main_msg.go | 5 ++--- examples/main_saga.go | 6 +++--- examples/main_tcc.go | 5 ++--- examples/quick_start.go | 3 +-- go.mod | 2 +- message.go | 6 ++++-- saga.go | 5 +++-- tcc.go | 5 +++-- 12 files changed, 31 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index a608a8e..4e8de13 100644 --- a/README.md +++ b/README.md @@ -37,10 +37,9 @@ DTM 是一款跨语言的分布式事务管理方案,在各类微服务架构 ``` go const DtmServer = "http://localhost:8080/api/dtmsvr" const startBusi = "http://localhost:8081/api/busi_saga" -gid := common.GenGid() // 生成事务id req := &gin.H{"amount": 30} // 微服务的负荷 // 生成dtm的saga对象 -saga := dtm.SagaNew(DtmServer, gid). +saga := dtm.SagaNew(DtmServer). // 添加两个子事务 Add(startBusi+"/TransOut", startBusi+"/TransOutCompensate", req). Add(startBusi+"/TransIn", startBusi+"/TransInCompensate", req) diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 5a9d030..8124713 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -22,7 +22,7 @@ func Prepare(c *gin.Context) (interface{}, error) { m := TransFromContext(c) m.Status = "prepared" m.SaveNew(dbGet()) - return M{"message": "SUCCESS"}, nil + return M{"message": "SUCCESS", "gid": m.Gid}, nil } func Commit(c *gin.Context) (interface{}, error) { @@ -31,7 +31,7 @@ func Commit(c *gin.Context) (interface{}, error) { m.Status = "committed" m.SaveNew(db) go m.Process(db) - return M{"message": "SUCCESS"}, nil + return M{"message": "SUCCESS", "gid": m.Gid}, nil } func Rollback(c *gin.Context) (interface{}, error) { diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index 38686fe..f207a05 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -209,29 +209,32 @@ func sagaCommittedPending(t *testing.T) { func genMsg(gid string) *dtm.Msg { logrus.Printf("beginning a msg test ---------------- %s", gid) - msg := dtm.MsgNew(examples.DtmServer, gid) + msg := dtm.MsgNew(examples.DtmServer) msg.QueryPrepared = examples.MsgBusi + "/TransQuery" req := examples.GenTransReq(30, false, false) msg.Add(examples.MsgBusi+"/TransOut", &req) msg.Add(examples.MsgBusi+"/TransIn", &req) + msg.Gid = gid return msg } func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga { logrus.Printf("beginning a saga test ---------------- %s", gid) - saga := dtm.SagaNew(examples.DtmServer, gid) + saga := dtm.SagaNew(examples.DtmServer) req := examples.GenTransReq(30, outFailed, inFailed) saga.Add(examples.SagaBusi+"/TransOut", examples.SagaBusi+"/TransOutCompensate", &req) saga.Add(examples.SagaBusi+"/TransIn", examples.SagaBusi+"/TransInCompensate", &req) + saga.Gid = gid return saga } func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc { logrus.Printf("beginning a tcc test ---------------- %s", gid) - tcc := dtm.TccNew(examples.DtmServer, gid) + tcc := dtm.TccNew(examples.DtmServer) req := examples.GenTransReq(30, outFailed, inFailed) tcc.Add(examples.TccBusi+"/TransOutTry", examples.TccBusi+"/TransOutConfirm", examples.TccBusi+"/TransOutCancel", &req) tcc.Add(examples.TccBusi+"/TransInTry", examples.TccBusi+"/TransInConfirm", examples.TccBusi+"/TransInCancel", &req) + tcc.Gid = gid return tcc } diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 47e630e..9b3c2bf 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -125,6 +125,9 @@ func (t *TransGlobal) setNextCron(expireIn int64) []string { } func (t *TransGlobal) SaveNew(db *common.DB) { + if t.Gid == "" { + t.Gid = common.GenGid() + } err := db.Transaction(func(db1 *gorm.DB) error { db := &common.DB{DB: db1} updates := t.setNextCron(config.TransCronInterval) diff --git a/examples/main_msg.go b/examples/main_msg.go index 9b12819..86cc8e7 100644 --- a/examples/main_msg.go +++ b/examples/main_msg.go @@ -29,14 +29,13 @@ func MsgStartSvr() { } func MsgFireRequest() { - gid := common.GenGid() - logrus.Printf("busi transaction begin: %s", gid) + logrus.Printf("a busi transaction begin") req := &TransReq{ Amount: 30, TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } - msg := dtm.MsgNew(DtmServer, gid). + msg := dtm.MsgNew(DtmServer). Add(MsgBusi+"/TransOut", req). Add(MsgBusi+"/TransIn", req) err := msg.Prepare(MsgBusi + "/TransQuery") diff --git a/examples/main_saga.go b/examples/main_saga.go index 4db843c..9de536c 100644 --- a/examples/main_saga.go +++ b/examples/main_saga.go @@ -29,18 +29,18 @@ func SagaStartSvr() { } func SagaFireRequest() { - gid := common.GenGid() - logrus.Printf("busi transaction begin: %s", gid) + logrus.Printf("a busi transaction begin") req := &TransReq{ Amount: 30, TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } - saga := dtm.SagaNew(DtmServer, gid). + saga := dtm.SagaNew(DtmServer). Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req). Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req) logrus.Printf("busi trans commit") err := saga.Commit() + logrus.Printf("result gid is: %s", saga.Gid) e2p(err) } diff --git a/examples/main_tcc.go b/examples/main_tcc.go index 736f8bf..119f3b9 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -29,14 +29,13 @@ func TccStartSvr() { } func TccFireRequest() { - gid := common.GenGid() - logrus.Printf("busi transaction begin: %s", gid) + logrus.Printf("a busi transaction begin") req := &TransReq{ Amount: 30, TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } - tcc := dtm.TccNew(DtmServer, gid). + tcc := dtm.TccNew(DtmServer). Add(TccBusi+"/TransOutTry", TccBusi+"/TransOutConfirm", TccBusi+"/TransOutCancel", req). Add(TccBusi+"/TransInTry", TccBusi+"/TransInConfirm", TccBusi+"/TransOutCancel", req) logrus.Printf("busi trans commit") diff --git a/examples/quick_start.go b/examples/quick_start.go index eb0ced3..59d2ef3 100644 --- a/examples/quick_start.go +++ b/examples/quick_start.go @@ -29,9 +29,8 @@ func startStartSvr() { } func startFireRequest() { - gid := common.GenGid() req := &gin.H{"amount": 30} - saga := dtm.SagaNew(DtmServer, gid). + saga := dtm.SagaNew(DtmServer). Add(startBusi+"/TransOut", startBusi+"/TransOutCompensate", req). Add(startBusi+"/TransIn", startBusi+"/TransInCompensate", req) err := saga.Commit() diff --git a/go.mod b/go.mod index 9d1edf6..bd0673c 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/go-resty/resty/v2 v2.6.0 github.com/golang/protobuf v1.4.2 // indirect github.com/google/go-cmp v0.5.5 // indirect - github.com/json-iterator/go v1.1.10 // indirect + github.com/json-iterator/go v1.1.10 github.com/sirupsen/logrus v1.7.0 github.com/spf13/viper v1.7.1 github.com/stretchr/testify v1.7.0 // indirect diff --git a/message.go b/message.go index 4820409..55f1d93 100644 --- a/message.go +++ b/message.go @@ -3,6 +3,7 @@ package dtm import ( "fmt" + jsonitor "github.com/json-iterator/go" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -23,10 +24,9 @@ type MsgStep struct { Data string `json:"data"` } -func MsgNew(server string, gid string) *Msg { +func MsgNew(server string) *Msg { return &Msg{ MsgData: MsgData{ - Gid: gid, TransType: "msg", }, Server: server, @@ -51,6 +51,7 @@ func (s *Msg) Commit() error { if resp.StatusCode() != 200 { return fmt.Errorf("commit failed: %v", resp.Body()) } + s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() return nil } @@ -64,5 +65,6 @@ func (s *Msg) Prepare(queryPrepared string) error { if resp.StatusCode() != 200 { return fmt.Errorf("prepare failed: %v", resp.Body()) } + s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() return nil } diff --git a/saga.go b/saga.go index 3fd687a..4b6d738 100644 --- a/saga.go +++ b/saga.go @@ -3,6 +3,7 @@ package dtm import ( "fmt" + jsonitor "github.com/json-iterator/go" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -23,10 +24,9 @@ type SagaStep struct { Data string `json:"data"` } -func SagaNew(server string, gid string) *Saga { +func SagaNew(server string) *Saga { return &Saga{ SagaData: SagaData{ - Gid: gid, TransType: "saga", }, Server: server, @@ -52,5 +52,6 @@ func (s *Saga) Commit() error { if resp.StatusCode() != 200 { return fmt.Errorf("commit failed: %v", resp.Body()) } + s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() return nil } diff --git a/tcc.go b/tcc.go index df857a0..bf1e942 100644 --- a/tcc.go +++ b/tcc.go @@ -3,6 +3,7 @@ package dtm import ( "fmt" + jsonitor "github.com/json-iterator/go" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -24,10 +25,9 @@ type TccStep struct { Data string `json:"data"` } -func TccNew(server string, gid string) *Tcc { +func TccNew(server string) *Tcc { return &Tcc{ TccData: TccData{ - Gid: gid, TransType: "tcc", }, Server: server, @@ -54,5 +54,6 @@ func (s *Tcc) Commit() error { if resp.StatusCode() != 200 { return fmt.Errorf("commit failed: %v", resp.Body()) } + s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() return nil }