CallDtm move to TransBase

This commit is contained in:
yedf2 2021-08-03 16:59:03 +08:00
parent b20d18ecbb
commit 864ef5b4f7
6 changed files with 50 additions and 53 deletions

View File

@ -8,7 +8,7 @@ import (
// Msg reliable msg type // Msg reliable msg type
type Msg struct { type Msg struct {
MsgData MsgData
Server string TransBase
} }
// MsgData msg data // MsgData msg data
@ -32,13 +32,15 @@ func NewMsg(server string, gid string) *Msg {
Gid: gid, Gid: gid,
TransType: "msg", TransType: "msg",
}, },
Server: server, TransBase: TransBase{
Dtm: server,
},
} }
} }
// Add add a new step // Add add a new step
func (s *Msg) Add(action string, postData interface{}) *Msg { func (s *Msg) Add(action string, postData interface{}) *Msg {
logrus.Printf("msg %s Add %s %v", s.Gid, action, postData) logrus.Printf("msg %s Add %s %v", s.MsgData.Gid, action, postData)
step := MsgStep{ step := MsgStep{
Action: action, Action: action,
Data: common.MustMarshalString(postData), Data: common.MustMarshalString(postData),
@ -50,15 +52,10 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
// Prepare prepare the msg // Prepare prepare the msg
func (s *Msg) Prepare(queryPrepared string) error { func (s *Msg) Prepare(queryPrepared string) error {
s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared)
return callDtmSimple(s.Server, &s.MsgData, "prepare") return s.CallDtm(&s.MsgData, "prepare")
} }
// Submit submit the msg // Submit submit the msg
func (s *Msg) Submit() error { func (s *Msg) Submit() error {
return callDtmSimple(s.Server, &s.MsgData, "submit") return s.CallDtm(&s.MsgData, "submit")
}
// SubmitExt 高级submit更多的选项和更详细的返回值
func (s *Msg) SubmitExt(opt *TransOptions) error {
return CallDtm(s.Server, &s.MsgData, "submit", opt)
} }

View File

@ -8,7 +8,7 @@ import (
// Saga struct of saga // Saga struct of saga
type Saga struct { type Saga struct {
SagaData SagaData
Server string TransBase
} }
// SagaData sage data // SagaData sage data
@ -32,13 +32,15 @@ func NewSaga(server string, gid string) *Saga {
Gid: gid, Gid: gid,
TransType: "saga", TransType: "saga",
}, },
Server: server, TransBase: TransBase{
Dtm: server,
},
} }
} }
// Add add a saga step // Add add a saga step
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga { func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {
logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) logrus.Printf("saga %s Add %s %s %v", s.SagaData.Gid, action, compensate, postData)
step := SagaStep{ step := SagaStep{
Action: action, Action: action,
Compensate: compensate, Compensate: compensate,
@ -50,10 +52,5 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga
// Submit submit the saga trans // Submit submit the saga trans
func (s *Saga) Submit() error { func (s *Saga) Submit() error {
return s.SubmitExt(&TransOptions{}) return s.CallDtm(&s.SagaData, "submit")
}
// SubmitExt 高级submit更多的选项和更详细的返回值
func (s *Saga) SubmitExt(opt *TransOptions) error {
return CallDtm(s.Server, &s.SagaData, "submit", opt)
} }

View File

@ -10,9 +10,8 @@ import (
// Tcc struct of tcc // Tcc struct of tcc
type Tcc struct { type Tcc struct {
IDGenerator
Dtm string
Gid string Gid string
TransBase
} }
// TccGlobalFunc type of global tcc call // TccGlobalFunc type of global tcc call
@ -27,8 +26,8 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e
"gid": gid, "gid": gid,
"trans_type": "tcc", "trans_type": "tcc",
} }
tcc := &Tcc{Dtm: dtm, Gid: gid} tcc := &Tcc{TransBase: TransBase{Dtm: dtm}, Gid: gid}
rerr = CallDtm(dtm, data, "prepare", &TransOptions{}) rerr = tcc.CallDtm(data, "prepare")
if rerr != nil { if rerr != nil {
return rerr return rerr
} }
@ -37,7 +36,7 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e
x := recover() x := recover()
var err error var err error
operation := common.If(x == nil && rerr == nil, "submit", "abort").(string) operation := common.If(x == nil && rerr == nil, "submit", "abort").(string)
err = CallDtm(dtm, data, operation, &TransOptions{}) err = tcc.CallDtm(data, operation)
if rerr == nil { if rerr == nil {
rerr = err rerr = err
} }
@ -53,9 +52,8 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e
// TccFromReq tcc from request info // TccFromReq tcc from request info
func TccFromReq(c *gin.Context) (*Tcc, error) { func TccFromReq(c *gin.Context) (*Tcc, error) {
tcc := &Tcc{ tcc := &Tcc{
Dtm: c.Query("dtm"), TransBase: *TransBaseFromReq(c),
Gid: c.Query("gid"), Gid: c.Query("gid"),
IDGenerator: IDGenerator{parentID: c.Query("branch_id")},
} }
if tcc.Dtm == "" || tcc.Gid == "" { if tcc.Dtm == "" || tcc.Gid == "" {
return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s", tcc.Dtm, tcc.Gid) return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s", tcc.Dtm, tcc.Gid)
@ -67,7 +65,7 @@ func TccFromReq(c *gin.Context) (*Tcc, error) {
// 函数首先注册子事务的所有分支成功后调用try分支返回try分支的调用结果 // 函数首先注册子事务的所有分支成功后调用try分支返回try分支的调用结果
func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) { func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) {
branchID := t.NewBranchID() branchID := t.NewBranchID()
err := CallDtm(t.Dtm, &M{ err := t.CallDtm(&M{
"gid": t.Gid, "gid": t.Gid,
"branch_id": branchID, "branch_id": branchID,
"trans_type": "tcc", "trans_type": "tcc",
@ -76,7 +74,7 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
"try": tryURL, "try": tryURL,
"confirm": confirmURL, "confirm": confirmURL,
"cancel": cancelURL, "cancel": cancelURL,
}, "registerTccBranch", &TransOptions{}) }, "registerTccBranch")
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
) )
@ -61,23 +62,33 @@ func (g *IDGenerator) NewBranchID() string {
return g.parentID + fmt.Sprintf("%02d", g.branchID) return g.parentID + fmt.Sprintf("%02d", g.branchID)
} }
// TransOptions 提交/终止事务的选项
type TransOptions struct {
// WaitResult 是否等待全局事务的最终结果
WaitResult bool
}
// TransResult dtm 返回的结果 // TransResult dtm 返回的结果
type TransResult struct { type TransResult struct {
DtmResult string `json:"dtm_result"` DtmResult string `json:"dtm_result"`
Message string Message string
} }
// TransBase 事务的基础类
type TransBase struct {
IDGenerator
Dtm string
// WaitResult 是否等待全局事务的最终结果
WaitResult bool
}
// TransBaseFromReq construct xa info from request
func TransBaseFromReq(c *gin.Context) *TransBase {
return &TransBase{
IDGenerator: IDGenerator{parentID: c.Query("branch_id")},
Dtm: c.Query("dtm"),
}
}
// CallDtm 调用dtm服务器返回事务的状态 // CallDtm 调用dtm服务器返回事务的状态
func CallDtm(dtm string, body interface{}, operation string, opt *TransOptions) error { func (tb *TransBase) CallDtm(body interface{}, operation string) error {
resp, err := common.RestyClient.R().SetQueryParams(common.MS{ resp, err := common.RestyClient.R().SetQueryParams(common.MS{
"wait_result": common.If(opt.WaitResult, "1", "").(string), "wait_result": common.If(tb.WaitResult, "1", "").(string),
}).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", dtm, operation)) }).SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))
if err != nil { if err != nil {
return err return err
} }
@ -88,10 +99,6 @@ func CallDtm(dtm string, body interface{}, operation string, opt *TransOptions)
return nil return nil
} }
func callDtmSimple(dtm string, body interface{}, operation string) error {
return CallDtm(dtm, body, operation, &TransOptions{})
}
// ErrFailure 表示返回失败,要求回滚 // ErrFailure 表示返回失败,要求回滚
var ErrFailure = errors.New("transaction FAILURE") var ErrFailure = errors.New("transaction FAILURE")

View File

@ -33,16 +33,13 @@ type XaClient struct {
// Xa xa transaction // Xa xa transaction
type Xa struct { type Xa struct {
IDGenerator
Gid string Gid string
TransBase
} }
// XaFromReq construct xa info from request // XaFromReq construct xa info from request
func XaFromReq(c *gin.Context) *Xa { func XaFromReq(c *gin.Context) *Xa {
return &Xa{ return &Xa{TransBase: *TransBaseFromReq(c), Gid: c.Query("gid")}
Gid: c.Query("gid"),
IDGenerator: IDGenerator{parentID: c.Query("branch_id")},
}
} }
// NewXaClient construct a xa client // NewXaClient construct a xa client
@ -73,6 +70,7 @@ func (xc *XaClient) HandleCallback(gid string, branchID string, action string) (
// XaLocalTransaction start a xa local transaction // XaLocalTransaction start a xa local transaction
func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret interface{}, rerr error) { func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret interface{}, rerr error) {
xa := XaFromReq(c) xa := XaFromReq(c)
xa.Dtm = xc.Server
branchID := xa.NewBranchID() branchID := xa.NewBranchID()
xaBranch := xa.Gid + "-" + branchID xaBranch := xa.Gid + "-" + branchID
db := common.SdbAlone(xc.Conf) db := common.SdbAlone(xc.Conf)
@ -99,18 +97,18 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret
if rerr != nil { if rerr != nil {
return return
} }
rerr = CallDtm(xc.Server, &M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch", &TransOptions{}) rerr = xa.CallDtm(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch")
return return
} }
// XaGlobalTransaction start a xa global transaction // XaGlobalTransaction start a xa global transaction
func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) { func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) {
xa := Xa{IDGenerator: IDGenerator{}, Gid: gid} xa := Xa{TransBase: TransBase{IDGenerator: IDGenerator{}, Dtm: xc.Server}, Gid: gid}
data := &M{ data := &M{
"gid": gid, "gid": gid,
"trans_type": "xa", "trans_type": "xa",
} }
rerr = CallDtm(xc.Server, data, "prepare", &TransOptions{}) rerr = xa.CallDtm(data, "prepare")
if rerr != nil { if rerr != nil {
return return
} }
@ -119,7 +117,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr e
defer func() { defer func() {
x := recover() x := recover()
operation := common.If(x != nil || rerr != nil, "abort", "submit").(string) operation := common.If(x != nil || rerr != nil, "abort", "submit").(string)
err := CallDtm(xc.Server, data, operation, &TransOptions{}) err := xa.CallDtm(data, operation)
if rerr == nil { // 如果用户函数没有返回错误那么返回dtm的 if rerr == nil { // 如果用户函数没有返回错误那么返回dtm的
rerr = err rerr = err
} }

View File

@ -69,7 +69,7 @@ func tccBarrierDisorder(t *testing.T) {
return res, err return res, err
})) }))
// 注册子事务 // 注册子事务
err := dtmcli.CallDtm(tcc.Dtm, M{ err := tcc.CallDtm(M{
"gid": tcc.Gid, "gid": tcc.Gid,
"branch_id": branchID, "branch_id": branchID,
"trans_type": "tcc", "trans_type": "tcc",
@ -78,7 +78,7 @@ func tccBarrierDisorder(t *testing.T) {
"try": tryURL, "try": tryURL,
"confirm": confirmURL, "confirm": confirmURL,
"cancel": cancelURL, "cancel": cancelURL,
}, "registerTccBranch", &dtmcli.TransOptions{}) }, "registerTccBranch")
assert.Nil(t, err) assert.Nil(t, err)
go func() { go func() {
logrus.Printf("sleeping to wait for tcc try timeout") logrus.Printf("sleeping to wait for tcc try timeout")