tcc interface seems ok

This commit is contained in:
yedf2 2021-08-01 11:10:16 +08:00
parent 486255034d
commit 94e8e4eff9
11 changed files with 96 additions and 99 deletions

View File

@ -162,7 +162,7 @@ func SdbExec(db *sql.DB, sql string, values ...interface{}) (affected int64, rer
affected, rerr = r.RowsAffected()
logrus.Printf("affected: %d for %s %v", affected, sql, values)
} else {
logrus.Printf("\x1b[31m\nexec error: %v for %s %v\x1b[0m\n", rerr, sql, values)
RedLogf("exec error: %v for %s %v", rerr, sql, values)
}
return
}
@ -174,7 +174,7 @@ func StxExec(tx *sql.Tx, sql string, values ...interface{}) (affected int64, rer
affected, rerr = r.RowsAffected()
logrus.Printf("affected: %d for %s %v", affected, sql, values)
} else {
logrus.Printf("\x1b[31m\nexec error: %v for %s %v\x1b[0m\n", rerr, sql, values)
RedLogf("exec error: %v for %s %v", rerr, sql, values)
}
return
}

View File

@ -205,6 +205,11 @@ func (f *formatter) Format(entry *logrus.Entry) ([]byte, error) {
return b.Bytes(), nil
}
// RedLogf 采用红色打印错误类信息
func RedLogf(fmt string, args ...interface{}) {
logrus.Errorf("\x1b[31m\n"+fmt+"\x1b[0m\n", args...)
}
// InitConfig init config
func InitConfig(dir string, config interface{}) {
logrus.SetFormatter(&formatter{})

View File

@ -2,11 +2,9 @@ package dtmcli
import (
"fmt"
"strings"
"github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
)
@ -18,41 +16,38 @@ type Tcc struct {
}
// TccGlobalFunc type of global tcc call
type TccGlobalFunc func(tcc *Tcc) error
type TccGlobalFunc func(tcc *Tcc) (interface{}, error)
// TccGlobalTransaction begin a tcc global transaction
// dtm dtm服务器地址
// gid 全局事务id
// tccFunc tcc事务函数里面会定义全局事务的分支
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (ret interface{}, rerr error) {
data := &M{
"gid": gid,
"trans_type": "tcc",
}
tcc := &Tcc{Dtm: dtm, Gid: gid}
resp, err := common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare")
if IsFailure(resp, err) {
return resp, err
}
// 小概率情况下prepare成功了但是由于网络状况导致上面Failure那么不执行下面defer的内容等待超时后再回滚标记事务失败也没有问题
defer func() {
var resp *resty.Response
var err error
var x interface{}
if x = recover(); x != nil || rerr != nil {
if x = recover(); x != nil || IsFailure(ret, rerr) {
resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/abort")
} else {
resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/submit")
}
err2 := CheckDtmResponse(resp, err)
if err2 != nil {
logrus.Errorf("submitting or abort global transaction error: %v", err2)
if IsFailure(resp, err) {
common.RedLogf("submitting or abort global transaction error: %v resp: %s", err, resp.String())
}
if x != nil {
panic(x)
}
}()
tcc := &Tcc{Dtm: dtm, Gid: gid}
resp, err := common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare")
rerr = CheckDtmResponse(resp, err)
if rerr != nil {
return
}
rerr = tccFunc(tcc)
ret, rerr = tccFunc(tcc)
return
}
@ -85,8 +80,7 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
"cancel": cancelURL,
}).
Post(t.Dtm + "/registerTccBranch")
err = CheckDtmResponse(resp, err)
if err != nil {
if IsFailure(resp, err) {
return resp, err
}
resp, err = common.RestyClient.R().
@ -99,8 +93,5 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
"branch_type": "try",
}).
Post(tryURL)
if err == nil && strings.Contains(resp.String(), "FAILURE") {
err = fmt.Errorf("branch return failure: %s", resp.String())
}
return resp, err
}

View File

