decouple xa from gin

This commit is contained in:
yedf2 2021-08-01 15:51:04 +08:00
parent db4b6d59f0
commit 336dcf42e9
5 changed files with 30 additions and 37 deletions

View File

@ -21,14 +21,14 @@ func MustGenGid(server string) string {
// IsFailure 如果err非空或者ret是http的响应且包含FAILURE那么返回true。此时认为业务调用失败
func IsFailure(res interface{}, err error) bool {
resp, ok := res.(*resty.Response)
return err != nil || ok && (strings.Contains(resp.String(), "FAILURE") || resp.IsError())
return err != nil || // 包含错误
ok && (resp.IsError() || strings.Contains(resp.String(), "FAILURE")) || // resp包含failure
!ok && res != nil && strings.Contains(common.MustMarshalString(res), "FAILURE") // 结果中包含failure
}
// PanicIfFailure 如果err非空或者ret是http的响应且包含FAILURE那么Panic。此时认为业务调用失败
func PanicIfFailure(res interface{}, err error) {
resp, ok := res.(*resty.Response)
failure := err != nil || ok && (strings.Contains(resp.String(), "FAILURE") || resp.IsError())
if failure {
if IsFailure(res, err) {
panic(fmt.Errorf("dtm failure ret: %v err %v", res, err))
}
}

View File

@ -21,6 +21,9 @@ type XaGlobalFunc func(xa *Xa) (interface{}, error)
// XaLocalFunc type of xa local function
type XaLocalFunc func(db *sql.DB, xa *Xa) (interface{}, error)
// XaRegisterCallback type of xa register callback handler
type XaRegisterCallback func(path string, xa *XaClient)
// XaClient xa client
type XaClient struct {
Server string
@ -43,7 +46,7 @@ func XaFromReq(c *gin.Context) *Xa {
}
// NewXaClient construct a xa client
func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, callbackURL string) (*XaClient, error) {
func NewXaClient(server string, mysqlConf map[string]string, callbackURL string, register XaRegisterCallback) (*XaClient, error) {
xa := &XaClient{
Server: server,
Conf: mysqlConf,
@ -53,33 +56,20 @@ func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, ca
if err != nil {
return nil, err
}
app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) {
type CallbackReq struct {
Gid string `json:"gid"`
BranchID string `json:"branch_id"`
Action string `json:"action"`
}
req := CallbackReq{}
b, err := c.GetRawData()
if err != nil {
return nil, err
}
common.MustUnmarshal(b, &req)
db := common.SdbAlone(xa.Conf)
defer db.Close()
branchID := req.Gid + "-" + req.BranchID
if req.Action == "commit" {
_, err = common.SdbExec(db, fmt.Sprintf("xa commit '%s'", branchID))
} else if req.Action == "rollback" {
_, err = common.SdbExec(db, fmt.Sprintf("xa rollback '%s'", branchID))
} else {
panic(fmt.Errorf("unknown action: %s", req.Action))
}
return M{"dtm_result": "SUCCESS"}, err
}))
register(u.Path, xa)
return xa, nil
}
// HandleCallback 处理commit/rollback的回调
func (xc *XaClient) HandleCallback(gid string, branchID string, action string) (interface{}, error) {
db := common.SdbAlone(xc.Conf)
defer db.Close()
xaID := gid + "-" + branchID
_, err := common.SdbExec(db, fmt.Sprintf("xa %s '%s'", action, xaID))
return M{"dtm_result": "SUCCESS"}, err
}
// XaLocalTransaction start a xa local transaction
func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret interface{}, rerr error) {
xa := XaFromReq(c)

View File

@ -19,9 +19,9 @@ func (t *transXaProcessor) GenBranches() []TransBranch {
return []TransBranch{}
}
func (t *transXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
resp, err := common.RestyClient.R().SetBody(M{
resp, err := common.RestyClient.R().SetQueryParams(common.MS{
"branch_id": branch.BranchID,
"action": common.If(t.Status == "prepared", "rollback", "commit"),
"action": common.If(t.Status == "prepared", "rollback", "commit").(string),
"gid": branch.Gid,
}).Post(branch.URL)
e2p(err)

View File

@ -13,9 +13,9 @@ func TestXa(t *testing.T) {
if config.DB["driver"] != "mysql" {
return
}
// xaLocalError(t)
xaLocalError(t)
xaNormal(t)
// xaRollback(t)
xaRollback(t)
}
func xaLocalError(t *testing.T) {

View File

@ -16,7 +16,11 @@ func XaSetup(app *gin.Engine) {
app.POST(BusiAPI+"/TransInXa", common.WrapHandler(xaTransIn))
app.POST(BusiAPI+"/TransOutXa", common.WrapHandler(xaTransOut))
var err error
XaClient, err = dtmcli.NewXaClient(DtmServer, config.DB, app, Busi+"/xa")
XaClient, err = dtmcli.NewXaClient(DtmServer, config.DB, Busi+"/xa", func(path string, xa *dtmcli.XaClient) {
app.POST(path, common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return xa.HandleCallback(c.Query("gid"), c.Query("branch_id"), c.Query("action"))
}))
})
e2p(err)
}
@ -24,12 +28,11 @@ func XaSetup(app *gin.Engine) {
func XaFireRequest() string {
gid := dtmcli.MustGenGid(DtmServer)
res, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (interface{}, error) {
req := &TransReq{Amount: 30}
resp, err := xa.CallBranch(req, Busi+"/TransOutXa")
resp, err := xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOutXa")
if dtmcli.IsFailure(resp, err) {
return resp, err
}
return xa.CallBranch(req, Busi+"/TransInXa")
return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa")
})
dtmcli.PanicIfFailure(res, err)
return gid