From 92119b7361711e846b45670a650ee26497938b38 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Sat, 3 Jul 2021 12:07:43 +0800 Subject: [PATCH] change to dtmcli --- dtmcli/barrier.go | 69 +++++++++++++++++++++++++++++++ dtmcli/message.go | 70 +++++++++++++++++++++++++++++++ dtmcli/saga.go | 57 +++++++++++++++++++++++++ dtmcli/tcc.go | 59 ++++++++++++++++++++++++++ dtmcli/xa.go | 103 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 358 insertions(+) create mode 100644 dtmcli/barrier.go create mode 100644 dtmcli/message.go create mode 100644 dtmcli/saga.go create mode 100644 dtmcli/tcc.go create mode 100644 dtmcli/xa.go diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go new file mode 100644 index 0000000..42d204a --- /dev/null +++ b/dtmcli/barrier.go @@ -0,0 +1,69 @@ +package dtmcli + +import ( + "context" + "database/sql" + "fmt" + + "github.com/yedf/dtm/common" +) + +type BusiFunc func(db *sql.DB) (interface{}, error) + +type TransInfo struct { + TransType string + Gid string + BranchID string + BranchType string +} + +func (t *TransInfo) String() string { + return fmt.Sprintf("transInfo: %s %s %s %s", t.TransType, t.Gid, t.BranchID, t.BranchType) +} + +type BarrierModel struct { + common.ModelBase + TransInfo +} + +func (BarrierModel) TableName() string { return "dtm_barrier.barrier" } + +func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, branchType string) (int64, error) { + if branchType == "" { + return 0, nil + } + res, err := tx.Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type) values(?,?,?,?)", transType, gid, branchID, branchType) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + +func ThroughBarrierCall(db *sql.DB, transType string, gid string, branchId string, branchType string, busiCall BusiFunc) (res interface{}, rerr error) { + tx, rerr := db.BeginTx(context.Background(), &sql.TxOptions{}) + if rerr != nil { + return + } + defer func() { + if x := recover(); x != nil { + tx.Rollback() + panic(x) + } else if rerr != nil { + tx.Rollback() + } else { + tx.Commit() + } + }() + + originType := map[string]string{ + "cancel": "action", + "compensate": "action", + }[branchType] + originAffected, _ := insertBarrier(tx, transType, gid, branchId, originType) + currentAffected, rerr := insertBarrier(tx, transType, gid, branchId, branchType) + if currentAffected == 0 || (originType == "cancel" || originType == "compensate") && originAffected > 0 { + return + } + res, rerr = busiCall(db) + return +} diff --git a/dtmcli/message.go b/dtmcli/message.go new file mode 100644 index 0000000..c9f48ca --- /dev/null +++ b/dtmcli/message.go @@ -0,0 +1,70 @@ +package dtmcli + +import ( + "fmt" + + jsonitor "github.com/json-iterator/go" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" +) + +type Msg struct { + MsgData + Server string +} + +type MsgData struct { + Gid string `json:"gid"` + TransType string `json:"trans_type"` + Steps []MsgStep `json:"steps"` + QueryPrepared string `json:"query_prepared"` +} +type MsgStep struct { + Action string `json:"action"` + Data string `json:"data"` +} + +func MsgNew(server string) *Msg { + return &Msg{ + MsgData: MsgData{ + TransType: "msg", + }, + Server: server, + } +} +func (s *Msg) Add(action string, postData interface{}) *Msg { + logrus.Printf("msg %s Add %s %v", s.Gid, action, postData) + step := MsgStep{ + Action: action, + Data: common.MustMarshalString(postData), + } + s.Steps = append(s.Steps, step) + return s +} + +func (s *Msg) Submit() error { + logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData) + resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server)) + if err != nil { + return err + } + if resp.StatusCode() != 200 { + return fmt.Errorf("submit failed: %v", resp.Body()) + } + s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() + return nil +} + +func (s *Msg) Prepare(queryPrepared string) error { + s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) + logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData) + resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/prepare", s.Server)) + if err != nil { + return err + } + if resp.StatusCode() != 200 { + return fmt.Errorf("prepare failed: %v", resp.Body()) + } + s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() + return nil +} diff --git a/dtmcli/saga.go b/dtmcli/saga.go new file mode 100644 index 0000000..d4a34cf --- /dev/null +++ b/dtmcli/saga.go @@ -0,0 +1,57 @@ +package dtmcli + +import ( + "fmt" + + jsonitor "github.com/json-iterator/go" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" +) + +type Saga struct { + SagaData + Server string +} + +type SagaData struct { + Gid string `json:"gid"` + TransType string `json:"trans_type"` + Steps []SagaStep `json:"steps"` +} +type SagaStep struct { + Action string `json:"action"` + Compensate string `json:"compensate"` + Data string `json:"data"` +} + +func SagaNew(server string) *Saga { + return &Saga{ + SagaData: SagaData{ + TransType: "saga", + }, + Server: server, + } +} +func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga { + logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) + step := SagaStep{ + Action: action, + Compensate: compensate, + Data: common.MustMarshalString(postData), + } + s.Steps = append(s.Steps, step) + return s +} + +func (s *Saga) Submit() error { + logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) + resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server)) + if err != nil { + return err + } + if resp.StatusCode() != 200 { + return fmt.Errorf("submit failed: %v", resp.Body()) + } + s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() + return nil +} diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go new file mode 100644 index 0000000..5a9c671 --- /dev/null +++ b/dtmcli/tcc.go @@ -0,0 +1,59 @@ +package dtmcli + +import ( + "fmt" + + jsonitor "github.com/json-iterator/go" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" +) + +type Tcc struct { + TccData + Server string +} + +type TccData struct { + Gid string `json:"gid"` + TransType string `json:"trans_type"` + Steps []TccStep `json:"steps"` +} +type TccStep struct { + Try string `json:"try"` + Confirm string `json:"confirm"` + Cancel string `json:"cancel"` + Data string `json:"data"` +} + +func TccNew(server string) *Tcc { + return &Tcc{ + TccData: TccData{ + TransType: "tcc", + }, + Server: server, + } +} +func (s *Tcc) Add(try string, confirm string, cancel string, data interface{}) *Tcc { + logrus.Printf("tcc %s Add %s %s %s %v", s.Gid, try, confirm, cancel, data) + step := TccStep{ + Try: try, + Confirm: confirm, + Cancel: cancel, + Data: common.MustMarshalString(data), + } + s.Steps = append(s.Steps, step) + return s +} + +func (s *Tcc) Submit() error { + logrus.Printf("committing %s body: %v", s.Gid, &s.TccData) + resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/submit", s.Server)) + if err != nil { + return err + } + if resp.StatusCode() != 200 { + return fmt.Errorf("submit failed: %v", resp.Body()) + } + s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() + return nil +} diff --git a/dtmcli/xa.go b/dtmcli/xa.go new file mode 100644 index 0000000..be2dcf4 --- /dev/null +++ b/dtmcli/xa.go @@ -0,0 +1,103 @@ +package dtmcli + +import ( + "fmt" + "net/url" + "strings" + + "github.com/gin-gonic/gin" + "github.com/yedf/dtm/common" +) + +type M = map[string]interface{} + +var e2p = common.E2P + +type XaGlobalFunc func() error + +type XaLocalFunc func(db *common.DB) error + +type XaClient struct { + Server string + Conf map[string]string + CallbackUrl string +} + +func XaClientNew(server string, mysqlConf map[string]string, app *gin.Engine, callbackUrl string) *XaClient { + xa := &XaClient{ + Server: server, + Conf: mysqlConf, + CallbackUrl: callbackUrl, + } + u, err := url.Parse(callbackUrl) + e2p(err) + app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) { + type CallbackReq struct { + Gid string `json:"gid"` + Branch string `json:"branch"` + Action string `json:"action"` + } + req := CallbackReq{} + b, err := c.GetRawData() + e2p(err) + common.MustUnmarshal(b, &req) + tx, my := common.DbAlone(xa.Conf) + defer my.Close() + if req.Action == "commit" { + tx.Must().Exec(fmt.Sprintf("xa commit '%s'", req.Branch)) + } else if req.Action == "rollback" { + tx.Must().Exec(fmt.Sprintf("xa rollback '%s'", req.Branch)) + } else { + panic(fmt.Errorf("unknown action: %s", req.Action)) + } + return M{"result": "SUCCESS"}, nil + })) + return xa +} + +func (xa *XaClient) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error) { + defer common.P2E(&rerr) + branch := common.GenGid() + tx, my := common.DbAlone(xa.Conf) + defer func() { my.Close() }() + tx.Must().Exec(fmt.Sprintf("XA start '%s'", branch)) + err := transFunc(tx) + e2p(err) + resp, err := common.RestyClient.R(). + SetBody(&M{"gid": gid, "branch": branch, "trans_type": "xa", "status": "prepared", "url": xa.CallbackUrl}). + Post(xa.Server + "/branch") + e2p(err) + if !strings.Contains(resp.String(), "SUCCESS") { + e2p(fmt.Errorf("unknown server response: %s", resp.String())) + } + tx.Must().Exec(fmt.Sprintf("XA end '%s'", branch)) + tx.Must().Exec(fmt.Sprintf("XA prepare '%s'", branch)) + return nil +} + +func (xa *XaClient) XaGlobalTransaction(gid string, transFunc XaGlobalFunc) (rerr error) { + data := &M{ + "gid": gid, + "trans_type": "xa", + } + defer func() { + x := recover() + if x != nil { + _, _ = common.RestyClient.R().SetBody(data).Post(xa.Server + "/abort") + rerr = x.(error) + } + }() + resp, err := common.RestyClient.R().SetBody(data).Post(xa.Server + "/prepare") + e2p(err) + if !strings.Contains(resp.String(), "SUCCESS") { + panic(fmt.Errorf("unexpected result: %s", resp.String())) + } + err = transFunc() + e2p(err) + resp, err = common.RestyClient.R().SetBody(data).Post(xa.Server + "/submit") + e2p(err) + if !strings.Contains(resp.String(), "SUCCESS") { + panic(fmt.Errorf("unexpected result: %s", resp.String())) + } + return nil +}