@ -18,6 +18,21 @@ func MustGenGid(server string) string {
return res["gid"]
}
// IsFailure 如果err非空或者ret是http的响应且包含FAILURE那么返回true。此时认为业务调用失败
func IsFailure(res interface{}, err error) bool {
resp, ok := res.(*resty.Response)
return err != nil || ok && strings.Contains(resp.String(), "FAILURE")
}
// PanicIfFailure 如果err非空或者ret是http的响应且包含FAILURE那么Panic。此时认为业务调用失败
func PanicIfFailure(res interface{}, err error) {
resp, ok := res.(*resty.Response)
failure := err != nil || ok && strings.Contains(resp.String(), "FAILURE")
if failure {
panic(fmt.Errorf("dtm failure ret: %v err %v", res, err))
}
}
// CheckDtmResponse check the response of dtm, if not ok ,generate error
func CheckDtmResponse(resp *resty.Response, err error) error {
if err != nil {

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
)
// CronTransOnce cron expired trans. use expireIn as expire time
@ -53,7 +54,7 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal {
func handlePanic() {
if err := recover(); err != nil {
logrus.Errorf("\x1b[31m\n----panic %s handlered\x1b[0m\n%s", err.(error).Error(), string(debug.Stack()))
common.RedLogf("----panic %s handlered\n%s", err.(error).Error(), string(debug.Stack()))
}
}

View File

@ -23,35 +23,24 @@ func TestTccBarrier(t *testing.T) {
func tccBarrierRollback(t *testing.T) {
gid := "tccBarrierRollback"
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
assert.Contains(t, res1.String(), "SUCCESS")
_, rerr = tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
assert.Error(t, rerr)
return
resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
resp, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
assert.True(t, !dtmcli.IsFailure(resp, err))
return tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
})
assert.Error(t, err)
assert.True(t, dtmcli.IsFailure(resp, err))
WaitTransProcessed(gid)
assert.Equal(t, "failed", getTransStatus(gid))
}
func tccBarrierNormal(t *testing.T) {
gid := "tccBarrierNormal"
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
e2p(rerr)
if res1.StatusCode() != 200 {
return fmt.Errorf("bad status code: %d", res1.StatusCode())
}
res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
e2p(rerr)
if res2.StatusCode() != 200 {
return fmt.Errorf("bad status code: %d", res2.StatusCode())
}
logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String())
return
resp, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
resp, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
assert.True(t, !dtmcli.IsFailure(resp, err))
return tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
})
e2p(err)
assert.True(t, !dtmcli.IsFailure(resp, err))
WaitTransProcessed(gid)
assert.Equal(t, "succeed", getTransStatus(gid))
}
@ -60,7 +49,7 @@ func tccBarrierDisorder(t *testing.T) {
timeoutChan := make(chan string, 2)
finishedChan := make(chan string, 2)
gid := "tccBarrierDisorder"
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
_, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
body := &examples.TransReq{Amount: 30}
tryURL := Busi + "/TccBTransOutTry"
confirmURL := Busi + "/TccBTransOutConfirm"
@ -91,8 +80,7 @@ func tccBarrierDisorder(t *testing.T) {
"cancel": cancelURL,
}).
Post(tcc.Dtm + "/registerTccBranch")
e2p(err)
assert.True(t, strings.Contains(r.String(), "SUCCESS"))
assert.True(t, !dtmcli.IsFailure(r, err))
go func() {
logrus.Printf("sleeping to wait for tcc try timeout")
<-timeoutChan
@ -119,7 +107,7 @@ func tccBarrierDisorder(t *testing.T) {
<-finishedChan
<-finishedChan
time.Sleep(100 * time.Millisecond)
return fmt.Errorf("a cancelled tcc")
return nil, fmt.Errorf("a cancelled tcc")
})
assert.Error(t, err, fmt.Errorf("a cancelled tcc"))
assert.Equal(t, []string{"succeed", "prepared", "prepared"}, getBranchesStatus(gid))

View File

@ -18,28 +18,26 @@ func TestTcc(t *testing.T) {
func tccNormal(t *testing.T) {
data := &examples.TransReq{Amount: 30}
gid := "tccNormal"
err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
_, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
e2p(rerr)
_, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
e2p(rerr)
return
ret, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
resp, err := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
if dtmcli.IsFailure(resp, err) {
return resp, err
}
return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
})
e2p(err)
dtmcli.PanicIfFailure(ret, err)
}
func tccRollback(t *testing.T) {
gid := "tccRollback"
data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}
err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
resp, err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
resp, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
assert.Contains(t, resp.String(), "SUCCESS")
assert.True(t, !dtmcli.IsFailure(resp, rerr))
examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING")
_, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
assert.Error(t, rerr)
return
return tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
})
assert.Error(t, err)
assert.True(t, dtmcli.IsFailure(resp, err))
WaitTransProcessed(gid)
assert.Equal(t, "aborting", getTransStatus(gid))
CronTransOnce(60 * time.Second)

