xa new interface seems ok

This commit is contained in:
yedf2 2021-08-03 12:00:28 +08:00
parent eb4a4cde60
commit 31a69e3c84
7 changed files with 103 additions and 39 deletions

View File

@ -1,8 +1,6 @@
package dtmcli package dtmcli
import ( import (
"fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
) )
@ -49,17 +47,18 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
return s return s
} }
// Submit submit the msg
func (s *Msg) Submit() error {
logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData)
resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server))
return CheckDtmResponse(resp, err)
}
// Prepare prepare the msg // Prepare prepare the msg
func (s *Msg) Prepare(queryPrepared string) error { func (s *Msg) Prepare(queryPrepared string) error {
s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared)
logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData) return callDtmSimple(s.Server, &s.MsgData, "prepare")
resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/prepare", s.Server)) }
return CheckDtmResponse(resp, err)
// Submit submit the msg
func (s *Msg) Submit() error {
return callDtmSimple(s.Server, &s.MsgData, "submit")
}
// SubmitExt 高级submit更多的选项和更详细的返回值
func (s *Msg) SubmitExt(opt *TransOptions) (TransStatus, error) {
return callDtm(s.Server, &s.MsgData, "submit", opt)
} }

View File

@ -1,8 +1,6 @@
package dtmcli package dtmcli
import ( import (
"fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
) )
@ -52,7 +50,11 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga
// Submit submit the saga trans // Submit submit the saga trans
func (s *Saga) Submit() error { func (s *Saga) Submit() error {
logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) _, err := s.SubmitExt(&TransOptions{})
resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server)) return err
return CheckDtmResponse(resp, err) }
// SubmitExt 高级submit更多的选项和更详细的返回值
func (s *Saga) SubmitExt(opt *TransOptions) (TransStatus, error) {
return callDtm(s.Server, &s.SagaData, "submit", opt)
} }

View File

@ -1,6 +1,7 @@
package dtmcli package dtmcli
import ( import (
"errors"
"fmt" "fmt"
"strings" "strings"
@ -61,3 +62,61 @@ func (g *IDGenerator) NewBranchID() string {
g.branchID = g.branchID + 1 g.branchID = g.branchID + 1
return g.parentID + fmt.Sprintf("%02d", g.branchID) return g.parentID + fmt.Sprintf("%02d", g.branchID)
} }
// TransStatus 全局事务状态采用string
type TransStatus string
const (
// TransEmpty 空值
TransEmpty TransStatus = ""
// TransSubmitted 已提交给DTM
TransSubmitted TransStatus = "submitted"
// TransAborting 正在回滚中有两种情况会出现一是用户侧发起abort请求而是发起submit同步请求但是dtm进行回滚中出现错误
TransAborting TransStatus = "aborting"
// TransSucceed 事务已完成
TransSucceed TransStatus = "succeed"
// TransFailed 事务已回滚
TransFailed TransStatus = "failed"
// TransErrorPrepare prepare调用报错
TransErrorPrepare TransStatus = "error_parepare"
// TransErrorSubmit submit调用报错
TransErrorSubmit TransStatus = "error_submit"
// TransErrorAbort abort调用报错
TransErrorAbort TransStatus = "error_abort"
)
// TransOptions 提交/终止事务的选项
type TransOptions struct {
// WaitResult 是否等待全局事务的最终结果
WaitResult bool
}
// TransResult dtm 返回的结果
type TransResult struct {
DtmResult string `json:"dtm_result"`
Status TransStatus
Message string
}
func callDtm(dtm string, body interface{}, operation string, opt *TransOptions) (TransStatus, error) {
resp, err := common.RestyClient.R().SetQueryParams(common.MS{
"wait_result": common.If(opt.WaitResult, "1", "").(string),
}).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", dtm, operation))
errStatus := TransStatus("error_" + operation)
if err != nil {
return errStatus, err
}
tr := resp.Result().(*TransResult)
if tr.DtmResult == "FAILURE" {
return errStatus, errors.New(tr.Message)
}
return tr.Status, nil
}
func callDtmSimple(dtm string, body interface{}, operation string) error {
_, err := callDtm(dtm, body, operation, &TransOptions{})
return err
}
// ErrUserFailure 表示用户返回失败,要求回滚
var ErrUserFailure = errors.New("user return FAILURE")

View File

@ -109,39 +109,38 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret
} }
// XaGlobalTransaction start a xa global transaction // XaGlobalTransaction start a xa global transaction
func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (ret interface{}, rerr error) { func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (status TransStatus, rerr error) {
xa := Xa{IDGenerator: IDGenerator{}, Gid: gid} xa := Xa{IDGenerator: IDGenerator{}, Gid: gid}
data := &M{ data := &M{
"gid": gid, "gid": gid,
"trans_type": "xa", "trans_type": "xa",
} }
resp, err := common.RestyClient.R().SetBody(data).Post(xc.Server + "/prepare") status, rerr = callDtm(xc.Server, data, "prepare", &TransOptions{})
if IsFailure(resp, err) { if rerr != nil {
return resp, err return
} }
var resp *resty.Response
// 小概率情况下prepare成功了但是由于网络状况导致上面Failure那么不执行下面defer的内容等待超时后再回滚标记事务失败也没有问题 // 小概率情况下prepare成功了但是由于网络状况导致上面Failure那么不执行下面defer的内容等待超时后再回滚标记事务失败也没有问题
defer func() { defer func() {
var x interface{} x := recover()
if x = recover(); x != nil || IsFailure(ret, rerr) { operation := common.If(x != nil || IsFailure(resp, rerr), "abort", "submit").(string)
resp, err = common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort") var err error
} else { status, err = callDtm(xc.Server, data, operation, &TransOptions{})
resp, err = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit") if rerr == nil { // 如果用户函数没有返回错误那么返回dtm的
} rerr = err
if IsFailure(resp, err) {
common.RedLogf("submitting or abort global transaction error: %v resp: %s", err, resp.String())
} }
if x != nil { if x != nil {
panic(x) panic(x)
} }
}() }()
ret, rerr = xaFunc(&xa) resp, rerr = xaFunc(&xa)
return return
} }
// CallBranch call a xa branch // CallBranch call a xa branch
func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
branchID := x.NewBranchID() branchID := x.NewBranchID()
return common.RestyClient.R(). resp, err := common.RestyClient.R().
SetBody(body). SetBody(body).
SetQueryParams(common.MS{ SetQueryParams(common.MS{
"gid": x.Gid, "gid": x.Gid,
@ -150,4 +149,8 @@ func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
"branch_type": "action", "branch_type": "action",
}). }).
Post(url) Post(url)
if IsFailure(resp, nil) {
err = ErrUserFailure
}
return resp, err
} }

