optimize dtmcli
This commit is contained in:
parent
f005c1a274
commit
a7c72162da
@ -15,33 +15,25 @@ type MsgStep struct {
|
|||||||
|
|
||||||
// NewMsg create new msg
|
// NewMsg create new msg
|
||||||
func NewMsg(server string, gid string) *Msg {
|
func NewMsg(server string, gid string) *Msg {
|
||||||
return &Msg{
|
return &Msg{TransBase: *NewTransBase(gid, "msg", server, "")}
|
||||||
TransBase: TransBase{
|
|
||||||
Gid: gid,
|
|
||||||
TransType: "msg",
|
|
||||||
Dtm: server,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add add a new step
|
// Add add a new step
|
||||||
func (s *Msg) Add(action string, postData interface{}) *Msg {
|
func (s *Msg) Add(action string, postData interface{}) *Msg {
|
||||||
Logf("msg %s Add %s %v", s.Gid, action, postData)
|
s.Steps = append(s.Steps, MsgStep{
|
||||||
step := MsgStep{
|
|
||||||
Action: action,
|
Action: action,
|
||||||
Data: MustMarshalString(postData),
|
Data: MustMarshalString(postData),
|
||||||
}
|
})
|
||||||
s.Steps = append(s.Steps, step)
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare prepare the msg
|
// Prepare prepare the msg
|
||||||
func (s *Msg) Prepare(queryPrepared string) error {
|
func (s *Msg) Prepare(queryPrepared string) error {
|
||||||
s.QueryPrepared = OrString(queryPrepared, s.QueryPrepared)
|
s.QueryPrepared = OrString(queryPrepared, s.QueryPrepared)
|
||||||
return s.CallDtm(s, "prepare")
|
return s.callDtm(s, "prepare")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Submit submit the msg
|
// Submit submit the msg
|
||||||
func (s *Msg) Submit() error {
|
func (s *Msg) Submit() error {
|
||||||
return s.CallDtm(s, "submit")
|
return s.callDtm(s, "submit")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,28 +15,20 @@ type SagaStep struct {
|
|||||||
|
|
||||||
// NewSaga create a saga
|
// NewSaga create a saga
|
||||||
func NewSaga(server string, gid string) *Saga {
|
func NewSaga(server string, gid string) *Saga {
|
||||||
return &Saga{
|
return &Saga{TransBase: *NewTransBase(gid, "saga", server, "")}
|
||||||
TransBase: TransBase{
|
|
||||||
Gid: gid,
|
|
||||||
TransType: "saga",
|
|
||||||
Dtm: server,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add add a saga step
|
// Add add a saga step
|
||||||
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {
|
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {
|
||||||
Logf("saga %s Add %s %s %v", s.Gid, action, compensate, postData)
|
s.Steps = append(s.Steps, SagaStep{
|
||||||
step := SagaStep{
|
|
||||||
Action: action,
|
Action: action,
|
||||||
Compensate: compensate,
|
Compensate: compensate,
|
||||||
Data: MustMarshalString(postData),
|
Data: MustMarshalString(postData),
|
||||||
}
|
})
|
||||||
s.Steps = append(s.Steps, step)
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
// Submit submit the saga trans
|
// Submit submit the saga trans
|
||||||
func (s *Saga) Submit() error {
|
func (s *Saga) Submit() error {
|
||||||
return s.CallDtm(s, "submit")
|
return s.callDtm(s, "submit")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,13 +20,8 @@ type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error)
|
|||||||
// gid 全局事务id
|
// gid 全局事务id
|
||||||
// tccFunc tcc事务函数,里面会定义全局事务的分支
|
// tccFunc tcc事务函数,里面会定义全局事务的分支
|
||||||
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
|
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
|
||||||
tcc := &Tcc{
|
tcc := &Tcc{TransBase: *NewTransBase(gid, "tcc", dtm, "")}
|
||||||
TransBase: TransBase{
|
rerr = tcc.callDtm(tcc, "prepare")
|
||||||
Gid: gid,
|
|
||||||
TransType: "tcc",
|
|
||||||
Dtm: dtm,
|
|
||||||
}}
|
|
||||||
rerr = tcc.CallDtm(tcc, "prepare")
|
|
||||||
if rerr != nil {
|
if rerr != nil {
|
||||||
return rerr
|
return rerr
|
||||||
}
|
}
|
||||||
@ -34,7 +29,7 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e
|
|||||||
defer func() {
|
defer func() {
|
||||||
x := recover()
|
x := recover()
|
||||||
operation := If(x == nil && rerr == nil, "submit", "abort").(string)
|
operation := If(x == nil && rerr == nil, "submit", "abort").(string)
|
||||||
err := tcc.CallDtm(tcc, operation)
|
err := tcc.callDtm(tcc, operation)
|
||||||
if rerr == nil {
|
if rerr == nil {
|
||||||
rerr = err
|
rerr = err
|
||||||
}
|
}
|
||||||
@ -49,9 +44,7 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e
|
|||||||
|
|
||||||
// TccFromQuery tcc from request info
|
// TccFromQuery tcc from request info
|
||||||
func TccFromQuery(qs url.Values) (*Tcc, error) {
|
func TccFromQuery(qs url.Values) (*Tcc, error) {
|
||||||
tcc := &Tcc{
|
tcc := &Tcc{TransBase: *TransBaseFromQuery(qs)}
|
||||||
TransBase: *TransBaseFromQuery(qs),
|
|
||||||
}
|
|
||||||
if tcc.Dtm == "" || tcc.Gid == "" {
|
if tcc.Dtm == "" || tcc.Gid == "" {
|
||||||
return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s parentID: %s", tcc.Dtm, tcc.Gid, tcc.parentID)
|
return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s parentID: %s", tcc.Dtm, tcc.Gid, tcc.parentID)
|
||||||
}
|
}
|
||||||
@ -62,7 +55,7 @@ func TccFromQuery(qs url.Values) (*Tcc, error) {
|
|||||||
// 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果
|
// 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果
|
||||||
func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) {
|
func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) {
|
||||||
branchID := t.NewBranchID()
|
branchID := t.NewBranchID()
|
||||||
err := t.CallDtm(&M{
|
err := t.callDtm(&M{
|
||||||
"gid": t.Gid,
|
"gid": t.Gid,
|
||||||
"branch_id": branchID,
|
"branch_id": branchID,
|
||||||
"trans_type": "tcc",
|
"trans_type": "tcc",
|
||||||
|
|||||||
@ -71,8 +71,8 @@ func TransBaseFromQuery(qs url.Values) *TransBase {
|
|||||||
return NewTransBase(qs.Get("gid"), qs.Get("trans_type"), qs.Get("dtm"), qs.Get("branch_id"))
|
return NewTransBase(qs.Get("gid"), qs.Get("trans_type"), qs.Get("dtm"), qs.Get("branch_id"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallDtm 调用dtm服务器,返回事务的状态
|
// callDtm 调用dtm服务器,返回事务的状态
|
||||||
func (tb *TransBase) CallDtm(body interface{}, operation string) error {
|
func (tb *TransBase) callDtm(body interface{}, operation string) error {
|
||||||
params := MS{}
|
params := MS{}
|
||||||
if tb.WaitResult {
|
if tb.WaitResult {
|
||||||
params["wait_result"] = "1"
|
params["wait_result"] = "1"
|
||||||
|
|||||||
10
dtmcli/xa.go
10
dtmcli/xa.go
@ -102,23 +102,22 @@ func (xc *XaClient) XaLocalTransaction(qs url.Values, xaFunc XaLocalFunc) (ret i
|
|||||||
if rerr != nil {
|
if rerr != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rerr = xa.CallDtm(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "url": xc.NotifyURL}, "registerXaBranch")
|
rerr = xa.callDtm(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "url": xc.NotifyURL}, "registerXaBranch")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// XaGlobalTransaction start a xa global transaction
|
// XaGlobalTransaction start a xa global transaction
|
||||||
func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) {
|
func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) {
|
||||||
xa := Xa{TransBase: *NewTransBase(gid, "xa", xc.Server, "")}
|
xa := Xa{TransBase: *NewTransBase(gid, "xa", xc.Server, "")}
|
||||||
rerr = xa.CallDtm(xa, "prepare")
|
rerr = xa.callDtm(xa, "prepare")
|
||||||
if rerr != nil {
|
if rerr != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var resp *resty.Response
|
|
||||||
// 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题
|
// 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题
|
||||||
defer func() {
|
defer func() {
|
||||||
x := recover()
|
x := recover()
|
||||||
operation := If(x != nil || rerr != nil, "abort", "submit").(string)
|
operation := If(x != nil || rerr != nil, "abort", "submit").(string)
|
||||||
err := xa.CallDtm(xa, operation)
|
err := xa.callDtm(xa, operation)
|
||||||
if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的
|
if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的
|
||||||
rerr = err
|
rerr = err
|
||||||
}
|
}
|
||||||
@ -126,7 +125,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr e
|
|||||||
panic(x)
|
panic(x)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
resp, rerr = xaFunc(&xa)
|
resp, rerr := xaFunc(&xa)
|
||||||
rerr = CheckResponse(resp, rerr)
|
rerr = CheckResponse(resp, rerr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -137,6 +136,7 @@ func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
|
|||||||
resp, err := RestyClient.R().
|
resp, err := RestyClient.R().
|
||||||
SetBody(body).
|
SetBody(body).
|
||||||
SetQueryParams(MS{
|
SetQueryParams(MS{
|
||||||
|
"dtm": x.Dtm,
|
||||||
"gid": x.Gid,
|
"gid": x.Gid,
|
||||||
"branch_id": branchID,
|
"branch_id": branchID,
|
||||||
"trans_type": "xa",
|
"trans_type": "xa",
|
||||||
|
|||||||
@ -15,18 +15,15 @@ type MsgGrpc struct {
|
|||||||
|
|
||||||
// NewMsgGrpc create new msg
|
// NewMsgGrpc create new msg
|
||||||
func NewMsgGrpc(server string, gid string) *MsgGrpc {
|
func NewMsgGrpc(server string, gid string) *MsgGrpc {
|
||||||
return &MsgGrpc{
|
return &MsgGrpc{TransBase: *dtmcli.NewTransBase(gid, "msg", server, "")}
|
||||||
TransBase: *dtmcli.NewTransBase(gid, "msg", server, ""),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add add a new step
|
// Add add a new step
|
||||||
func (s *MsgGrpc) Add(action string, data []byte) *MsgGrpc {
|
func (s *MsgGrpc) Add(action string, data []byte) *MsgGrpc {
|
||||||
step := dtmcli.MsgStep{
|
s.Steps = append(s.Steps, dtmcli.MsgStep{
|
||||||
Action: action,
|
Action: action,
|
||||||
Data: string(data),
|
Data: string(data),
|
||||||
}
|
})
|
||||||
s.Steps = append(s.Steps, step)
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -14,20 +14,16 @@ type SagaGrpc struct {
|
|||||||
|
|
||||||
// NewSaga create a saga
|
// NewSaga create a saga
|
||||||
func NewSaga(server string, gid string) *SagaGrpc {
|
func NewSaga(server string, gid string) *SagaGrpc {
|
||||||
return &SagaGrpc{
|
return &SagaGrpc{TransBase: *dtmcli.NewTransBase(gid, "saga", server, "")}
|
||||||
TransBase: *dtmcli.NewTransBase(gid, "saga", server, ""),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add add a saga step
|
// Add add a saga step
|
||||||
func (s *SagaGrpc) Add(action string, compensate string, busiData []byte) *SagaGrpc {
|
func (s *SagaGrpc) Add(action string, compensate string, busiData []byte) *SagaGrpc {
|
||||||
dtmcli.Logf("saga %s Add %s %s %v", s.Gid, action, compensate, string(busiData))
|
s.Steps = append(s.Steps, dtmcli.SagaStep{
|
||||||
step := dtmcli.SagaStep{
|
|
||||||
Action: action,
|
Action: action,
|
||||||
Compensate: compensate,
|
Compensate: compensate,
|
||||||
Data: string(busiData),
|
Data: string(busiData),
|
||||||
}
|
})
|
||||||
s.Steps = append(s.Steps, step)
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -68,7 +68,8 @@ func tccBarrierDisorder(t *testing.T) {
|
|||||||
return res, err
|
return res, err
|
||||||
}))
|
}))
|
||||||
// 注册子事务
|
// 注册子事务
|
||||||
err := tcc.CallDtm(M{
|
resp, err := dtmcli.RestyClient.R().
|
||||||
|
SetResult(&dtmcli.TransResult{}).SetBody(M{
|
||||||
"gid": tcc.Gid,
|
"gid": tcc.Gid,
|
||||||
"branch_id": branchID,
|
"branch_id": branchID,
|
||||||
"trans_type": "tcc",
|
"trans_type": "tcc",
|
||||||
@ -77,8 +78,11 @@ func tccBarrierDisorder(t *testing.T) {
|
|||||||
"try": tryURL,
|
"try": tryURL,
|
||||||
"confirm": confirmURL,
|
"confirm": confirmURL,
|
||||||
"cancel": cancelURL,
|
"cancel": cancelURL,
|
||||||
}, "registerTccBranch")
|
}).Post(fmt.Sprintf("%s/%s", tcc.Dtm, "registerTccBranch"))
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
tr := resp.Result().(*dtmcli.TransResult)
|
||||||
|
assert.Equal(t, "SUCCESS", tr.DtmResult)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
dtmcli.Logf("sleeping to wait for tcc try timeout")
|
dtmcli.Logf("sleeping to wait for tcc try timeout")
|
||||||
<-timeoutChan
|
<-timeoutChan
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user