View File

@ -12,9 +12,8 @@ func TccSetup(app *gin.Engine) {
app.POST(BusiAPI+"/TransInTccParent", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
tcc, err := dtmcli.TccFromReq(c)
e2p(err)
req := reqFrom(c)
logrus.Printf("TransInTccParent ")
_, rerr := tcc.CallBranch(&TransReq{Amount: req.Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
_, rerr := tcc.CallBranch(&TransReq{Amount: reqFrom(c).Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
e2p(rerr)
return M{"dtm_result": "SUCCESS"}, nil
}))
@ -22,17 +21,15 @@ func TccSetup(app *gin.Engine) {
// TccFireRequestNested 1
func TccFireRequestNested() string {
logrus.Printf("tcc transaction begin")
gid := dtmcli.MustGenGid(DtmServer)
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
e2p(rerr)
res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTccParent", Busi+"/TransInConfirm", Busi+"/TransInRevert")
e2p(rerr)
logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String())
return
ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
if dtmcli.IsFailure(resp, err) {
return resp, err
}
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTccParent", Busi+"/TransInConfirm", Busi+"/TransInRevert")
})
e2p(err)
dtmcli.PanicIfFailure(ret, err)
return gid
}
@ -40,13 +37,13 @@ func TccFireRequestNested() string {
func TccFireRequest() string {
logrus.Printf("tcc simple transaction begin")
gid := dtmcli.MustGenGid(DtmServer)
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
e2p(rerr)
res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String())
return
ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
if dtmcli.IsFailure(resp, err) {
return resp, err
}
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
})
e2p(err)
dtmcli.PanicIfFailure(ret, err)
return gid
}

View File

@ -14,15 +14,14 @@ import (
func TccBarrierFireRequest() string {
logrus.Printf("tcc transaction begin")
gid := dtmcli.MustGenGid(DtmServer)
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
common.CheckRestySuccess(res1, rerr)
res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
common.CheckRestySuccess(res1, rerr)
logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String())
return
ret, err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (interface{}, error) {
resp, err := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
if dtmcli.IsFailure(resp, err) {
return resp, err
}
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
})
e2p(err)
dtmcli.PanicIfFailure(ret, err)
return gid
}

View File

@ -40,11 +40,10 @@ func XaFireRequest() string {
func xaTransIn(c *gin.Context) (interface{}, error) {
err := XaClient.XaLocalTransaction(c, func(db *sql.DB, xa *dtmcli.Xa) (rerr error) {
req := reqFrom(c)
if req.TransInResult == "FAILURE" {
if reqFrom(c).TransInResult == "FAILURE" {
return fmt.Errorf("tranIn FAILURE")
}
_, rerr = common.SdbExec(db, "update dtm_busi.user_account set balance=balance+? where user_id=?", req.Amount, 2)
_, rerr = common.SdbExec(db, "update dtm_busi.user_account set balance=balance+? where user_id=?", reqFrom(c).Amount, 2)
return
})
if err != nil && strings.Contains(err.Error(), "FAILURE") {
@ -56,11 +55,10 @@ func xaTransIn(c *gin.Context) (interface{}, error) {
func xaTransOut(c *gin.Context) (interface{}, error) {
err := XaClient.XaLocalTransaction(c, func(db *sql.DB, xa *dtmcli.Xa) (rerr error) {
req := reqFrom(c)
if req.TransOutResult == "FAILURE" {
if reqFrom(c).TransOutResult == "FAILURE" {
return fmt.Errorf("tranOut failed")
}
_, rerr = common.SdbExec(db, "update dtm_busi.user_account set balance=balance-? where user_id=?", req.Amount, 1)
_, rerr = common.SdbExec(db, "update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, 1)
return
})
e2p(err)

View File

@ -38,10 +38,15 @@ func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq {
}
func reqFrom(c *gin.Context) *TransReq {
v, ok := c.Get("trans_req")
if !ok {
req := TransReq{}
err := c.BindJSON(&req)
e2p(err)
return &req
c.Set("trans_req", &req)
v = &req
}
return v.(*TransReq)
}
func infoFromContext(c *gin.Context) *dtmcli.TransInfo {