View File

@ -21,7 +21,7 @@ func (t *transXaProcessor) GenBranches() []TransBranch {
func (t *transXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { func (t *transXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
resp, err := common.RestyClient.R().SetQueryParams(common.MS{ resp, err := common.RestyClient.R().SetQueryParams(common.MS{
"branch_id": branch.BranchID, "branch_id": branch.BranchID,
"action": common.If(t.Status == "prepared", "rollback", "commit").(string), "action": common.If(t.Status == "prepared" || t.Status == "aborting", "rollback", "commit").(string),
"gid": branch.Gid, "gid": branch.Gid,
}).Post(branch.URL) }).Post(branch.URL)
e2p(err) e2p(err)

View File

@ -20,7 +20,8 @@ func TestXa(t *testing.T) {
} }
func xaLocalError(t *testing.T) { func xaLocalError(t *testing.T) {
_, err := examples.XaClient.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (*resty.Response, error) { xc := examples.XaClient
_, err := xc.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (*resty.Response, error) {
return nil, fmt.Errorf("an error") return nil, fmt.Errorf("an error")
}) })
assert.Error(t, err, fmt.Errorf("an error")) assert.Error(t, err, fmt.Errorf("an error"))
@ -29,7 +30,7 @@ func xaLocalError(t *testing.T) {
func xaNormal(t *testing.T) { func xaNormal(t *testing.T) {
xc := examples.XaClient xc := examples.XaClient
gid := "xaNormal" gid := "xaNormal"
res, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { _, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
req := examples.GenTransReq(30, false, false) req := examples.GenTransReq(30, false, false)
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
if dtmcli.IsFailure(resp, err) { if dtmcli.IsFailure(resp, err) {
@ -37,7 +38,7 @@ func xaNormal(t *testing.T) {
} }
return xa.CallBranch(req, examples.Busi+"/TransInXa") return xa.CallBranch(req, examples.Busi+"/TransInXa")
}) })
dtmcli.PanicIfFailure(res, err) assert.Equal(t, nil, err)
WaitTransProcessed(gid) WaitTransProcessed(gid)
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid)) assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid))
} }
@ -45,7 +46,7 @@ func xaNormal(t *testing.T) {
func xaRollback(t *testing.T) { func xaRollback(t *testing.T) {
xc := examples.XaClient xc := examples.XaClient
gid := "xaRollback" gid := "xaRollback"
res, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { _, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
req := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} req := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
if dtmcli.IsFailure(resp, err) { if dtmcli.IsFailure(resp, err) {
@ -53,7 +54,7 @@ func xaRollback(t *testing.T) {
} }
return xa.CallBranch(req, examples.Busi+"/TransInXa") return xa.CallBranch(req, examples.Busi+"/TransInXa")
}) })
assert.True(t, dtmcli.IsFailure(res, err)) assert.Error(t, err)
WaitTransProcessed(gid) WaitTransProcessed(gid)
assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid)) assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid))
assert.Equal(t, "failed", getTransStatus(gid)) assert.Equal(t, "failed", getTransStatus(gid))

View File

@ -28,14 +28,14 @@ func XaSetup(app *gin.Engine) {
// XaFireRequest 注册全局XA事务调用XA的分支 // XaFireRequest 注册全局XA事务调用XA的分支
func XaFireRequest() string { func XaFireRequest() string {
gid := dtmcli.MustGenGid(DtmServer) gid := dtmcli.MustGenGid(DtmServer)
res, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { _, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
resp, err := xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOutXa") resp, err := xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOutXa")
if dtmcli.IsFailure(resp, err) { if dtmcli.IsFailure(resp, err) {
return resp, err return resp, err
} }
return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa") return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa")
}) })
dtmcli.PanicIfFailure(res, err) e2p(err)
return gid return gid
} }