From 336dcf42e90f1b7208f84fa5137fd3075c2d0796 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sun, 1 Aug 2021 15:51:04 +0800 Subject: [PATCH] decouple xa from gin --- dtmcli/types.go | 8 ++++---- dtmcli/xa.go | 40 +++++++++++++++------------------------- dtmsvr/trans_xa.go | 4 ++-- dtmsvr/trans_xa_test.go | 4 ++-- examples/main_xa.go | 11 +++++++---- 5 files changed, 30 insertions(+), 37 deletions(-) diff --git a/dtmcli/types.go b/dtmcli/types.go index 0cd081a..29b8806 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -21,14 +21,14 @@ func MustGenGid(server string) string { // IsFailure 如果err非空,或者ret是http的响应且包含FAILURE,那么返回true。此时认为业务调用失败 func IsFailure(res interface{}, err error) bool { resp, ok := res.(*resty.Response) - return err != nil || ok && (strings.Contains(resp.String(), "FAILURE") || resp.IsError()) + return err != nil || // 包含错误 + ok && (resp.IsError() || strings.Contains(resp.String(), "FAILURE")) || // resp包含failure + !ok && res != nil && strings.Contains(common.MustMarshalString(res), "FAILURE") // 结果中包含failure } // PanicIfFailure 如果err非空,或者ret是http的响应且包含FAILURE,那么Panic。此时认为业务调用失败 func PanicIfFailure(res interface{}, err error) { - resp, ok := res.(*resty.Response) - failure := err != nil || ok && (strings.Contains(resp.String(), "FAILURE") || resp.IsError()) - if failure { + if IsFailure(res, err) { panic(fmt.Errorf("dtm failure ret: %v err %v", res, err)) } } diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 50bdd81..39f895d 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -21,6 +21,9 @@ type XaGlobalFunc func(xa *Xa) (interface{}, error) // XaLocalFunc type of xa local function type XaLocalFunc func(db *sql.DB, xa *Xa) (interface{}, error) +// XaRegisterCallback type of xa register callback handler +type XaRegisterCallback func(path string, xa *XaClient) + // XaClient xa client type XaClient struct { Server string @@ -43,7 +46,7 @@ func XaFromReq(c *gin.Context) *Xa { } // NewXaClient construct a xa client -func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, callbackURL string) (*XaClient, error) { +func NewXaClient(server string, mysqlConf map[string]string, callbackURL string, register XaRegisterCallback) (*XaClient, error) { xa := &XaClient{ Server: server, Conf: mysqlConf, @@ -53,33 +56,20 @@ func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, ca if err != nil { return nil, err } - app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) { - type CallbackReq struct { - Gid string `json:"gid"` - BranchID string `json:"branch_id"` - Action string `json:"action"` - } - req := CallbackReq{} - b, err := c.GetRawData() - if err != nil { - return nil, err - } - common.MustUnmarshal(b, &req) - db := common.SdbAlone(xa.Conf) - defer db.Close() - branchID := req.Gid + "-" + req.BranchID - if req.Action == "commit" { - _, err = common.SdbExec(db, fmt.Sprintf("xa commit '%s'", branchID)) - } else if req.Action == "rollback" { - _, err = common.SdbExec(db, fmt.Sprintf("xa rollback '%s'", branchID)) - } else { - panic(fmt.Errorf("unknown action: %s", req.Action)) - } - return M{"dtm_result": "SUCCESS"}, err - })) + register(u.Path, xa) return xa, nil } +// HandleCallback 处理commit/rollback的回调 +func (xc *XaClient) HandleCallback(gid string, branchID string, action string) (interface{}, error) { + db := common.SdbAlone(xc.Conf) + defer db.Close() + xaID := gid + "-" + branchID + _, err := common.SdbExec(db, fmt.Sprintf("xa %s '%s'", action, xaID)) + return M{"dtm_result": "SUCCESS"}, err + +} + // XaLocalTransaction start a xa local transaction func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret interface{}, rerr error) { xa := XaFromReq(c) diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_xa.go index 835ac73..1e33353 100644 --- a/dtmsvr/trans_xa.go +++ b/dtmsvr/trans_xa.go @@ -19,9 +19,9 @@ func (t *transXaProcessor) GenBranches() []TransBranch { return []TransBranch{} } func (t *transXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { - resp, err := common.RestyClient.R().SetBody(M{ + resp, err := common.RestyClient.R().SetQueryParams(common.MS{ "branch_id": branch.BranchID, - "action": common.If(t.Status == "prepared", "rollback", "commit"), + "action": common.If(t.Status == "prepared", "rollback", "commit").(string), "gid": branch.Gid, }).Post(branch.URL) e2p(err) diff --git a/dtmsvr/trans_xa_test.go b/dtmsvr/trans_xa_test.go index 132fdf8..846ca39 100644 --- a/dtmsvr/trans_xa_test.go +++ b/dtmsvr/trans_xa_test.go @@ -13,9 +13,9 @@ func TestXa(t *testing.T) { if config.DB["driver"] != "mysql" { return } - // xaLocalError(t) + xaLocalError(t) xaNormal(t) - // xaRollback(t) + xaRollback(t) } func xaLocalError(t *testing.T) { diff --git a/examples/main_xa.go b/examples/main_xa.go index fd7c378..aa0a859 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -16,7 +16,11 @@ func XaSetup(app *gin.Engine) { app.POST(BusiAPI+"/TransInXa", common.WrapHandler(xaTransIn)) app.POST(BusiAPI+"/TransOutXa", common.WrapHandler(xaTransOut)) var err error - XaClient, err = dtmcli.NewXaClient(DtmServer, config.DB, app, Busi+"/xa") + XaClient, err = dtmcli.NewXaClient(DtmServer, config.DB, Busi+"/xa", func(path string, xa *dtmcli.XaClient) { + app.POST(path, common.WrapHandler(func(c *gin.Context) (interface{}, error) { + return xa.HandleCallback(c.Query("gid"), c.Query("branch_id"), c.Query("action")) + })) + }) e2p(err) } @@ -24,12 +28,11 @@ func XaSetup(app *gin.Engine) { func XaFireRequest() string { gid := dtmcli.MustGenGid(DtmServer) res, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (interface{}, error) { - req := &TransReq{Amount: 30} - resp, err := xa.CallBranch(req, Busi+"/TransOutXa") + resp, err := xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOutXa") if dtmcli.IsFailure(resp, err) { return resp, err } - return xa.CallBranch(req, Busi+"/TransInXa") + return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa") }) dtmcli.PanicIfFailure(res, err) return gid