From a7c72162daa9bc70b8c542c1c8975273f12e4719 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 11 Aug 2021 15:46:25 +0800 Subject: [PATCH] optimize dtmcli --- dtmcli/message.go | 18 +++++------------- dtmcli/saga.go | 16 ++++------------ dtmcli/tcc.go | 17 +++++------------ dtmcli/types.go | 4 ++-- dtmcli/xa.go | 10 +++++----- dtmgrpc/message.go | 9 +++------ dtmgrpc/saga.go | 10 +++------- test/barrier_tcc_test.go | 8 ++++++-- 8 files changed, 33 insertions(+), 59 deletions(-) diff --git a/dtmcli/message.go b/dtmcli/message.go index e460f3f..5f7b25f 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -15,33 +15,25 @@ type MsgStep struct { // NewMsg create new msg func NewMsg(server string, gid string) *Msg { - return &Msg{ - TransBase: TransBase{ - Gid: gid, - TransType: "msg", - Dtm: server, - }, - } + return &Msg{TransBase: *NewTransBase(gid, "msg", server, "")} } // Add add a new step func (s *Msg) Add(action string, postData interface{}) *Msg { - Logf("msg %s Add %s %v", s.Gid, action, postData) - step := MsgStep{ + s.Steps = append(s.Steps, MsgStep{ Action: action, Data: MustMarshalString(postData), - } - s.Steps = append(s.Steps, step) + }) return s } // Prepare prepare the msg func (s *Msg) Prepare(queryPrepared string) error { s.QueryPrepared = OrString(queryPrepared, s.QueryPrepared) - return s.CallDtm(s, "prepare") + return s.callDtm(s, "prepare") } // Submit submit the msg func (s *Msg) Submit() error { - return s.CallDtm(s, "submit") + return s.callDtm(s, "submit") } diff --git a/dtmcli/saga.go b/dtmcli/saga.go index 3d5127f..f6681d8 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -15,28 +15,20 @@ type SagaStep struct { // NewSaga create a saga func NewSaga(server string, gid string) *Saga { - return &Saga{ - TransBase: TransBase{ - Gid: gid, - TransType: "saga", - Dtm: server, - }, - } + return &Saga{TransBase: *NewTransBase(gid, "saga", server, "")} } // Add add a saga step func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga { - Logf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) - step := SagaStep{ + s.Steps = append(s.Steps, SagaStep{ Action: action, Compensate: compensate, Data: MustMarshalString(postData), - } - s.Steps = append(s.Steps, step) + }) return s } // Submit submit the saga trans func (s *Saga) Submit() error { - return s.CallDtm(s, "submit") + return s.callDtm(s, "submit") } diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index c252474..d1a2009 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -20,13 +20,8 @@ type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error) // gid 全局事务id // tccFunc tcc事务函数,里面会定义全局事务的分支 func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { - tcc := &Tcc{ - TransBase: TransBase{ - Gid: gid, - TransType: "tcc", - Dtm: dtm, - }} - rerr = tcc.CallDtm(tcc, "prepare") + tcc := &Tcc{TransBase: *NewTransBase(gid, "tcc", dtm, "")} + rerr = tcc.callDtm(tcc, "prepare") if rerr != nil { return rerr } @@ -34,7 +29,7 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e defer func() { x := recover() operation := If(x == nil && rerr == nil, "submit", "abort").(string) - err := tcc.CallDtm(tcc, operation) + err := tcc.callDtm(tcc, operation) if rerr == nil { rerr = err } @@ -49,9 +44,7 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e // TccFromQuery tcc from request info func TccFromQuery(qs url.Values) (*Tcc, error) { - tcc := &Tcc{ - TransBase: *TransBaseFromQuery(qs), - } + tcc := &Tcc{TransBase: *TransBaseFromQuery(qs)} if tcc.Dtm == "" || tcc.Gid == "" { return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s parentID: %s", tcc.Dtm, tcc.Gid, tcc.parentID) } @@ -62,7 +55,7 @@ func TccFromQuery(qs url.Values) (*Tcc, error) { // 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果 func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) { branchID := t.NewBranchID() - err := t.CallDtm(&M{ + err := t.callDtm(&M{ "gid": t.Gid, "branch_id": branchID, "trans_type": "tcc", diff --git a/dtmcli/types.go b/dtmcli/types.go index ba30417..8af513a 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -71,8 +71,8 @@ func TransBaseFromQuery(qs url.Values) *TransBase { return NewTransBase(qs.Get("gid"), qs.Get("trans_type"), qs.Get("dtm"), qs.Get("branch_id")) } -// CallDtm 调用dtm服务器,返回事务的状态 -func (tb *TransBase) CallDtm(body interface{}, operation string) error { +// callDtm 调用dtm服务器,返回事务的状态 +func (tb *TransBase) callDtm(body interface{}, operation string) error { params := MS{} if tb.WaitResult { params["wait_result"] = "1" diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 28ca57d..c1c9542 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -102,23 +102,22 @@ func (xc *XaClient) XaLocalTransaction(qs url.Values, xaFunc XaLocalFunc) (ret i if rerr != nil { return } - rerr = xa.CallDtm(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "url": xc.NotifyURL}, "registerXaBranch") + rerr = xa.callDtm(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "url": xc.NotifyURL}, "registerXaBranch") return } // XaGlobalTransaction start a xa global transaction func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) { xa := Xa{TransBase: *NewTransBase(gid, "xa", xc.Server, "")} - rerr = xa.CallDtm(xa, "prepare") + rerr = xa.callDtm(xa, "prepare") if rerr != nil { return } - var resp *resty.Response // 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题 defer func() { x := recover() operation := If(x != nil || rerr != nil, "abort", "submit").(string) - err := xa.CallDtm(xa, operation) + err := xa.callDtm(xa, operation) if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 rerr = err } @@ -126,7 +125,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr e panic(x) } }() - resp, rerr = xaFunc(&xa) + resp, rerr := xaFunc(&xa) rerr = CheckResponse(resp, rerr) return } @@ -137,6 +136,7 @@ func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { resp, err := RestyClient.R(). SetBody(body). SetQueryParams(MS{ + "dtm": x.Dtm, "gid": x.Gid, "branch_id": branchID, "trans_type": "xa", diff --git a/dtmgrpc/message.go b/dtmgrpc/message.go index d188070..9f8a223 100644 --- a/dtmgrpc/message.go +++ b/dtmgrpc/message.go @@ -15,18 +15,15 @@ type MsgGrpc struct { // NewMsgGrpc create new msg func NewMsgGrpc(server string, gid string) *MsgGrpc { - return &MsgGrpc{ - TransBase: *dtmcli.NewTransBase(gid, "msg", server, ""), - } + return &MsgGrpc{TransBase: *dtmcli.NewTransBase(gid, "msg", server, "")} } // Add add a new step func (s *MsgGrpc) Add(action string, data []byte) *MsgGrpc { - step := dtmcli.MsgStep{ + s.Steps = append(s.Steps, dtmcli.MsgStep{ Action: action, Data: string(data), - } - s.Steps = append(s.Steps, step) + }) return s } diff --git a/dtmgrpc/saga.go b/dtmgrpc/saga.go index 644c9cf..4d37353 100644 --- a/dtmgrpc/saga.go +++ b/dtmgrpc/saga.go @@ -14,20 +14,16 @@ type SagaGrpc struct { // NewSaga create a saga func NewSaga(server string, gid string) *SagaGrpc { - return &SagaGrpc{ - TransBase: *dtmcli.NewTransBase(gid, "saga", server, ""), - } + return &SagaGrpc{TransBase: *dtmcli.NewTransBase(gid, "saga", server, "")} } // Add add a saga step func (s *SagaGrpc) Add(action string, compensate string, busiData []byte) *SagaGrpc { - dtmcli.Logf("saga %s Add %s %s %v", s.Gid, action, compensate, string(busiData)) - step := dtmcli.SagaStep{ + s.Steps = append(s.Steps, dtmcli.SagaStep{ Action: action, Compensate: compensate, Data: string(busiData), - } - s.Steps = append(s.Steps, step) + }) return s } diff --git a/test/barrier_tcc_test.go b/test/barrier_tcc_test.go index 54abcfe..036657c 100644 --- a/test/barrier_tcc_test.go +++ b/test/barrier_tcc_test.go @@ -68,7 +68,8 @@ func tccBarrierDisorder(t *testing.T) { return res, err })) // 注册子事务 - err := tcc.CallDtm(M{ + resp, err := dtmcli.RestyClient.R(). + SetResult(&dtmcli.TransResult{}).SetBody(M{ "gid": tcc.Gid, "branch_id": branchID, "trans_type": "tcc", @@ -77,8 +78,11 @@ func tccBarrierDisorder(t *testing.T) { "try": tryURL, "confirm": confirmURL, "cancel": cancelURL, - }, "registerTccBranch") + }).Post(fmt.Sprintf("%s/%s", tcc.Dtm, "registerTccBranch")) assert.Nil(t, err) + tr := resp.Result().(*dtmcli.TransResult) + assert.Equal(t, "SUCCESS", tr.DtmResult) + go func() { dtmcli.Logf("sleeping to wait for tcc try timeout") <-timeoutChan