From 864ef5b4f7f4f6acde3324b1ec458e7ef326c993 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 16:59:03 +0800 Subject: [PATCH] CallDtm move to TransBase --- dtmcli/message.go | 17 +++++++--------- dtmcli/saga.go | 15 ++++++--------- dtmcli/tcc.go | 18 ++++++++--------- dtmcli/types.go | 33 +++++++++++++++++++------------- dtmcli/xa.go | 16 +++++++--------- dtmsvr/trans_tcc_barrier_test.go | 4 ++-- 6 files changed, 50 insertions(+), 53 deletions(-) diff --git a/dtmcli/message.go b/dtmcli/message.go index 80a044d..02df337 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -8,7 +8,7 @@ import ( // Msg reliable msg type type Msg struct { MsgData - Server string + TransBase } // MsgData msg data @@ -32,13 +32,15 @@ func NewMsg(server string, gid string) *Msg { Gid: gid, TransType: "msg", }, - Server: server, + TransBase: TransBase{ + Dtm: server, + }, } } // Add add a new step func (s *Msg) Add(action string, postData interface{}) *Msg { - logrus.Printf("msg %s Add %s %v", s.Gid, action, postData) + logrus.Printf("msg %s Add %s %v", s.MsgData.Gid, action, postData) step := MsgStep{ Action: action, Data: common.MustMarshalString(postData), @@ -50,15 +52,10 @@ func (s *Msg) Add(action string, postData interface{}) *Msg { // Prepare prepare the msg func (s *Msg) Prepare(queryPrepared string) error { s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) - return callDtmSimple(s.Server, &s.MsgData, "prepare") + return s.CallDtm(&s.MsgData, "prepare") } // Submit submit the msg func (s *Msg) Submit() error { - return callDtmSimple(s.Server, &s.MsgData, "submit") -} - -// SubmitExt 高级submit,更多的选项和更详细的返回值 -func (s *Msg) SubmitExt(opt *TransOptions) error { - return CallDtm(s.Server, &s.MsgData, "submit", opt) + return s.CallDtm(&s.MsgData, "submit") } diff --git a/dtmcli/saga.go b/dtmcli/saga.go index a7f39ee..b32a340 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -8,7 +8,7 @@ import ( // Saga struct of saga type Saga struct { SagaData - Server string + TransBase } // SagaData sage data @@ -32,13 +32,15 @@ func NewSaga(server string, gid string) *Saga { Gid: gid, TransType: "saga", }, - Server: server, + TransBase: TransBase{ + Dtm: server, + }, } } // Add add a saga step func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga { - logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) + logrus.Printf("saga %s Add %s %s %v", s.SagaData.Gid, action, compensate, postData) step := SagaStep{ Action: action, Compensate: compensate, @@ -50,10 +52,5 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga // Submit submit the saga trans func (s *Saga) Submit() error { - return s.SubmitExt(&TransOptions{}) -} - -// SubmitExt 高级submit,更多的选项和更详细的返回值 -func (s *Saga) SubmitExt(opt *TransOptions) error { - return CallDtm(s.Server, &s.SagaData, "submit", opt) + return s.CallDtm(&s.SagaData, "submit") } diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index c2f019d..9ca9cfb 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -10,9 +10,8 @@ import ( // Tcc struct of tcc type Tcc struct { - IDGenerator - Dtm string Gid string + TransBase } // TccGlobalFunc type of global tcc call @@ -27,8 +26,8 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e "gid": gid, "trans_type": "tcc", } - tcc := &Tcc{Dtm: dtm, Gid: gid} - rerr = CallDtm(dtm, data, "prepare", &TransOptions{}) + tcc := &Tcc{TransBase: TransBase{Dtm: dtm}, Gid: gid} + rerr = tcc.CallDtm(data, "prepare") if rerr != nil { return rerr } @@ -37,7 +36,7 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e x := recover() var err error operation := common.If(x == nil && rerr == nil, "submit", "abort").(string) - err = CallDtm(dtm, data, operation, &TransOptions{}) + err = tcc.CallDtm(data, operation) if rerr == nil { rerr = err } @@ -53,9 +52,8 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e // TccFromReq tcc from request info func TccFromReq(c *gin.Context) (*Tcc, error) { tcc := &Tcc{ - Dtm: c.Query("dtm"), - Gid: c.Query("gid"), - IDGenerator: IDGenerator{parentID: c.Query("branch_id")}, + TransBase: *TransBaseFromReq(c), + Gid: c.Query("gid"), } if tcc.Dtm == "" || tcc.Gid == "" { return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s", tcc.Dtm, tcc.Gid) @@ -67,7 +65,7 @@ func TccFromReq(c *gin.Context) (*Tcc, error) { // 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果 func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) { branchID := t.NewBranchID() - err := CallDtm(t.Dtm, &M{ + err := t.CallDtm(&M{ "gid": t.Gid, "branch_id": branchID, "trans_type": "tcc", @@ -76,7 +74,7 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can "try": tryURL, "confirm": confirmURL, "cancel": cancelURL, - }, "registerTccBranch", &TransOptions{}) + }, "registerTccBranch") if err != nil { return nil, err } diff --git a/dtmcli/types.go b/dtmcli/types.go index 92e97fc..86a148f 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/gin-gonic/gin" "github.com/go-resty/resty/v2" "github.com/yedf/dtm/common" ) @@ -61,23 +62,33 @@ func (g *IDGenerator) NewBranchID() string { return g.parentID + fmt.Sprintf("%02d", g.branchID) } -// TransOptions 提交/终止事务的选项 -type TransOptions struct { - // WaitResult 是否等待全局事务的最终结果 - WaitResult bool -} - // TransResult dtm 返回的结果 type TransResult struct { DtmResult string `json:"dtm_result"` Message string } +// TransBase 事务的基础类 +type TransBase struct { + IDGenerator + Dtm string + // WaitResult 是否等待全局事务的最终结果 + WaitResult bool +} + +// TransBaseFromReq construct xa info from request +func TransBaseFromReq(c *gin.Context) *TransBase { + return &TransBase{ + IDGenerator: IDGenerator{parentID: c.Query("branch_id")}, + Dtm: c.Query("dtm"), + } +} + // CallDtm 调用dtm服务器,返回事务的状态 -func CallDtm(dtm string, body interface{}, operation string, opt *TransOptions) error { +func (tb *TransBase) CallDtm(body interface{}, operation string) error { resp, err := common.RestyClient.R().SetQueryParams(common.MS{ - "wait_result": common.If(opt.WaitResult, "1", "").(string), - }).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", dtm, operation)) + "wait_result": common.If(tb.WaitResult, "1", "").(string), + }).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation)) if err != nil { return err } @@ -88,10 +99,6 @@ func CallDtm(dtm string, body interface{}, operation string, opt *TransOptions) return nil } -func callDtmSimple(dtm string, body interface{}, operation string) error { - return CallDtm(dtm, body, operation, &TransOptions{}) -} - // ErrFailure 表示返回失败,要求回滚 var ErrFailure = errors.New("transaction FAILURE") diff --git a/dtmcli/xa.go b/dtmcli/xa.go index cac6384..860f093 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -33,16 +33,13 @@ type XaClient struct { // Xa xa transaction type Xa struct { - IDGenerator Gid string + TransBase } // XaFromReq construct xa info from request func XaFromReq(c *gin.Context) *Xa { - return &Xa{ - Gid: c.Query("gid"), - IDGenerator: IDGenerator{parentID: c.Query("branch_id")}, - } + return &Xa{TransBase: *TransBaseFromReq(c), Gid: c.Query("gid")} } // NewXaClient construct a xa client @@ -73,6 +70,7 @@ func (xc *XaClient) HandleCallback(gid string, branchID string, action string) ( // XaLocalTransaction start a xa local transaction func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret interface{}, rerr error) { xa := XaFromReq(c) + xa.Dtm = xc.Server branchID := xa.NewBranchID() xaBranch := xa.Gid + "-" + branchID db := common.SdbAlone(xc.Conf) @@ -99,18 +97,18 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret if rerr != nil { return } - rerr = CallDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{}) + rerr = xa.CallDtm(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch") return } // XaGlobalTransaction start a xa global transaction func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) { - xa := Xa{IDGenerator: IDGenerator{}, Gid: gid} + xa := Xa{TransBase: TransBase{IDGenerator: IDGenerator{}, Dtm: xc.Server}, Gid: gid} data := &M{ "gid": gid, "trans_type": "xa", } - rerr = CallDtm(xc.Server, data, "prepare", &TransOptions{}) + rerr = xa.CallDtm(data, "prepare") if rerr != nil { return } @@ -119,7 +117,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr e defer func() { x := recover() operation := common.If(x != nil || rerr != nil, "abort", "submit").(string) - err := CallDtm(xc.Server, data, operation, &TransOptions{}) + err := xa.CallDtm(data, operation) if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 rerr = err } diff --git a/dtmsvr/trans_tcc_barrier_test.go b/dtmsvr/trans_tcc_barrier_test.go index f04fdc8..e1eb9d5 100644 --- a/dtmsvr/trans_tcc_barrier_test.go +++ b/dtmsvr/trans_tcc_barrier_test.go @@ -69,7 +69,7 @@ func tccBarrierDisorder(t *testing.T) { return res, err })) // 注册子事务 - err := dtmcli.CallDtm(tcc.Dtm, M{ + err := tcc.CallDtm(M{ "gid": tcc.Gid, "branch_id": branchID, "trans_type": "tcc", @@ -78,7 +78,7 @@ func tccBarrierDisorder(t *testing.T) { "try": tryURL, "confirm": confirmURL, "cancel": cancelURL, - }, "registerTccBranch", &dtmcli.TransOptions{}) + }, "registerTccBranch") assert.Nil(t, err) go func() { logrus.Printf("sleeping to wait for tcc try timeout")