Merge branch 'alpha' into main
This commit is contained in:
commit
5ecc66213c
@ -98,14 +98,14 @@ func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc) (re
|
|||||||
currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.BranchType, ti.BranchType)
|
currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.BranchType, ti.BranchType)
|
||||||
logrus.Printf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
|
logrus.Printf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
|
||||||
if (ti.BranchType == "cancel" || ti.BranchType == "compensate") && originAffected > 0 { // 这个是空补偿,返回成功
|
if (ti.BranchType == "cancel" || ti.BranchType == "compensate") && originAffected > 0 { // 这个是空补偿,返回成功
|
||||||
res = common.MS{"dtm_result": "SUCCESS"}
|
res = ResultSuccess
|
||||||
return
|
return
|
||||||
} else if currentAffected == 0 { // 插入不成功
|
} else if currentAffected == 0 { // 插入不成功
|
||||||
var result sql.NullString
|
var result sql.NullString
|
||||||
err := common.StxQueryRow(tx, "select result from dtm_barrier.barrier where trans_type=? and gid=? and branch_id=? and branch_type=? and reason=?",
|
err := common.StxQueryRow(tx, "select result from dtm_barrier.barrier where trans_type=? and gid=? and branch_id=? and branch_type=? and reason=?",
|
||||||
ti.TransType, ti.Gid, ti.BranchID, ti.BranchType, ti.BranchType).Scan(&result)
|
ti.TransType, ti.Gid, ti.BranchID, ti.BranchType, ti.BranchType).Scan(&result)
|
||||||
if err == sql.ErrNoRows { // 这个是悬挂操作,返回失败,AP收到这个返回,会尽快回滚
|
if err == sql.ErrNoRows { // 这个是悬挂操作,返回失败,AP收到这个返回,会尽快回滚
|
||||||
res = common.MS{"dtm_result": "FAILURE"}
|
res = ResultFailure
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -117,7 +117,7 @@ func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc) (re
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// 数据库里没有上次的结果,属于重复空补偿,直接返回成功
|
// 数据库里没有上次的结果,属于重复空补偿,直接返回成功
|
||||||
res = common.MS{"dtm_result": "SUCCESS"}
|
res = ResultSuccess
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
res, rerr = busiCall(tx)
|
res, rerr = busiCall(tx)
|
||||||
|
|||||||
@ -1,8 +1,6 @@
|
|||||||
package dtmcli
|
package dtmcli
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
)
|
)
|
||||||
@ -10,7 +8,7 @@ import (
|
|||||||
// Msg reliable msg type
|
// Msg reliable msg type
|
||||||
type Msg struct {
|
type Msg struct {
|
||||||
MsgData
|
MsgData
|
||||||
Server string
|
TransBase
|
||||||
}
|
}
|
||||||
|
|
||||||
// MsgData msg data
|
// MsgData msg data
|
||||||
@ -34,13 +32,15 @@ func NewMsg(server string, gid string) *Msg {
|
|||||||
Gid: gid,
|
Gid: gid,
|
||||||
TransType: "msg",
|
TransType: "msg",
|
||||||
},
|
},
|
||||||
Server: server,
|
TransBase: TransBase{
|
||||||
|
Dtm: server,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add add a new step
|
// Add add a new step
|
||||||
func (s *Msg) Add(action string, postData interface{}) *Msg {
|
func (s *Msg) Add(action string, postData interface{}) *Msg {
|
||||||
logrus.Printf("msg %s Add %s %v", s.Gid, action, postData)
|
logrus.Printf("msg %s Add %s %v", s.MsgData.Gid, action, postData)
|
||||||
step := MsgStep{
|
step := MsgStep{
|
||||||
Action: action,
|
Action: action,
|
||||||
Data: common.MustMarshalString(postData),
|
Data: common.MustMarshalString(postData),
|
||||||
@ -49,17 +49,13 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
// Submit submit the msg
|
|
||||||
func (s *Msg) Submit() error {
|
|
||||||
logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData)
|
|
||||||
resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server))
|
|
||||||
return CheckDtmResponse(resp, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Prepare prepare the msg
|
// Prepare prepare the msg
|
||||||
func (s *Msg) Prepare(queryPrepared string) error {
|
func (s *Msg) Prepare(queryPrepared string) error {
|
||||||
s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared)
|
s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared)
|
||||||
logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData)
|
return s.CallDtm(&s.MsgData, "prepare")
|
||||||
resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/prepare", s.Server))
|
}
|
||||||
return CheckDtmResponse(resp, err)
|
|
||||||
|
// Submit submit the msg
|
||||||
|
func (s *Msg) Submit() error {
|
||||||
|
return s.CallDtm(&s.MsgData, "submit")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,8 +1,6 @@
|
|||||||
package dtmcli
|
package dtmcli
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
)
|
)
|
||||||
@ -10,7 +8,7 @@ import (
|
|||||||
// Saga struct of saga
|
// Saga struct of saga
|
||||||
type Saga struct {
|
type Saga struct {
|
||||||
SagaData
|
SagaData
|
||||||
Server string
|
TransBase
|
||||||
}
|
}
|
||||||
|
|
||||||
// SagaData sage data
|
// SagaData sage data
|
||||||
@ -34,13 +32,15 @@ func NewSaga(server string, gid string) *Saga {
|
|||||||
Gid: gid,
|
Gid: gid,
|
||||||
TransType: "saga",
|
TransType: "saga",
|
||||||
},
|
},
|
||||||
Server: server,
|
TransBase: TransBase{
|
||||||
|
Dtm: server,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add add a saga step
|
// Add add a saga step
|
||||||
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {
|
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {
|
||||||
logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData)
|
logrus.Printf("saga %s Add %s %s %v", s.SagaData.Gid, action, compensate, postData)
|
||||||
step := SagaStep{
|
step := SagaStep{
|
||||||
Action: action,
|
Action: action,
|
||||||
Compensate: compensate,
|
Compensate: compensate,
|
||||||
@ -52,7 +52,5 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga
|
|||||||
|
|
||||||
// Submit submit the saga trans
|
// Submit submit the saga trans
|
||||||
func (s *Saga) Submit() error {
|
func (s *Saga) Submit() error {
|
||||||
logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData)
|
return s.CallDtm(&s.SagaData, "submit")
|
||||||
resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server))
|
|
||||||
return CheckDtmResponse(resp, err)
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -10,53 +10,49 @@ import (
|
|||||||
|
|
||||||
// Tcc struct of tcc
|
// Tcc struct of tcc
|
||||||
type Tcc struct {
|
type Tcc struct {
|
||||||
IDGenerator
|
|
||||||
Dtm string
|
|
||||||
Gid string
|
Gid string
|
||||||
|
TransBase
|
||||||
}
|
}
|
||||||
|
|
||||||
// TccGlobalFunc type of global tcc call
|
// TccGlobalFunc type of global tcc call
|
||||||
type TccGlobalFunc func(tcc *Tcc) (interface{}, error)
|
type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error)
|
||||||
|
|
||||||
// TccGlobalTransaction begin a tcc global transaction
|
// TccGlobalTransaction begin a tcc global transaction
|
||||||
// dtm dtm服务器地址
|
// dtm dtm服务器地址
|
||||||
// gid 全局事务id
|
// gid 全局事务id
|
||||||
// tccFunc tcc事务函数,里面会定义全局事务的分支
|
// tccFunc tcc事务函数,里面会定义全局事务的分支
|
||||||
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (ret interface{}, rerr error) {
|
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
|
||||||
data := &M{
|
data := &M{
|
||||||
"gid": gid,
|
"gid": gid,
|
||||||
"trans_type": "tcc",
|
"trans_type": "tcc",
|
||||||
}
|
}
|
||||||
tcc := &Tcc{Dtm: dtm, Gid: gid}
|
tcc := &Tcc{TransBase: TransBase{Dtm: dtm}, Gid: gid}
|
||||||
resp, err := common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare")
|
rerr = tcc.CallDtm(data, "prepare")
|
||||||
if IsFailure(resp, err) {
|
if rerr != nil {
|
||||||
return resp, err
|
return rerr
|
||||||
}
|
}
|
||||||
// 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题
|
// 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题
|
||||||
defer func() {
|
defer func() {
|
||||||
var x interface{}
|
x := recover()
|
||||||
if x = recover(); x != nil || IsFailure(ret, rerr) {
|
operation := common.If(x == nil && rerr == nil, "submit", "abort").(string)
|
||||||
resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/abort")
|
err := tcc.CallDtm(data, operation)
|
||||||
} else {
|
if rerr == nil {
|
||||||
resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/submit")
|
rerr = err
|
||||||
}
|
|
||||||
if IsFailure(resp, err) {
|
|
||||||
common.RedLogf("submitting or abort global transaction error: %v resp: %s", err, resp.String())
|
|
||||||
}
|
}
|
||||||
if x != nil {
|
if x != nil {
|
||||||
panic(x)
|
panic(x)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
ret, rerr = tccFunc(tcc)
|
resp, rerr := tccFunc(tcc)
|
||||||
|
rerr = CheckResponse(resp, rerr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// TccFromReq tcc from request info
|
// TccFromReq tcc from request info
|
||||||
func TccFromReq(c *gin.Context) (*Tcc, error) {
|
func TccFromReq(c *gin.Context) (*Tcc, error) {
|
||||||
tcc := &Tcc{
|
tcc := &Tcc{
|
||||||
Dtm: c.Query("dtm"),
|
TransBase: *TransBaseFromReq(c),
|
||||||
Gid: c.Query("gid"),
|
Gid: c.Query("gid"),
|
||||||
IDGenerator: IDGenerator{parentID: c.Query("branch_id")},
|
|
||||||
}
|
}
|
||||||
if tcc.Dtm == "" || tcc.Gid == "" {
|
if tcc.Dtm == "" || tcc.Gid == "" {
|
||||||
return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s", tcc.Dtm, tcc.Gid)
|
return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s", tcc.Dtm, tcc.Gid)
|
||||||
@ -68,8 +64,7 @@ func TccFromReq(c *gin.Context) (*Tcc, error) {
|
|||||||
// 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果
|
// 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果
|
||||||
func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) {
|
func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) {
|
||||||
branchID := t.NewBranchID()
|
branchID := t.NewBranchID()
|
||||||
resp, err := common.RestyClient.R().
|
err := t.CallDtm(&M{
|
||||||
SetBody(&M{
|
|
||||||
"gid": t.Gid,
|
"gid": t.Gid,
|
||||||
"branch_id": branchID,
|
"branch_id": branchID,
|
||||||
"trans_type": "tcc",
|
"trans_type": "tcc",
|
||||||
@ -78,12 +73,11 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
|
|||||||
"try": tryURL,
|
"try": tryURL,
|
||||||
"confirm": confirmURL,
|
"confirm": confirmURL,
|
||||||
"cancel": cancelURL,
|
"cancel": cancelURL,
|
||||||
}).
|
}, "registerTccBranch")
|
||||||
Post(t.Dtm + "/registerTccBranch")
|
if err != nil {
|
||||||
if IsFailure(resp, err) {
|
return nil, err
|
||||||
return resp, err
|
|
||||||
}
|
}
|
||||||
return common.RestyClient.R().
|
resp, err := common.RestyClient.R().
|
||||||
SetBody(body).
|
SetBody(body).
|
||||||
SetQueryParams(common.MS{
|
SetQueryParams(common.MS{
|
||||||
"dtm": t.Dtm,
|
"dtm": t.Dtm,
|
||||||
@ -93,4 +87,5 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
|
|||||||
"branch_type": "try",
|
"branch_type": "try",
|
||||||
}).
|
}).
|
||||||
Post(tryURL)
|
Post(tryURL)
|
||||||
|
return resp, CheckResponse(resp, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,9 +1,11 @@
|
|||||||
package dtmcli
|
package dtmcli
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/go-resty/resty/v2"
|
"github.com/go-resty/resty/v2"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
)
|
)
|
||||||
@ -18,30 +20,28 @@ func MustGenGid(server string) string {
|
|||||||
return res["gid"]
|
return res["gid"]
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsFailure 如果err非空,或者ret是http的响应且包含FAILURE,那么返回true。此时认为业务调用失败
|
// CheckResponse 检查Response,返回错误
|
||||||
func IsFailure(res interface{}, err error) bool {
|
func CheckResponse(resp *resty.Response, err error) error {
|
||||||
resp, ok := res.(*resty.Response)
|
if err == nil && resp != nil {
|
||||||
return err != nil || // 包含错误
|
if resp.IsError() {
|
||||||
ok && (resp.IsError() || strings.Contains(resp.String(), "FAILURE")) || // resp包含failure
|
return errors.New(resp.String())
|
||||||
!ok && res != nil && strings.Contains(common.MustMarshalString(res), "FAILURE") // 结果中包含failure
|
} else if strings.Contains(resp.String(), "FAILURE") {
|
||||||
}
|
return ErrFailure
|
||||||
|
}
|
||||||
// PanicIfFailure 如果err非空,或者ret是http的响应且包含FAILURE,那么Panic。此时认为业务调用失败
|
|
||||||
func PanicIfFailure(res interface{}, err error) {
|
|
||||||
if IsFailure(res, err) {
|
|
||||||
panic(fmt.Errorf("dtm failure ret: %v err %v", res, err))
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// CheckDtmResponse check the response of dtm, if not ok ,generate error
|
|
||||||
func CheckDtmResponse(resp *resty.Response, err error) error {
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckResult 检查Result,返回错误
|
||||||
|
func CheckResult(res interface{}, err error) error {
|
||||||
|
resp, ok := res.(*resty.Response)
|
||||||
|
if ok {
|
||||||
|
return CheckResponse(resp, err)
|
||||||
}
|
}
|
||||||
if !strings.Contains(resp.String(), "SUCCESS") || resp.IsError() {
|
if res != nil && strings.Contains(common.MustMarshalString(res), "FAILURE") {
|
||||||
return fmt.Errorf("dtm response failed: %s", resp.String())
|
return ErrFailure
|
||||||
}
|
}
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// IDGenerator used to generate a branch id
|
// IDGenerator used to generate a branch id
|
||||||
@ -61,3 +61,52 @@ func (g *IDGenerator) NewBranchID() string {
|
|||||||
g.branchID = g.branchID + 1
|
g.branchID = g.branchID + 1
|
||||||
return g.parentID + fmt.Sprintf("%02d", g.branchID)
|
return g.parentID + fmt.Sprintf("%02d", g.branchID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TransResult dtm 返回的结果
|
||||||
|
type TransResult struct {
|
||||||
|
DtmResult string `json:"dtm_result"`
|
||||||
|
Message string
|
||||||
|
}
|
||||||
|
|
||||||
|
// TransBase 事务的基础类
|
||||||
|
type TransBase struct {
|
||||||
|
IDGenerator
|
||||||
|
Dtm string
|
||||||
|
// WaitResult 是否等待全局事务的最终结果
|
||||||
|
WaitResult bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// TransBaseFromReq construct xa info from request
|
||||||
|
func TransBaseFromReq(c *gin.Context) *TransBase {
|
||||||
|
return &TransBase{
|
||||||
|
IDGenerator: IDGenerator{parentID: c.Query("branch_id")},
|
||||||
|
Dtm: c.Query("dtm"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CallDtm 调用dtm服务器,返回事务的状态
|
||||||
|
func (tb *TransBase) CallDtm(body interface{}, operation string) error {
|
||||||
|
params := common.MS{}
|
||||||
|
if tb.WaitResult {
|
||||||
|
params["wait_result"] = "1"
|
||||||
|
}
|
||||||
|
resp, err := common.RestyClient.R().SetQueryParams(params).
|
||||||
|
SetResult(&TransResult{}).SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tr := resp.Result().(*TransResult)
|
||||||
|
if tr.DtmResult == "FAILURE" {
|
||||||
|
return errors.New("FAILURE: " + tr.Message)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrFailure 表示返回失败,要求回滚
|
||||||
|
var ErrFailure = errors.New("transaction FAILURE")
|
||||||
|
|
||||||
|
// ResultSuccess 表示返回成功,可以进行下一步
|
||||||
|
var ResultSuccess = common.M{"dtm_result": "SUCCESS"}
|
||||||
|
|
||||||
|
// ResultFailure 表示返回失败,要求回滚
|
||||||
|
var ResultFailure = common.M{"dtm_result": "FAILURE"}
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
package dtmcli
|
package dtmcli
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -22,8 +21,4 @@ func TestTypes(t *testing.T) {
|
|||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
_, err = TransInfoFromQuery(url.Values{})
|
_, err = TransInfoFromQuery(url.Values{})
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
|
|
||||||
err2 := fmt.Errorf("an error")
|
|
||||||
err3 := CheckDtmResponse(nil, err2)
|
|
||||||
assert.Error(t, err2, err3)
|
|
||||||
}
|
}
|
||||||
|
|||||||
61
dtmcli/xa.go
61
dtmcli/xa.go
@ -16,7 +16,7 @@ type M = map[string]interface{}
|
|||||||
var e2p = common.E2P
|
var e2p = common.E2P
|
||||||
|
|
||||||
// XaGlobalFunc type of xa global function
|
// XaGlobalFunc type of xa global function
|
||||||
type XaGlobalFunc func(xa *Xa) (interface{}, error)
|
type XaGlobalFunc func(xa *Xa) (*resty.Response, error)
|
||||||
|
|
||||||
// XaLocalFunc type of xa local function
|
// XaLocalFunc type of xa local function
|
||||||
type XaLocalFunc func(db *sql.DB, xa *Xa) (interface{}, error)
|
type XaLocalFunc func(db *sql.DB, xa *Xa) (interface{}, error)
|
||||||
@ -33,16 +33,13 @@ type XaClient struct {
|
|||||||
|
|
||||||
// Xa xa transaction
|
// Xa xa transaction
|
||||||
type Xa struct {
|
type Xa struct {
|
||||||
IDGenerator
|
|
||||||
Gid string
|
Gid string
|
||||||
|
TransBase
|
||||||
}
|
}
|
||||||
|
|
||||||
// XaFromReq construct xa info from request
|
// XaFromReq construct xa info from request
|
||||||
func XaFromReq(c *gin.Context) *Xa {
|
func XaFromReq(c *gin.Context) *Xa {
|
||||||
return &Xa{
|
return &Xa{TransBase: *TransBaseFromReq(c), Gid: c.Query("gid")}
|
||||||
Gid: c.Query("gid"),
|
|
||||||
IDGenerator: IDGenerator{parentID: c.Query("branch_id")},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewXaClient construct a xa client
|
// NewXaClient construct a xa client
|
||||||
@ -66,29 +63,26 @@ func (xc *XaClient) HandleCallback(gid string, branchID string, action string) (
|
|||||||
defer db.Close()
|
defer db.Close()
|
||||||
xaID := gid + "-" + branchID
|
xaID := gid + "-" + branchID
|
||||||
_, err := common.SdbExec(db, fmt.Sprintf("xa %s '%s'", action, xaID))
|
_, err := common.SdbExec(db, fmt.Sprintf("xa %s '%s'", action, xaID))
|
||||||
return M{"dtm_result": "SUCCESS"}, err
|
return ResultSuccess, err
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// XaLocalTransaction start a xa local transaction
|
// XaLocalTransaction start a xa local transaction
|
||||||
func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret interface{}, rerr error) {
|
func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret interface{}, rerr error) {
|
||||||
xa := XaFromReq(c)
|
xa := XaFromReq(c)
|
||||||
|
xa.Dtm = xc.Server
|
||||||
branchID := xa.NewBranchID()
|
branchID := xa.NewBranchID()
|
||||||
xaBranch := xa.Gid + "-" + branchID
|
xaBranch := xa.Gid + "-" + branchID
|
||||||
db := common.SdbAlone(xc.Conf)
|
db := common.SdbAlone(xc.Conf)
|
||||||
defer func() { db.Close() }()
|
defer func() { db.Close() }()
|
||||||
defer func() {
|
defer func() {
|
||||||
var x interface{}
|
x := recover()
|
||||||
_, err := common.SdbExec(db, fmt.Sprintf("XA end '%s'", xaBranch))
|
_, err := common.SdbExec(db, fmt.Sprintf("XA end '%s'", xaBranch))
|
||||||
if err != nil {
|
if x == nil && rerr == nil && err == nil {
|
||||||
common.RedLogf("sql db exec error: %v", err)
|
|
||||||
}
|
|
||||||
if x = recover(); x != nil || IsFailure(ret, rerr) {
|
|
||||||
} else {
|
|
||||||
_, err = common.SdbExec(db, fmt.Sprintf("XA prepare '%s'", xaBranch))
|
_, err = common.SdbExec(db, fmt.Sprintf("XA prepare '%s'", xaBranch))
|
||||||
}
|
}
|
||||||
if err != nil {
|
if rerr == nil {
|
||||||
common.RedLogf("sql db exec error: %v", err)
|
rerr = err
|
||||||
}
|
}
|
||||||
if x != nil {
|
if x != nil {
|
||||||
panic(x)
|
panic(x)
|
||||||
@ -99,49 +93,47 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, xaFunc XaLocalFunc) (ret
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
ret, rerr = xaFunc(db, xa)
|
ret, rerr = xaFunc(db, xa)
|
||||||
if IsFailure(ret, rerr) {
|
rerr = CheckResult(ret, rerr)
|
||||||
|
if rerr != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ret, rerr = common.RestyClient.R().
|
rerr = xa.CallDtm(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}, "registerXaBranch")
|
||||||
SetBody(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}).
|
|
||||||
Post(xc.Server + "/registerXaBranch")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// XaGlobalTransaction start a xa global transaction
|
// XaGlobalTransaction start a xa global transaction
|
||||||
func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (ret interface{}, rerr error) {
|
func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) {
|
||||||
xa := Xa{IDGenerator: IDGenerator{}, Gid: gid}
|
xa := Xa{TransBase: TransBase{IDGenerator: IDGenerator{}, Dtm: xc.Server}, Gid: gid}
|
||||||
data := &M{
|
data := &M{
|
||||||
"gid": gid,
|
"gid": gid,
|
||||||
"trans_type": "xa",
|
"trans_type": "xa",
|
||||||
}
|
}
|
||||||
resp, err := common.RestyClient.R().SetBody(data).Post(xc.Server + "/prepare")
|
rerr = xa.CallDtm(data, "prepare")
|
||||||
if IsFailure(resp, err) {
|
if rerr != nil {
|
||||||
return resp, err
|
return
|
||||||
}
|
}
|
||||||
|
var resp *resty.Response
|
||||||
// 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题
|
// 小概率情况下,prepare成功了,但是由于网络状况导致上面Failure,那么不执行下面defer的内容,等待超时后再回滚标记事务失败,也没有问题
|
||||||
defer func() {
|
defer func() {
|
||||||
var x interface{}
|
x := recover()
|
||||||
if x = recover(); x != nil || IsFailure(ret, rerr) {
|
operation := common.If(x != nil || rerr != nil, "abort", "submit").(string)
|
||||||
resp, err = common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort")
|
err := xa.CallDtm(data, operation)
|
||||||
} else {
|
if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的
|
||||||
resp, err = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit")
|
rerr = err
|
||||||
}
|
|
||||||
if IsFailure(resp, err) {
|
|
||||||
common.RedLogf("submitting or abort global transaction error: %v resp: %s", err, resp.String())
|
|
||||||
}
|
}
|
||||||
if x != nil {
|
if x != nil {
|
||||||
panic(x)
|
panic(x)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
ret, rerr = xaFunc(&xa)
|
resp, rerr = xaFunc(&xa)
|
||||||
|
rerr = CheckResponse(resp, rerr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallBranch call a xa branch
|
// CallBranch call a xa branch
|
||||||
func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
|
func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
|
||||||
branchID := x.NewBranchID()
|
branchID := x.NewBranchID()
|
||||||
return common.RestyClient.R().
|
resp, err := common.RestyClient.R().
|
||||||
SetBody(body).
|
SetBody(body).
|
||||||
SetQueryParams(common.MS{
|
SetQueryParams(common.MS{
|
||||||
"gid": x.Gid,
|
"gid": x.Gid,
|
||||||
@ -150,4 +142,5 @@ func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
|
|||||||
"branch_type": "action",
|
"branch_type": "action",
|
||||||
}).
|
}).
|
||||||
Post(url)
|
Post(url)
|
||||||
|
return resp, CheckResponse(resp, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import (
|
|||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
|
"github.com/yedf/dtm/dtmcli"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
"gorm.io/gorm/clause"
|
"gorm.io/gorm/clause"
|
||||||
)
|
)
|
||||||
@ -28,7 +29,7 @@ func prepare(c *gin.Context) (interface{}, error) {
|
|||||||
t := TransFromContext(c)
|
t := TransFromContext(c)
|
||||||
t.Status = "prepared"
|
t.Status = "prepared"
|
||||||
t.saveNew(dbGet())
|
t.saveNew(dbGet())
|
||||||
return M{"dtm_result": "SUCCESS"}, nil
|
return dtmcli.ResultSuccess, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func submit(c *gin.Context) (interface{}, error) {
|
func submit(c *gin.Context) (interface{}, error) {
|
||||||
@ -40,8 +41,7 @@ func submit(c *gin.Context) (interface{}, error) {
|
|||||||
}
|
}
|
||||||
t.Status = "submitted"
|
t.Status = "submitted"
|
||||||
t.saveNew(db)
|
t.saveNew(db)
|
||||||
go t.Process(db)
|
return t.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil
|
||||||
return M{"dtm_result": "SUCCESS"}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func abort(c *gin.Context) (interface{}, error) {
|
func abort(c *gin.Context) (interface{}, error) {
|
||||||
@ -51,8 +51,7 @@ func abort(c *gin.Context) (interface{}, error) {
|
|||||||
if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != "prepared" && dbt.Status != "aborting" {
|
if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != "prepared" && dbt.Status != "aborting" {
|
||||||
return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil
|
return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil
|
||||||
}
|
}
|
||||||
go dbt.Process(db)
|
return dbt.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil
|
||||||
return M{"dtm_result": "SUCCESS"}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func registerXaBranch(c *gin.Context) (interface{}, error) {
|
func registerXaBranch(c *gin.Context) (interface{}, error) {
|
||||||
@ -73,7 +72,7 @@ func registerXaBranch(c *gin.Context) (interface{}, error) {
|
|||||||
e2p(err)
|
e2p(err)
|
||||||
global := TransGlobal{Gid: branch.Gid}
|
global := TransGlobal{Gid: branch.Gid}
|
||||||
global.touch(db, config.TransCronInterval)
|
global.touch(db, config.TransCronInterval)
|
||||||
return M{"dtm_result": "SUCCESS"}, nil
|
return dtmcli.ResultSuccess, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func registerTccBranch(c *gin.Context) (interface{}, error) {
|
func registerTccBranch(c *gin.Context) (interface{}, error) {
|
||||||
@ -104,7 +103,7 @@ func registerTccBranch(c *gin.Context) (interface{}, error) {
|
|||||||
e2p(err)
|
e2p(err)
|
||||||
global := TransGlobal{Gid: branch.Gid}
|
global := TransGlobal{Gid: branch.Gid}
|
||||||
global.touch(dbGet(), config.TransCronInterval)
|
global.touch(dbGet(), config.TransCronInterval)
|
||||||
return M{"dtm_result": "SUCCESS"}, nil
|
return dtmcli.ResultSuccess, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func query(c *gin.Context) (interface{}, error) {
|
func query(c *gin.Context) (interface{}, error) {
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package dtmsvr
|
package dtmsvr
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
@ -12,7 +13,7 @@ import (
|
|||||||
|
|
||||||
// CronTransOnce cron expired trans. use expireIn as expire time
|
// CronTransOnce cron expired trans. use expireIn as expire time
|
||||||
func CronTransOnce(expireIn time.Duration) bool {
|
func CronTransOnce(expireIn time.Duration) bool {
|
||||||
defer handlePanic()
|
defer handlePanic(nil)
|
||||||
trans := lockOneTrans(expireIn)
|
trans := lockOneTrans(expireIn)
|
||||||
if trans == nil {
|
if trans == nil {
|
||||||
return false
|
return false
|
||||||
@ -20,7 +21,7 @@ func CronTransOnce(expireIn time.Duration) bool {
|
|||||||
if TransProcessedTestChan != nil {
|
if TransProcessedTestChan != nil {
|
||||||
defer WaitTransProcessed(trans.Gid)
|
defer WaitTransProcessed(trans.Gid)
|
||||||
}
|
}
|
||||||
trans.Process(dbGet())
|
trans.Process(dbGet(), true)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -52,9 +53,12 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal {
|
|||||||
return &trans
|
return &trans
|
||||||
}
|
}
|
||||||
|
|
||||||
func handlePanic() {
|
func handlePanic(perr *error) {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
common.RedLogf("----panic %s handlered\n%s", err.(error).Error(), string(debug.Stack()))
|
common.RedLogf("----panic %v handlered\n%s", err, string(debug.Stack()))
|
||||||
|
if perr != nil {
|
||||||
|
*perr = fmt.Errorf("dtm panic: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -7,6 +7,7 @@ import (
|
|||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
|
"github.com/yedf/dtm/dtmcli"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
"gorm.io/gorm/clause"
|
"gorm.io/gorm/clause"
|
||||||
)
|
)
|
||||||
@ -110,8 +111,24 @@ func (t *TransGlobal) getProcessor() transProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Process process global transaction once
|
// Process process global transaction once
|
||||||
func (t *TransGlobal) Process(db *common.DB) {
|
func (t *TransGlobal) Process(db *common.DB, waitResult bool) common.M {
|
||||||
defer handlePanic()
|
if !waitResult {
|
||||||
|
go t.processInner(db)
|
||||||
|
return dtmcli.ResultSuccess
|
||||||
|
}
|
||||||
|
submitting := t.Status == "submitted"
|
||||||
|
err := t.processInner(db)
|
||||||
|
if err != nil {
|
||||||
|
return common.M{"dtm_result": "FAILURE", "message": err.Error()}
|
||||||
|
}
|
||||||
|
if submitting && t.Status != "succeed" {
|
||||||
|
return common.M{"dtm_result": "FAILURE", "message": "trans failed by user"}
|
||||||
|
}
|
||||||
|
return dtmcli.ResultSuccess
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TransGlobal) processInner(db *common.DB) (rerr error) {
|
||||||
|
defer handlePanic(&rerr)
|
||||||
defer func() {
|
defer func() {
|
||||||
if TransProcessedTestChan != nil {
|
if TransProcessedTestChan != nil {
|
||||||
logrus.Printf("processed: %s", t.Gid)
|
logrus.Printf("processed: %s", t.Gid)
|
||||||
@ -126,6 +143,7 @@ func (t *TransGlobal) Process(db *common.DB) {
|
|||||||
branches := []TransBranch{}
|
branches := []TransBranch{}
|
||||||
db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches)
|
db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches)
|
||||||
t.getProcessor().ProcessOnce(db, branches)
|
t.getProcessor().ProcessOnce(db, branches)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransGlobal) getBranchParams(branch *TransBranch) common.MS {
|
func (t *TransGlobal) getBranchParams(branch *TransBranch) common.MS {
|
||||||
|
|||||||
50
dtmsvr/trans_saga_wait_test.go
Normal file
50
dtmsvr/trans_saga_wait_test.go
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
package dtmsvr
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/yedf/dtm/examples"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSagaWait(t *testing.T) {
|
||||||
|
|
||||||
|
sagaNormalWait(t)
|
||||||
|
sagaCommittedPendingWait(t)
|
||||||
|
sagaRollbackWait(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func sagaNormalWait(t *testing.T) {
|
||||||
|
saga := genSaga("gid-noramlSagaWait", false, false)
|
||||||
|
saga.WaitResult = true
|
||||||
|
err := saga.Submit()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
WaitTransProcessed(saga.Gid)
|
||||||
|
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
|
||||||
|
assert.Equal(t, "succeed", getTransStatus(saga.Gid))
|
||||||
|
transQuery(t, saga.Gid)
|
||||||
|
}
|
||||||
|
|
||||||
|
func sagaCommittedPendingWait(t *testing.T) {
|
||||||
|
saga := genSaga("gid-committedPendingWait", false, false)
|
||||||
|
examples.MainSwitch.TransOutResult.SetOnce("PENDING")
|
||||||
|
saga.WaitResult = true
|
||||||
|
err := saga.Submit()
|
||||||
|
assert.Error(t, err)
|
||||||
|
WaitTransProcessed(saga.Gid)
|
||||||
|
assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid))
|
||||||
|
CronTransOnce(60 * time.Second)
|
||||||
|
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
|
||||||
|
assert.Equal(t, "succeed", getTransStatus(saga.Gid))
|
||||||
|
}
|
||||||
|
|
||||||
|
func sagaRollbackWait(t *testing.T) {
|
||||||
|
saga := genSaga("gid-rollbackSaga2Wait", false, true)
|
||||||
|
saga.WaitResult = true
|
||||||
|
err := saga.Submit()
|
||||||
|
assert.Error(t, err)
|
||||||
|
WaitTransProcessed(saga.Gid)
|
||||||
|
assert.Equal(t, "failed", getTransStatus(saga.Gid))
|
||||||
|
assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid))
|
||||||
|
}
|
||||||
@ -7,6 +7,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/go-resty/resty/v2"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
@ -23,24 +24,24 @@ func TestTccBarrier(t *testing.T) {
|
|||||||
|
|
||||||
func tccBarrierRollback(t *testing.T) {
|
func tccBarrierRollback(t *testing.T) {
|
||||||
gid := "tccBarrierRollback"
|
gid := "tccBarrierRollback"
|
||||||
resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
|
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
|
||||||
resp, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
|
_, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
|
||||||
assert.True(t, !dtmcli.IsFailure(resp, err))
|
assert.Nil(t, err)
|
||||||
return tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
|
return tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
|
||||||
})
|
})
|
||||||
assert.True(t, dtmcli.IsFailure(resp, err))
|
assert.Error(t, err)
|
||||||
WaitTransProcessed(gid)
|
WaitTransProcessed(gid)
|
||||||
assert.Equal(t, "failed", getTransStatus(gid))
|
assert.Equal(t, "failed", getTransStatus(gid))
|
||||||
}
|
}
|
||||||
|
|
||||||
func tccBarrierNormal(t *testing.T) {
|
func tccBarrierNormal(t *testing.T) {
|
||||||
gid := "tccBarrierNormal"
|
gid := "tccBarrierNormal"
|
||||||
resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
|
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
|
||||||
resp, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
|
_, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
|
||||||
assert.True(t, !dtmcli.IsFailure(resp, err))
|
assert.Nil(t, err)
|
||||||
return tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
|
return tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
|
||||||
})
|
})
|
||||||
assert.True(t, !dtmcli.IsFailure(resp, err))
|
assert.Nil(t, err)
|
||||||
WaitTransProcessed(gid)
|
WaitTransProcessed(gid)
|
||||||
assert.Equal(t, "succeed", getTransStatus(gid))
|
assert.Equal(t, "succeed", getTransStatus(gid))
|
||||||
}
|
}
|
||||||
@ -49,7 +50,7 @@ func tccBarrierDisorder(t *testing.T) {
|
|||||||
timeoutChan := make(chan string, 2)
|
timeoutChan := make(chan string, 2)
|
||||||
finishedChan := make(chan string, 2)
|
finishedChan := make(chan string, 2)
|
||||||
gid := "tccBarrierDisorder"
|
gid := "tccBarrierDisorder"
|
||||||
_, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
|
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
|
||||||
body := &examples.TransReq{Amount: 30}
|
body := &examples.TransReq{Amount: 30}
|
||||||
tryURL := Busi + "/TccBTransOutTry"
|
tryURL := Busi + "/TccBTransOutTry"
|
||||||
confirmURL := Busi + "/TccBTransOutConfirm"
|
confirmURL := Busi + "/TccBTransOutConfirm"
|
||||||
@ -68,8 +69,7 @@ func tccBarrierDisorder(t *testing.T) {
|
|||||||
return res, err
|
return res, err
|
||||||
}))
|
}))
|
||||||
// 注册子事务
|
// 注册子事务
|
||||||
r, err := common.RestyClient.R().
|
err := tcc.CallDtm(M{
|
||||||
SetBody(&M{
|
|
||||||
"gid": tcc.Gid,
|
"gid": tcc.Gid,
|
||||||
"branch_id": branchID,
|
"branch_id": branchID,
|
||||||
"trans_type": "tcc",
|
"trans_type": "tcc",
|
||||||
@ -78,13 +78,12 @@ func tccBarrierDisorder(t *testing.T) {
|
|||||||
"try": tryURL,
|
"try": tryURL,
|
||||||
"confirm": confirmURL,
|
"confirm": confirmURL,
|
||||||
"cancel": cancelURL,
|
"cancel": cancelURL,
|
||||||
}).
|
}, "registerTccBranch")
|
||||||
Post(tcc.Dtm + "/registerTccBranch")
|
assert.Nil(t, err)
|
||||||
assert.True(t, !dtmcli.IsFailure(r, err))
|
|
||||||
go func() {
|
go func() {
|
||||||
logrus.Printf("sleeping to wait for tcc try timeout")
|
logrus.Printf("sleeping to wait for tcc try timeout")
|
||||||
<-timeoutChan
|
<-timeoutChan
|
||||||
r, _ = common.RestyClient.R().
|
r, _ := common.RestyClient.R().
|
||||||
SetBody(body).
|
SetBody(body).
|
||||||
SetQueryParams(common.MS{
|
SetQueryParams(common.MS{
|
||||||
"dtm": tcc.Dtm,
|
"dtm": tcc.Dtm,
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-resty/resty/v2"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/yedf/dtm/dtmcli"
|
"github.com/yedf/dtm/dtmcli"
|
||||||
"github.com/yedf/dtm/examples"
|
"github.com/yedf/dtm/examples"
|
||||||
@ -18,26 +19,24 @@ func TestTcc(t *testing.T) {
|
|||||||
func tccNormal(t *testing.T) {
|
func tccNormal(t *testing.T) {
|
||||||
data := &examples.TransReq{Amount: 30}
|
data := &examples.TransReq{Amount: 30}
|
||||||
gid := "tccNormal"
|
gid := "tccNormal"
|
||||||
ret, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
|
err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
|
||||||
resp, err := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
|
_, err := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
|
||||||
if dtmcli.IsFailure(resp, err) {
|
assert.Nil(t, err)
|
||||||
return resp, err
|
|
||||||
}
|
|
||||||
return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
||||||
})
|
})
|
||||||
dtmcli.PanicIfFailure(ret, err)
|
assert.Nil(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func tccRollback(t *testing.T) {
|
func tccRollback(t *testing.T) {
|
||||||
gid := "tccRollback"
|
gid := "tccRollback"
|
||||||
data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}
|
data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}
|
||||||
resp, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
|
err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
|
||||||
resp, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
|
_, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
|
||||||
assert.True(t, !dtmcli.IsFailure(resp, rerr))
|
assert.Nil(t, rerr)
|
||||||
examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING")
|
examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING")
|
||||||
return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
||||||
})
|
})
|
||||||
assert.True(t, dtmcli.IsFailure(resp, err))
|
assert.Error(t, err)
|
||||||
WaitTransProcessed(gid)
|
WaitTransProcessed(gid)
|
||||||
assert.Equal(t, "aborting", getTransStatus(gid))
|
assert.Equal(t, "aborting", getTransStatus(gid))
|
||||||
CronTransOnce(60 * time.Second)
|
CronTransOnce(60 * time.Second)
|
||||||
|
|||||||
@ -21,7 +21,7 @@ func (t *transXaProcessor) GenBranches() []TransBranch {
|
|||||||
func (t *transXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
|
func (t *transXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
|
||||||
resp, err := common.RestyClient.R().SetQueryParams(common.MS{
|
resp, err := common.RestyClient.R().SetQueryParams(common.MS{
|
||||||
"branch_id": branch.BranchID,
|
"branch_id": branch.BranchID,
|
||||||
"action": common.If(t.Status == "prepared", "rollback", "commit").(string),
|
"action": common.If(t.Status == "prepared" || t.Status == "aborting", "rollback", "commit").(string),
|
||||||
"gid": branch.Gid,
|
"gid": branch.Gid,
|
||||||
}).Post(branch.URL)
|
}).Post(branch.URL)
|
||||||
e2p(err)
|
e2p(err)
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/go-resty/resty/v2"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/yedf/dtm/dtmcli"
|
"github.com/yedf/dtm/dtmcli"
|
||||||
"github.com/yedf/dtm/examples"
|
"github.com/yedf/dtm/examples"
|
||||||
@ -19,7 +20,8 @@ func TestXa(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func xaLocalError(t *testing.T) {
|
func xaLocalError(t *testing.T) {
|
||||||
_, err := examples.XaClient.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (interface{}, error) {
|
xc := examples.XaClient
|
||||||
|
err := xc.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (*resty.Response, error) {
|
||||||
return nil, fmt.Errorf("an error")
|
return nil, fmt.Errorf("an error")
|
||||||
})
|
})
|
||||||
assert.Error(t, err, fmt.Errorf("an error"))
|
assert.Error(t, err, fmt.Errorf("an error"))
|
||||||
@ -28,15 +30,15 @@ func xaLocalError(t *testing.T) {
|
|||||||
func xaNormal(t *testing.T) {
|
func xaNormal(t *testing.T) {
|
||||||
xc := examples.XaClient
|
xc := examples.XaClient
|
||||||
gid := "xaNormal"
|
gid := "xaNormal"
|
||||||
res, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (interface{}, error) {
|
err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
|
||||||
req := examples.GenTransReq(30, false, false)
|
req := examples.GenTransReq(30, false, false)
|
||||||
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
|
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
|
||||||
if dtmcli.IsFailure(resp, err) {
|
if err != nil {
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
return xa.CallBranch(req, examples.Busi+"/TransInXa")
|
return xa.CallBranch(req, examples.Busi+"/TransInXa")
|
||||||
})
|
})
|
||||||
dtmcli.PanicIfFailure(res, err)
|
assert.Equal(t, nil, err)
|
||||||
WaitTransProcessed(gid)
|
WaitTransProcessed(gid)
|
||||||
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid))
|
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid))
|
||||||
}
|
}
|
||||||
@ -44,15 +46,15 @@ func xaNormal(t *testing.T) {
|
|||||||
func xaRollback(t *testing.T) {
|
func xaRollback(t *testing.T) {
|
||||||
xc := examples.XaClient
|
xc := examples.XaClient
|
||||||
gid := "xaRollback"
|
gid := "xaRollback"
|
||||||
res, err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (interface{}, error) {
|
err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
|
||||||
req := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}
|
req := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}
|
||||||
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
|
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
|
||||||
if dtmcli.IsFailure(resp, err) {
|
if err != nil {
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
return xa.CallBranch(req, examples.Busi+"/TransInXa")
|
return xa.CallBranch(req, examples.Busi+"/TransInXa")
|
||||||
})
|
})
|
||||||
assert.True(t, dtmcli.IsFailure(res, err))
|
assert.Error(t, err)
|
||||||
WaitTransProcessed(gid)
|
WaitTransProcessed(gid)
|
||||||
assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid))
|
assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid))
|
||||||
assert.Equal(t, "failed", getTransStatus(gid))
|
assert.Equal(t, "failed", getTransStatus(gid))
|
||||||
|
|||||||
@ -33,7 +33,7 @@ func SagaBarrierAddRoute(app *gin.Engine) {
|
|||||||
|
|
||||||
func sagaBarrierAdjustBalance(sdb *sql.Tx, uid int, amount int) (interface{}, error) {
|
func sagaBarrierAdjustBalance(sdb *sql.Tx, uid int, amount int) (interface{}, error) {
|
||||||
_, err := common.StxExec(sdb, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
|
_, err := common.StxExec(sdb, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
|
||||||
return common.MS{"dtm_result": "SUCCESS"}, err
|
return dtmcli.ResultSuccess, err
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -2,6 +2,7 @@ package examples
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/go-resty/resty/v2"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
"github.com/yedf/dtm/dtmcli"
|
"github.com/yedf/dtm/dtmcli"
|
||||||
@ -20,14 +21,14 @@ func TccSetup(app *gin.Engine) {
|
|||||||
// TccFireRequestNested 1
|
// TccFireRequestNested 1
|
||||||
func TccFireRequestNested() string {
|
func TccFireRequestNested() string {
|
||||||
gid := dtmcli.MustGenGid(DtmServer)
|
gid := dtmcli.MustGenGid(DtmServer)
|
||||||
ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
|
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
|
||||||
resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
|
resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
|
||||||
if dtmcli.IsFailure(resp, err) {
|
if err != nil {
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTccParent", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTccParent", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
||||||
})
|
})
|
||||||
dtmcli.PanicIfFailure(ret, err)
|
e2p(err)
|
||||||
return gid
|
return gid
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -35,13 +36,13 @@ func TccFireRequestNested() string {
|
|||||||
func TccFireRequest() string {
|
func TccFireRequest() string {
|
||||||
logrus.Printf("tcc simple transaction begin")
|
logrus.Printf("tcc simple transaction begin")
|
||||||
gid := dtmcli.MustGenGid(DtmServer)
|
gid := dtmcli.MustGenGid(DtmServer)
|
||||||
ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
|
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
|
||||||
resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
|
resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
|
||||||
if dtmcli.IsFailure(resp, err) {
|
if err != nil {
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
||||||
})
|
})
|
||||||
dtmcli.PanicIfFailure(ret, err)
|
e2p(err)
|
||||||
return gid
|
return gid
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/go-resty/resty/v2"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
"github.com/yedf/dtm/dtmcli"
|
"github.com/yedf/dtm/dtmcli"
|
||||||
@ -14,14 +15,14 @@ import (
|
|||||||
func TccBarrierFireRequest() string {
|
func TccBarrierFireRequest() string {
|
||||||
logrus.Printf("tcc transaction begin")
|
logrus.Printf("tcc transaction begin")
|
||||||
gid := dtmcli.MustGenGid(DtmServer)
|
gid := dtmcli.MustGenGid(DtmServer)
|
||||||
ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
|
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
|
||||||
resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
|
resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
|
||||||
if dtmcli.IsFailure(resp, err) {
|
if err != nil {
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
|
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
|
||||||
})
|
})
|
||||||
dtmcli.PanicIfFailure(ret, err)
|
e2p(err)
|
||||||
return gid
|
return gid
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,7 +56,7 @@ func adjustBalance(sdb *sql.Tx, uid int, amount int) (interface{}, error) {
|
|||||||
if err == nil && affected == 0 {
|
if err == nil && affected == 0 {
|
||||||
return nil, fmt.Errorf("update 0 rows")
|
return nil, fmt.Errorf("update 0 rows")
|
||||||
}
|
}
|
||||||
return common.MS{"dtm_result": "SUCCESS"}, err
|
return dtmcli.ResultSuccess, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TCC下,转入
|
// TCC下,转入
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/go-resty/resty/v2"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
"github.com/yedf/dtm/dtmcli"
|
"github.com/yedf/dtm/dtmcli"
|
||||||
)
|
)
|
||||||
@ -27,33 +28,33 @@ func XaSetup(app *gin.Engine) {
|
|||||||
// XaFireRequest 注册全局XA事务,调用XA的分支
|
// XaFireRequest 注册全局XA事务,调用XA的分支
|
||||||
func XaFireRequest() string {
|
func XaFireRequest() string {
|
||||||
gid := dtmcli.MustGenGid(DtmServer)
|
gid := dtmcli.MustGenGid(DtmServer)
|
||||||
res, err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (interface{}, error) {
|
err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
|
||||||
resp, err := xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOutXa")
|
resp, err := xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOutXa")
|
||||||
if dtmcli.IsFailure(resp, err) {
|
if err != nil {
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa")
|
return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa")
|
||||||
})
|
})
|
||||||
dtmcli.PanicIfFailure(res, err)
|
e2p(err)
|
||||||
return gid
|
return gid
|
||||||
}
|
}
|
||||||
|
|
||||||
func xaTransIn(c *gin.Context) (interface{}, error) {
|
func xaTransIn(c *gin.Context) (interface{}, error) {
|
||||||
return XaClient.XaLocalTransaction(c, func(db *sql.DB, xa *dtmcli.Xa) (interface{}, error) {
|
return XaClient.XaLocalTransaction(c, func(db *sql.DB, xa *dtmcli.Xa) (interface{}, error) {
|
||||||
if reqFrom(c).TransInResult == "FAILURE" {
|
if reqFrom(c).TransInResult == "FAILURE" {
|
||||||
return M{"dtm_result": "FAILURE"}, nil
|
return dtmcli.ResultFailure, nil
|
||||||
}
|
}
|
||||||
_, err := common.SdbExec(db, "update dtm_busi.user_account set balance=balance+? where user_id=?", reqFrom(c).Amount, 2)
|
_, err := common.SdbExec(db, "update dtm_busi.user_account set balance=balance+? where user_id=?", reqFrom(c).Amount, 2)
|
||||||
return M{"dtm_result": "SUCCESS"}, err
|
return dtmcli.ResultSuccess, err
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func xaTransOut(c *gin.Context) (interface{}, error) {
|
func xaTransOut(c *gin.Context) (interface{}, error) {
|
||||||
return XaClient.XaLocalTransaction(c, func(db *sql.DB, xa *dtmcli.Xa) (interface{}, error) {
|
return XaClient.XaLocalTransaction(c, func(db *sql.DB, xa *dtmcli.Xa) (interface{}, error) {
|
||||||
if reqFrom(c).TransOutResult == "FAILURE" {
|
if reqFrom(c).TransOutResult == "FAILURE" {
|
||||||
return M{"dtm_result": "FAILURE"}, nil
|
return dtmcli.ResultFailure, nil
|
||||||
}
|
}
|
||||||
_, err := common.SdbExec(db, "update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, 1)
|
_, err := common.SdbExec(db, "update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, 1)
|
||||||
return M{"dtm_result": "SUCCESS"}, err
|
return dtmcli.ResultSuccess, err
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -44,7 +44,7 @@ func QsFireRequest() string {
|
|||||||
|
|
||||||
func qsAdjustBalance(uid int, amount int) (interface{}, error) {
|
func qsAdjustBalance(uid int, amount int) (interface{}, error) {
|
||||||
_, err := common.SdbExec(sdbGet(), "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
|
_, err := common.SdbExec(sdbGet(), "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
|
||||||
return M{"dtm_result": "SUCCESS"}, err
|
return dtmcli.ResultSuccess, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func qsAddRoute(app *gin.Engine) {
|
func qsAddRoute(app *gin.Engine) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user