tcc changed

This commit is contained in:
yedf2 2021-08-03 15:23:02 +08:00
parent a28f0f3430
commit 483c76d015
7 changed files with 53 additions and 60 deletions

View File

@ -60,5 +60,5 @@ func (s *Msg) Submit() error {
// SubmitExt 高级submit更多的选项和更详细的返回值 // SubmitExt 高级submit更多的选项和更详细的返回值
func (s *Msg) SubmitExt(opt *TransOptions) (TransStatus, error) { func (s *Msg) SubmitExt(opt *TransOptions) (TransStatus, error) {
return callDtm(s.Server, &s.MsgData, "submit", opt) return CallDtm(s.Server, &s.MsgData, "submit", opt)
} }

View File

@ -56,5 +56,5 @@ func (s *Saga) Submit() error {
// SubmitExt 高级submit更多的选项和更详细的返回值 // SubmitExt 高级submit更多的选项和更详细的返回值
func (s *Saga) SubmitExt(opt *TransOptions) (TransStatus, error) { func (s *Saga) SubmitExt(opt *TransOptions) (TransStatus, error) {
return callDtm(s.Server, &s.SagaData, "submit", opt) return CallDtm(s.Server, &s.SagaData, "submit", opt)
} }

View File

@ -28,26 +28,24 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (ret in
"trans_type": "tcc", "trans_type": "tcc",
} }
tcc := &Tcc{Dtm: dtm, Gid: gid} tcc := &Tcc{Dtm: dtm, Gid: gid}
resp, err := common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare") resp, err := CallDtm(dtm, data, "prepare", &TransOptions{})
if IsFailure(resp, err) { if err != nil {
return resp, err return resp, err
} }
// 小概率情况下prepare成功了但是由于网络状况导致上面Failure那么不执行下面defer的内容等待超时后再回滚标记事务失败也没有问题 // 小概率情况下prepare成功了但是由于网络状况导致上面Failure那么不执行下面defer的内容等待超时后再回滚标记事务失败也没有问题
defer func() { defer func() {
var x interface{} x := recover()
if x = recover(); x != nil || IsFailure(ret, rerr) { operation := common.If(x == nil && rerr == nil, "submit", "abort").(string)
resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/abort") resp, err = CallDtm(dtm, data, operation, &TransOptions{})
} else { if rerr == nil {
resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/submit") rerr = err
}
if IsFailure(resp, err) {
common.RedLogf("submitting or abort global transaction error: %v resp: %s", err, resp.String())
} }
if x != nil { if x != nil {
panic(x) panic(x)
} }
}() }()
ret, rerr = tccFunc(tcc) ret, rerr = tccFunc(tcc)
rerr = CheckResult(ret, rerr)
return return
} }
@ -68,8 +66,7 @@ func TccFromReq(c *gin.Context) (*Tcc, error) {
// 函数首先注册子事务的所有分支成功后调用try分支返回try分支的调用结果 // 函数首先注册子事务的所有分支成功后调用try分支返回try分支的调用结果
func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) { func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) {
branchID := t.NewBranchID() branchID := t.NewBranchID()
resp, err := common.RestyClient.R(). _, err := CallDtm(t.Dtm, &M{
SetBody(&M{
"gid": t.Gid, "gid": t.Gid,
"branch_id": branchID, "branch_id": branchID,
"trans_type": "tcc", "trans_type": "tcc",
@ -78,12 +75,11 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
"try": tryURL, "try": tryURL,
"confirm": confirmURL, "confirm": confirmURL,
"cancel": cancelURL, "cancel": cancelURL,
}). }, "registerTccBranch", &TransOptions{})
Post(t.Dtm + "/registerTccBranch") if err != nil {
if IsFailure(resp, err) { return nil, err
return resp, err
} }
return common.RestyClient.R(). resp, err := common.RestyClient.R().
SetBody(body). SetBody(body).
SetQueryParams(common.MS{ SetQueryParams(common.MS{
"dtm": t.Dtm, "dtm": t.Dtm,
@ -93,4 +89,5 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
"branch_type": "try", "branch_type": "try",
}). }).
Post(tryURL) Post(tryURL)
return resp, CheckResponse(resp, err)
} }

View File

@ -122,7 +122,7 @@ type TransResult struct {
Message string Message string
} }
func callDtm(dtm string, body interface{}, operation string, opt *TransOptions) (TransStatus, error) { func CallDtm(dtm string, body interface{}, operation string, opt *TransOptions) (TransStatus, error) {
resp, err := common.RestyClient.R().SetQueryParams(common.MS{ resp, err := common.RestyClient.R().SetQueryParams(common.MS{
"wait_result": common.If(opt.WaitResult, "1", "").(string), "wait_result": common.If(opt.WaitResult, "1", "").(string),
}).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", dtm, operation)) }).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", dtm, operation))
@ -138,7 +138,7 @@ func callDtm(dtm string, body interface{}, operation string, opt *TransOptions)
} }
func callDtmSimple(dtm string, body interface{}, operation string) error { func callDtmSimple(dtm string, body interface{}, operation string) error {
_, err := callDtm(dtm, body, operation, &TransOptions{}) _, err := CallDtm(dtm, body, operation, &TransOptions{})
return err return err
} }

