From 8a818ab46b16ff72762774a50e2dc81a33de4090 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Sat, 3 Jul 2021 12:07:24 +0800 Subject: [PATCH] change to dtmcli --- barrier.go | 69 ----------------------- dtmsvr/dtmsvr_test.go | 24 ++++---- examples/main_msg.go | 4 +- examples/main_saga.go | 4 +- examples/main_saga_barrier.go | 6 +- examples/main_tcc.go | 4 +- examples/main_xa.go | 6 +- examples/quick_start.go | 4 +- examples/types.go | 6 +- message.go | 70 ----------------------- saga.go | 57 ------------------- tcc.go | 59 ------------------- xa.go | 103 ---------------------------------- 13 files changed, 29 insertions(+), 387 deletions(-) delete mode 100644 barrier.go delete mode 100644 message.go delete mode 100644 saga.go delete mode 100644 tcc.go delete mode 100644 xa.go diff --git a/barrier.go b/barrier.go deleted file mode 100644 index 6c45061..0000000 --- a/barrier.go +++ /dev/null @@ -1,69 +0,0 @@ -package dtm - -import ( - "context" - "database/sql" - "fmt" - - "github.com/yedf/dtm/common" -) - -type BusiFunc func(db *sql.DB) (interface{}, error) - -type TransInfo struct { - TransType string - Gid string - BranchID string - BranchType string -} - -func (t *TransInfo) String() string { - return fmt.Sprintf("transInfo: %s %s %s %s", t.TransType, t.Gid, t.BranchID, t.BranchType) -} - -type BarrierModel struct { - common.ModelBase - TransInfo -} - -func (BarrierModel) TableName() string { return "dtm_barrier.barrier" } - -func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, branchType string) (int64, error) { - if branchType == "" { - return 0, nil - } - res, err := tx.Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type) values(?,?,?,?)", transType, gid, branchID, branchType) - if err != nil { - return 0, err - } - return res.RowsAffected() -} - -func ThroughBarrierCall(db *sql.DB, transType string, gid string, branchId string, branchType string, busiCall BusiFunc) (res interface{}, rerr error) { - tx, rerr := db.BeginTx(context.Background(), &sql.TxOptions{}) - if rerr != nil { - return - } - defer func() { - if x := recover(); x != nil { - tx.Rollback() - panic(x) - } else if rerr != nil { - tx.Rollback() - } else { - tx.Commit() - } - }() - - originType := map[string]string{ - "cancel": "action", - "compensate": "action", - }[branchType] - originAffected, _ := insertBarrier(tx, transType, gid, branchId, originType) - currentAffected, rerr := insertBarrier(tx, transType, gid, branchId, branchType) - if currentAffected == 0 || (originType == "cancel" || originType == "compensate") && originAffected > 0 { - return - } - res, rerr = busiCall(db) - return -} diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index a4b9399..9999374 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -9,8 +9,8 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/stretchr/testify/assert" - "github.com/yedf/dtm" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/examples" ) @@ -207,9 +207,9 @@ func sagaCommittedPending(t *testing.T) { assert.Equal(t, "succeed", getTransStatus(saga.Gid)) } -func genMsg(gid string) *dtm.Msg { +func genMsg(gid string) *dtmcli.Msg { logrus.Printf("beginning a msg test ---------------- %s", gid) - msg := dtm.MsgNew(examples.DtmServer) + msg := dtmcli.MsgNew(examples.DtmServer) msg.QueryPrepared = examples.Busi + "/CanSubmit" req := examples.GenTransReq(30, false, false) msg.Add(examples.Busi+"/TransOut", &req) @@ -218,9 +218,9 @@ func genMsg(gid string) *dtm.Msg { return msg } -func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga { +func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { logrus.Printf("beginning a saga test ---------------- %s", gid) - saga := dtm.SagaNew(examples.DtmServer) + saga := dtmcli.SagaNew(examples.DtmServer) req := examples.GenTransReq(30, outFailed, inFailed) saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req) saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req) @@ -228,9 +228,9 @@ func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga { return saga } -func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc { +func genTcc(gid string, outFailed bool, inFailed bool) *dtmcli.Tcc { logrus.Printf("beginning a tcc test ---------------- %s", gid) - tcc := dtm.TccNew(examples.DtmServer) + tcc := dtmcli.TccNew(examples.DtmServer) req := examples.GenTransReq(30, outFailed, inFailed) tcc.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutConfirm", examples.Busi+"/TransOutRevert", &req) tcc.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInConfirm", examples.Busi+"/TransInRevert", &req) @@ -263,20 +263,20 @@ func TestSqlDB(t *testing.T) { asserts := assert.New(t) db := common.DbGet(config.Mysql) db.Must().Exec("insert ignore into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type) values('saga', 'gid1', 'branch_id1', 'action')") - _, err := dtm.ThroughBarrierCall(db.ToSqlDB(), "saga", "gid2", "branch_id2", "compensate", func(db *sql.DB) (interface{}, error) { + _, err := dtmcli.ThroughBarrierCall(db.ToSqlDB(), "saga", "gid2", "branch_id2", "compensate", func(db *sql.DB) (interface{}, error) { logrus.Printf("rollback gid2") return nil, fmt.Errorf("gid2 error") }) asserts.Error(err, fmt.Errorf("gid2 error")) - dbr := db.Model(&dtm.BarrierModel{}).Where("gid=?", "gid1").Find(&[]dtm.BarrierModel{}) + dbr := db.Model(&dtmcli.BarrierModel{}).Where("gid=?", "gid1").Find(&[]dtmcli.BarrierModel{}) asserts.Equal(dbr.RowsAffected, int64(1)) - dbr = db.Model(&dtm.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtm.BarrierModel{}) + dbr = db.Model(&dtmcli.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtmcli.BarrierModel{}) asserts.Equal(dbr.RowsAffected, int64(0)) - _, err = dtm.ThroughBarrierCall(db.ToSqlDB(), "saga", "gid2", "branch_id2", "compensate", func(db *sql.DB) (interface{}, error) { + _, err = dtmcli.ThroughBarrierCall(db.ToSqlDB(), "saga", "gid2", "branch_id2", "compensate", func(db *sql.DB) (interface{}, error) { logrus.Printf("submit gid2") return nil, nil }) asserts.Nil(err) - dbr = db.Model(&dtm.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtm.BarrierModel{}) + dbr = db.Model(&dtmcli.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtmcli.BarrierModel{}) asserts.Equal(dbr.RowsAffected, int64(2)) } diff --git a/examples/main_msg.go b/examples/main_msg.go index 4795647..ce227be 100644 --- a/examples/main_msg.go +++ b/examples/main_msg.go @@ -5,7 +5,7 @@ import ( "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" - "github.com/yedf/dtm" + "github.com/yedf/dtm/dtmcli" ) func MsgMain() { @@ -26,7 +26,7 @@ func MsgFireRequest() { TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } - msg := dtm.MsgNew(DtmServer). + msg := dtmcli.MsgNew(DtmServer). Add(Busi+"/TransOut", req). Add(Busi+"/TransIn", req) err := msg.Prepare(Busi + "/TransQuery") diff --git a/examples/main_saga.go b/examples/main_saga.go index 12ea8b6..7649b01 100644 --- a/examples/main_saga.go +++ b/examples/main_saga.go @@ -5,7 +5,7 @@ import ( "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" - "github.com/yedf/dtm" + "github.com/yedf/dtm/dtmcli" ) func SagaMain() { @@ -28,7 +28,7 @@ func SagaFireRequest() { TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } - saga := dtm.SagaNew(DtmServer). + saga := dtmcli.SagaNew(DtmServer). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). Add(Busi+"/TransIn", Busi+"/TransInRevert", req) logrus.Printf("saga busi trans submit") diff --git a/examples/main_saga_barrier.go b/examples/main_saga_barrier.go index 80f353a..43c1a34 100644 --- a/examples/main_saga_barrier.go +++ b/examples/main_saga_barrier.go @@ -7,8 +7,8 @@ import ( "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" - "github.com/yedf/dtm" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" "gorm.io/gorm" ) @@ -37,7 +37,7 @@ func SagaBarrierFireRequest() { TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } - saga := dtm.SagaNew(DtmServer). + saga := dtmcli.SagaNew(DtmServer). Add(SagaBarrierBusi+"/TransOut", SagaBarrierBusi+"/TransOutCompensate", req). Add(SagaBarrierBusi+"/TransIn", SagaBarrierBusi+"/TransInCompensate", req) logrus.Printf("busi trans submit") @@ -80,7 +80,7 @@ func sagaBarrierTransOut(c *gin.Context) (interface{}, error) { gid := c.Query("gid") lid := c.Query("lid") req := reqFrom(c) - return dtm.ThroughBarrierCall(dbGet().ToSqlDB(), "saga", gid, lid, "action", func(sdb *sql.DB) (interface{}, error) { + return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), "saga", gid, lid, "action", func(sdb *sql.DB) (interface{}, error) { db := common.SqlDB2DB(sdb) dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")). Update("balance", gorm.Expr("balance - ?", req.Amount)) diff --git a/examples/main_tcc.go b/examples/main_tcc.go index 32ff9d9..6aac55a 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -5,7 +5,7 @@ import ( "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" - "github.com/yedf/dtm" + "github.com/yedf/dtm/dtmcli" ) func TccMain() { @@ -28,7 +28,7 @@ func TccFireRequest() { TransInResult: "SUCCESS", TransOutResult: "SUCCESS", } - tcc := dtm.TccNew(DtmServer). + tcc := dtmcli.TccNew(DtmServer). Add(Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert", req). Add(Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransOutRevert", req) logrus.Printf("tcc trans submit") diff --git a/examples/main_xa.go b/examples/main_xa.go index 608b87a..cdf9bb7 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -5,12 +5,12 @@ import ( "time" "github.com/gin-gonic/gin" - "github.com/yedf/dtm" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" "gorm.io/gorm" ) -var XaClient *dtm.XaClient = nil +var XaClient *dtmcli.XaClient = nil type UserAccount struct { common.ModelBase @@ -58,7 +58,7 @@ func XaSetup(app *gin.Engine) { app.POST(BusiApi+"/TransInXa", common.WrapHandler(xaTransIn)) app.POST(BusiApi+"/TransOutXa", common.WrapHandler(xaTransOut)) Config.Mysql["database"] = "dtm_busi" - XaClient = dtm.XaClientNew(DtmServer, Config.Mysql, app, Busi+"/xa") + XaClient = dtmcli.XaClientNew(DtmServer, Config.Mysql, app, Busi+"/xa") } func xaTransIn(c *gin.Context) (interface{}, error) { diff --git a/examples/quick_start.go b/examples/quick_start.go index 5c39a4c..73b4b82 100644 --- a/examples/quick_start.go +++ b/examples/quick_start.go @@ -6,8 +6,8 @@ import ( "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" - "github.com/yedf/dtm" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" ) // 事务参与者的服务地址 @@ -31,7 +31,7 @@ func qsStartSvr() { func qsFireRequest() { req := &gin.H{"amount": 30} - saga := dtm.SagaNew(DtmServer). + saga := dtmcli.SagaNew(DtmServer). Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req). Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req) err := saga.Submit() diff --git a/examples/types.go b/examples/types.go index 4553089..d66b64b 100644 --- a/examples/types.go +++ b/examples/types.go @@ -4,8 +4,8 @@ import ( "fmt" "github.com/gin-gonic/gin" - "github.com/yedf/dtm" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" ) var e2p = common.E2P @@ -44,8 +44,8 @@ func reqFrom(c *gin.Context) *TransReq { return &req } -func infoFromContext(c *gin.Context) *dtm.TransInfo { - info := dtm.TransInfo{ +func infoFromContext(c *gin.Context) *dtmcli.TransInfo { + info := dtmcli.TransInfo{ TransType: c.Query("trans_type"), Gid: c.Query("gid"), BranchID: c.Query("branch_id"), diff --git a/message.go b/message.go deleted file mode 100644 index 63f14bd..0000000 --- a/message.go +++ /dev/null @@ -1,70 +0,0 @@ -package dtm - -import ( - "fmt" - - jsonitor "github.com/json-iterator/go" - "github.com/sirupsen/logrus" - "github.com/yedf/dtm/common" -) - -type Msg struct { - MsgData - Server string -} - -type MsgData struct { - Gid string `json:"gid"` - TransType string `json:"trans_type"` - Steps []MsgStep `json:"steps"` - QueryPrepared string `json:"query_prepared"` -} -type MsgStep struct { - Action string `json:"action"` - Data string `json:"data"` -} - -func MsgNew(server string) *Msg { - return &Msg{ - MsgData: MsgData{ - TransType: "msg", - }, - Server: server, - } -} -func (s *Msg) Add(action string, postData interface{}) *Msg { - logrus.Printf("msg %s Add %s %v", s.Gid, action, postData) - step := MsgStep{ - Action: action, - Data: common.MustMarshalString(postData), - } - s.Steps = append(s.Steps, step) - return s -} - -func (s *Msg) Submit() error { - logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData) - resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server)) - if err != nil { - return err - } - if resp.StatusCode() != 200 { - return fmt.Errorf("submit failed: %v", resp.Body()) - } - s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() - return nil -} - -func (s *Msg) Prepare(queryPrepared string) error { - s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) - logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData) - resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/prepare", s.Server)) - if err != nil { - return err - } - if resp.StatusCode() != 200 { - return fmt.Errorf("prepare failed: %v", resp.Body()) - } - s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() - return nil -} diff --git a/saga.go b/saga.go deleted file mode 100644 index 02c90b9..0000000 --- a/saga.go +++ /dev/null @@ -1,57 +0,0 @@ -package dtm - -import ( - "fmt" - - jsonitor "github.com/json-iterator/go" - "github.com/sirupsen/logrus" - "github.com/yedf/dtm/common" -) - -type Saga struct { - SagaData - Server string -} - -type SagaData struct { - Gid string `json:"gid"` - TransType string `json:"trans_type"` - Steps []SagaStep `json:"steps"` -} -type SagaStep struct { - Action string `json:"action"` - Compensate string `json:"compensate"` - Data string `json:"data"` -} - -func SagaNew(server string) *Saga { - return &Saga{ - SagaData: SagaData{ - TransType: "saga", - }, - Server: server, - } -} -func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga { - logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) - step := SagaStep{ - Action: action, - Compensate: compensate, - Data: common.MustMarshalString(postData), - } - s.Steps = append(s.Steps, step) - return s -} - -func (s *Saga) Submit() error { - logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) - resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server)) - if err != nil { - return err - } - if resp.StatusCode() != 200 { - return fmt.Errorf("submit failed: %v", resp.Body()) - } - s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() - return nil -} diff --git a/tcc.go b/tcc.go deleted file mode 100644 index b872ef7..0000000 --- a/tcc.go +++ /dev/null @@ -1,59 +0,0 @@ -package dtm - -import ( - "fmt" - - jsonitor "github.com/json-iterator/go" - "github.com/sirupsen/logrus" - "github.com/yedf/dtm/common" -) - -type Tcc struct { - TccData - Server string -} - -type TccData struct { - Gid string `json:"gid"` - TransType string `json:"trans_type"` - Steps []TccStep `json:"steps"` -} -type TccStep struct { - Try string `json:"try"` - Confirm string `json:"confirm"` - Cancel string `json:"cancel"` - Data string `json:"data"` -} - -func TccNew(server string) *Tcc { - return &Tcc{ - TccData: TccData{ - TransType: "tcc", - }, - Server: server, - } -} -func (s *Tcc) Add(try string, confirm string, cancel string, data interface{}) *Tcc { - logrus.Printf("tcc %s Add %s %s %s %v", s.Gid, try, confirm, cancel, data) - step := TccStep{ - Try: try, - Confirm: confirm, - Cancel: cancel, - Data: common.MustMarshalString(data), - } - s.Steps = append(s.Steps, step) - return s -} - -func (s *Tcc) Submit() error { - logrus.Printf("committing %s body: %v", s.Gid, &s.TccData) - resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/submit", s.Server)) - if err != nil { - return err - } - if resp.StatusCode() != 200 { - return fmt.Errorf("submit failed: %v", resp.Body()) - } - s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() - return nil -} diff --git a/xa.go b/xa.go deleted file mode 100644 index 9b29168..0000000 --- a/xa.go +++ /dev/null @@ -1,103 +0,0 @@ -package dtm - -import ( - "fmt" - "net/url" - "strings" - - "github.com/gin-gonic/gin" - "github.com/yedf/dtm/common" -) - -type M = map[string]interface{} - -var e2p = common.E2P - -type XaGlobalFunc func() error - -type XaLocalFunc func(db *common.DB) error - -type XaClient struct { - Server string - Conf map[string]string - CallbackUrl string -} - -func XaClientNew(server string, mysqlConf map[string]string, app *gin.Engine, callbackUrl string) *XaClient { - xa := &XaClient{ - Server: server, - Conf: mysqlConf, - CallbackUrl: callbackUrl, - } - u, err := url.Parse(callbackUrl) - e2p(err) - app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) { - type CallbackReq struct { - Gid string `json:"gid"` - Branch string `json:"branch"` - Action string `json:"action"` - } - req := CallbackReq{} - b, err := c.GetRawData() - e2p(err) - common.MustUnmarshal(b, &req) - tx, my := common.DbAlone(xa.Conf) - defer my.Close() - if req.Action == "commit" { - tx.Must().Exec(fmt.Sprintf("xa commit '%s'", req.Branch)) - } else if req.Action == "rollback" { - tx.Must().Exec(fmt.Sprintf("xa rollback '%s'", req.Branch)) - } else { - panic(fmt.Errorf("unknown action: %s", req.Action)) - } - return M{"result": "SUCCESS"}, nil - })) - return xa -} - -func (xa *XaClient) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error) { - defer common.P2E(&rerr) - branch := common.GenGid() - tx, my := common.DbAlone(xa.Conf) - defer func() { my.Close() }() - tx.Must().Exec(fmt.Sprintf("XA start '%s'", branch)) - err := transFunc(tx) - e2p(err) - resp, err := common.RestyClient.R(). - SetBody(&M{"gid": gid, "branch": branch, "trans_type": "xa", "status": "prepared", "url": xa.CallbackUrl}). - Post(xa.Server + "/branch") - e2p(err) - if !strings.Contains(resp.String(), "SUCCESS") { - e2p(fmt.Errorf("unknown server response: %s", resp.String())) - } - tx.Must().Exec(fmt.Sprintf("XA end '%s'", branch)) - tx.Must().Exec(fmt.Sprintf("XA prepare '%s'", branch)) - return nil -} - -func (xa *XaClient) XaGlobalTransaction(gid string, transFunc XaGlobalFunc) (rerr error) { - data := &M{ - "gid": gid, - "trans_type": "xa", - } - defer func() { - x := recover() - if x != nil { - _, _ = common.RestyClient.R().SetBody(data).Post(xa.Server + "/abort") - rerr = x.(error) - } - }() - resp, err := common.RestyClient.R().SetBody(data).Post(xa.Server + "/prepare") - e2p(err) - if !strings.Contains(resp.String(), "SUCCESS") { - panic(fmt.Errorf("unexpected result: %s", resp.String())) - } - err = transFunc() - e2p(err) - resp, err = common.RestyClient.R().SetBody(data).Post(xa.Server + "/submit") - e2p(err) - if !strings.Contains(resp.String(), "SUCCESS") { - panic(fmt.Errorf("unexpected result: %s", resp.String())) - } - return nil -}