View File

@ -99,7 +99,7 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret
if rerr != nil { if rerr != nil {
return return
} }
_, rerr = callDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{}) _, rerr = CallDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{})
return return
} }
@ -110,7 +110,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status
"gid": gid, "gid": gid,
"trans_type": "xa", "trans_type": "xa",
} }
status, rerr = callDtm(xc.Server, data, "prepare", &TransOptions{}) status, rerr = CallDtm(xc.Server, data, "prepare", &TransOptions{})
if rerr != nil { if rerr != nil {
return return
} }
@ -120,7 +120,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status
x := recover() x := recover()
operation := common.If(x != nil || IsFailure(resp, rerr), "abort", "submit").(string) operation := common.If(x != nil || IsFailure(resp, rerr), "abort", "submit").(string)
var err error var err error
status, err = callDtm(xc.Server, data, operation, &TransOptions{}) status, err = CallDtm(xc.Server, data, operation, &TransOptions{})
if rerr == nil { // 如果用户函数没有返回错误那么返回dtm的 if rerr == nil { // 如果用户函数没有返回错误那么返回dtm的
rerr = err rerr = err
} }

View File

@ -36,12 +36,12 @@ func tccBarrierRollback(t *testing.T) {
func tccBarrierNormal(t *testing.T) { func tccBarrierNormal(t *testing.T) {
gid := "tccBarrierNormal" gid := "tccBarrierNormal"
resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
resp, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") _, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
assert.True(t, !dtmcli.IsFailure(resp, err)) assert.Nil(t, err)
return tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") return tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
}) })
assert.True(t, !dtmcli.IsFailure(resp, err)) assert.Nil(t, err)
WaitTransProcessed(gid) WaitTransProcessed(gid)
assert.Equal(t, "succeed", getTransStatus(gid)) assert.Equal(t, "succeed", getTransStatus(gid))
} }
@ -69,8 +69,7 @@ func tccBarrierDisorder(t *testing.T) {
return res, err return res, err
})) }))
// 注册子事务 // 注册子事务
r, err := common.RestyClient.R(). _, err := dtmcli.CallDtm(tcc.Dtm, M{
SetBody(&M{
"gid": tcc.Gid, "gid": tcc.Gid,
"branch_id": branchID, "branch_id": branchID,
"trans_type": "tcc", "trans_type": "tcc",
@ -79,13 +78,12 @@ func tccBarrierDisorder(t *testing.T) {
"try": tryURL, "try": tryURL,
"confirm": confirmURL, "confirm": confirmURL,
"cancel": cancelURL, "cancel": cancelURL,
}). }, "registerTccBranch", &dtmcli.TransOptions{})
Post(tcc.Dtm + "/registerTccBranch") assert.Nil(t, err)
assert.True(t, !dtmcli.IsFailure(r, err))
go func() { go func() {
logrus.Printf("sleeping to wait for tcc try timeout") logrus.Printf("sleeping to wait for tcc try timeout")
<-timeoutChan <-timeoutChan
r, _ = common.RestyClient.R(). r, _ := common.RestyClient.R().
SetBody(body). SetBody(body).
SetQueryParams(common.MS{ SetQueryParams(common.MS{
"dtm": tcc.Dtm, "dtm": tcc.Dtm,

View File

@ -19,26 +19,24 @@ func TestTcc(t *testing.T) {
func tccNormal(t *testing.T) { func tccNormal(t *testing.T) {
data := &examples.TransReq{Amount: 30} data := &examples.TransReq{Amount: 30}
gid := "tccNormal" gid := "tccNormal"
ret, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
resp, err := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") _, err := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
if dtmcli.IsFailure(resp, err) { assert.Nil(t, err)
return resp, err
}
return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
}) })
dtmcli.PanicIfFailure(ret, err) assert.Nil(t, err)
} }
func tccRollback(t *testing.T) { func tccRollback(t *testing.T) {
gid := "tccRollback" gid := "tccRollback"
data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}
resp, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) { _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
resp, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") _, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
assert.True(t, !dtmcli.IsFailure(resp, rerr)) assert.Nil(t, rerr)
examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING") examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING")
return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
}) })
assert.True(t, dtmcli.IsFailure(resp, err)) assert.Error(t, err)
WaitTransProcessed(gid) WaitTransProcessed(gid)
assert.Equal(t, "aborting", getTransStatus(gid)) assert.Equal(t, "aborting", getTransStatus(gid))
CronTransOnce(60 * time.Second) CronTransOnce(60 * time.Second)