Merge branch 'alpha' into main

This commit is contained in:
yedf2 2021-07-23 15:23:33 +08:00
commit 677fcc27c4
26 changed files with 571 additions and 410 deletions

8
.simplecov Normal file
View File

@ -0,0 +1,8 @@
require 'simplecov'
require 'coveralls'
SimpleCov.formatter = Coveralls::SimpleCov::Formatter
SimpleCov.start do
add_filter 'app'
add_filter 'examples'
end

View File

@ -7,6 +7,7 @@ branches:
only: only:
- master - master
- main - main
- alpha
services: services:
- mysql - mysql
before_install: before_install:

View File

@ -9,7 +9,7 @@ import (
"testing" "testing"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/go-playground/assert/v2" "github.com/stretchr/testify/assert"
) )
func TestEP(t *testing.T) { func TestEP(t *testing.T) {
@ -83,6 +83,10 @@ func TestSome(t *testing.T) {
n := MustAtoi("123") n := MustAtoi("123")
assert.Equal(t, 123, n) assert.Equal(t, 123, n)
err := CatchP(func() {
MustAtoi("abc")
})
assert.Error(t, err)
wd := MustGetwd() wd := MustGetwd()
assert.NotEqual(t, "", wd) assert.NotEqual(t, "", wd)

View File

@ -5,6 +5,7 @@ import (
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/url"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -26,18 +27,25 @@ func (t *TransInfo) String() string {
return fmt.Sprintf("transInfo: %s %s %s %s", t.TransType, t.Gid, t.BranchID, t.BranchType) return fmt.Sprintf("transInfo: %s %s %s %s", t.TransType, t.Gid, t.BranchID, t.BranchType)
} }
// TransInfoFromReq construct transaction info from request // MustGetTrans construct transaction info from request
func TransInfoFromReq(c *gin.Context) *TransInfo { func MustGetTrans(c *gin.Context) *TransInfo {
ti, err := TransInfoFromQuery(c.Request.URL.Query())
e2p(err)
return ti
}
// TransInfoFromQuery construct transaction info from request
func TransInfoFromQuery(qs url.Values) (*TransInfo, error) {
ti := &TransInfo{ ti := &TransInfo{
TransType: c.Query("trans_type"), TransType: qs.Get("trans_type"),
Gid: c.Query("gid"), Gid: qs.Get("gid"),
BranchID: c.Query("branch_id"), BranchID: qs.Get("branch_id"),
BranchType: c.Query("branch_type"), BranchType: qs.Get("branch_type"),
} }
if ti.TransType == "" || ti.Gid == "" || ti.BranchID == "" || ti.BranchType == "" { if ti.TransType == "" || ti.Gid == "" || ti.BranchID == "" || ti.BranchType == "" {
panic(fmt.Errorf("invlid trans info: %v", ti)) return nil, fmt.Errorf("invlid trans info: %v", ti)
} }
return ti return ti, nil
} }
// BarrierModel barrier model for gorm // BarrierModel barrier model for gorm
@ -63,7 +71,7 @@ func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, br
if branchType == "" { if branchType == "" {
return 0, nil return 0, nil
} }
res, err := logExec(tx, "insert into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, reason) values(?,?,?,?,?)", transType, gid, branchID, branchType, reason) res, err := logExec(tx, "insert ignore into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, reason) values(?,?,?,?,?)", transType, gid, branchID, branchType, reason)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -119,12 +127,12 @@ func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc) (re
return return
} }
if result.Valid { // 数据库里有上一次结果,返回上一次的结果 if result.Valid { // 数据库里有上一次结果,返回上一次的结果
res = json.Unmarshal([]byte(result.String), &res) rerr = json.Unmarshal([]byte(result.String), &res)
return
} else { // 数据库里没有上次的结果,属于重复空补偿,直接返回成功
res = common.MS{"dtm_result": "SUCCESS"}
return return
} }
// 数据库里没有上次的结果,属于重复空补偿,直接返回成功
res = common.MS{"dtm_result": "SUCCESS"}
return
} }
res, rerr = busiCall(db) res, rerr = busiCall(db)
if rerr == nil { // 正确返回了,需要将结果保存到数据库 if rerr == nil { // 正确返回了,需要将结果保存到数据库

View File

@ -3,7 +3,6 @@ package dtmcli
import ( import (
"fmt" "fmt"
jsonitor "github.com/json-iterator/go"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
) )
@ -29,10 +28,10 @@ type MsgStep struct {
} }
// NewMsg create new msg // NewMsg create new msg
func NewMsg(server string) *Msg { func NewMsg(server string, gid string) *Msg {
return &Msg{ return &Msg{
MsgData: MsgData{ MsgData: MsgData{
Gid: GenGid(server), Gid: gid,
TransType: "msg", TransType: "msg",
}, },
Server: server, Server: server,
@ -54,14 +53,7 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
func (s *Msg) Submit() error { func (s *Msg) Submit() error {
logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData) logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData)
resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server)) resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server))
if err != nil { return CheckDtmResponse(resp, err)
return err
}
if resp.StatusCode() != 200 {
return fmt.Errorf("submit failed: %v", resp.Body())
}
s.Gid = jsonitor.Get(resp.Body(), "gid").ToString()
return nil
} }
// Prepare prepare the msg // Prepare prepare the msg
@ -69,11 +61,9 @@ func (s *Msg) Prepare(queryPrepared string) error {
s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared)
logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData) logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData)
resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/prepare", s.Server)) resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/prepare", s.Server))
if err != nil { rerr := CheckDtmResponse(resp, err)
return err if rerr != nil {
} return rerr
if resp.StatusCode() != 200 {
return fmt.Errorf("prepare failed: %v", resp.Body())
} }
return nil return nil
} }

View File

@ -3,7 +3,6 @@ package dtmcli
import ( import (
"fmt" "fmt"
jsonitor "github.com/json-iterator/go"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
) )
@ -29,10 +28,10 @@ type SagaStep struct {
} }
// NewSaga create a saga // NewSaga create a saga
func NewSaga(server string) *Saga { func NewSaga(server string, gid string) *Saga {
return &Saga{ return &Saga{
SagaData: SagaData{ SagaData: SagaData{
Gid: GenGid(server), Gid: gid,
TransType: "saga", TransType: "saga",
}, },
Server: server, Server: server,
@ -55,12 +54,5 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga
func (s *Saga) Submit() error { func (s *Saga) Submit() error {
logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData)
resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server)) resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server))
if err != nil { return CheckDtmResponse(resp, err)
return err
}
if resp.StatusCode() != 200 {
return fmt.Errorf("submit failed: %v", resp.Body())
}
s.Gid = jsonitor.Get(resp.Body(), "gid").ToString()
return nil
} }

View File

@ -2,6 +2,7 @@ package dtmcli
import ( import (
"fmt" "fmt"
"strings"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
@ -20,25 +21,31 @@ type Tcc struct {
type TccGlobalFunc func(tcc *Tcc) error type TccGlobalFunc func(tcc *Tcc) error
// TccGlobalTransaction begin a tcc global transaction // TccGlobalTransaction begin a tcc global transaction
func TccGlobalTransaction(dtm string, tccFunc TccGlobalFunc) (gid string, rerr error) { func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
gid = GenGid(dtm)
data := &M{ data := &M{
"gid": gid, "gid": gid,
"trans_type": "tcc", "trans_type": "tcc",
} }
defer func() { defer func() {
var resp *resty.Response
var err error var err error
if x := recover(); x != nil || rerr != nil { var x interface{}
_, err = common.RestyClient.R().SetBody(data).Post(dtm + "/abort") if x = recover(); x != nil || rerr != nil {
resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/abort")
} else { } else {
_, err = common.RestyClient.R().SetBody(data).Post(dtm + "/submit") resp, err = common.RestyClient.R().SetBody(data).Post(dtm + "/submit")
} }
if err != nil { err2 := CheckDtmResponse(resp, err)
logrus.Errorf("submitting or abort global transaction error: %v", err) if err2 != nil {
logrus.Errorf("submitting or abort global transaction error: %v", err2)
}
if x != nil {
panic(x)
} }
}() }()
tcc := &Tcc{Dtm: dtm, Gid: gid} tcc := &Tcc{Dtm: dtm, Gid: gid}
_, rerr = common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare") resp, err := common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare")
rerr = CheckDtmResponse(resp, err)
if rerr != nil { if rerr != nil {
return return
} }
@ -74,10 +81,11 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
"cancel": cancelURL, "cancel": cancelURL,
}). }).
Post(t.Dtm + "/registerTccBranch") Post(t.Dtm + "/registerTccBranch")
err = CheckDtmResponse(resp, err)
if err != nil { if err != nil {
return resp, err return resp, err
} }
return common.RestyClient.R(). resp, err = common.RestyClient.R().
SetBody(body). SetBody(body).
SetQueryParams(common.MS{ SetQueryParams(common.MS{
"dtm": t.Dtm, "dtm": t.Dtm,
@ -87,4 +95,8 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
"branch_type": "try", "branch_type": "try",
}). }).
Post(tryURL) Post(tryURL)
if err == nil && strings.Contains(resp.String(), "FAILURE") {
err = fmt.Errorf("branch return failure: %s", resp.String())
}
return resp, err
} }

View File

@ -2,18 +2,33 @@ package dtmcli
import ( import (
"fmt" "fmt"
"strings"
"github.com/go-resty/resty/v2"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
) )
// GenGid generate a new gid // MustGenGid generate a new gid
func GenGid(server string) string { func MustGenGid(server string) string {
res := common.MS{} res := common.MS{}
_, err := common.RestyClient.R().SetResult(&res).Get(server + "/newGid") resp, err := common.RestyClient.R().SetResult(&res).Get(server + "/newGid")
e2p(err) if err != nil || res["gid"] == "" {
panic(fmt.Errorf("newGid error: %v, resp: %s", err, resp))
}
return res["gid"] return res["gid"]
} }
// CheckDtmResponse check the response of dtm, if not ok ,generate error
func CheckDtmResponse(resp *resty.Response, err error) error {
if err != nil {
return err
}
if !strings.Contains(resp.String(), "SUCCESS") {
return fmt.Errorf("dtm response failed: %s", resp.String())
}
return nil
}
// IDGenerator used to generate a branch id // IDGenerator used to generate a branch id
type IDGenerator struct { type IDGenerator struct {
parentID string parentID string

24
dtmcli/types_test.go Normal file
View File

@ -0,0 +1,24 @@
package dtmcli
import (
"net/url"
"testing"
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/common"
)
func TestTypes(t *testing.T) {
err := common.CatchP(func() {
idGen := IDGenerator{parentID: "12345678901234567890123"}
idGen.NewBranchID()
})
assert.Error(t, err)
err = common.CatchP(func() {
idGen := IDGenerator{branchID: 99}
idGen.NewBranchID()
})
assert.Error(t, err)
_, err = TransInfoFromQuery(url.Values{})
assert.Error(t, err)
}

View File

@ -7,6 +7,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
) )
@ -34,16 +35,6 @@ type Xa struct {
Gid string Gid string
} }
// GetParams get xa params map
func (x *Xa) GetParams(branchID string) common.MS {
return common.MS{
"gid": x.Gid,
"trans_type": "xa",
"branch_id": branchID,
"branch_type": "action",
}
}
// XaFromReq construct xa info from request // XaFromReq construct xa info from request
func XaFromReq(c *gin.Context) *Xa { func XaFromReq(c *gin.Context) *Xa {
return &Xa{ return &Xa{
@ -52,20 +43,17 @@ func XaFromReq(c *gin.Context) *Xa {
} }
} }
// NewXaBranchID generate a xa branch id
func (x *Xa) NewXaBranchID() string {
return x.Gid + "-" + x.NewBranchID()
}
// NewXaClient construct a xa client // NewXaClient construct a xa client
func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, callbackURL string) *XaClient { func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, callbackURL string) (*XaClient, error) {
xa := &XaClient{ xa := &XaClient{
Server: server, Server: server,
Conf: mysqlConf, Conf: mysqlConf,
CallbackURL: callbackURL, CallbackURL: callbackURL,
} }
u, err := url.Parse(callbackURL) u, err := url.Parse(callbackURL)
e2p(err) if err != nil {
return nil, err
}
app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) { app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) {
type CallbackReq struct { type CallbackReq struct {
Gid string `json:"gid"` Gid string `json:"gid"`
@ -74,7 +62,9 @@ func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, ca
} }
req := CallbackReq{} req := CallbackReq{}
b, err := c.GetRawData() b, err := c.GetRawData()
e2p(err) if err != nil {
return nil, err
}
common.MustUnmarshal(b, &req) common.MustUnmarshal(b, &req)
tx, my := common.DbAlone(xa.Conf) tx, my := common.DbAlone(xa.Conf)
defer my.Close() defer my.Close()
@ -88,7 +78,7 @@ func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, ca
} }
return M{"dtm_result": "SUCCESS"}, nil return M{"dtm_result": "SUCCESS"}, nil
})) }))
return xa return xa, nil
} }
// XaLocalTransaction start a xa local transaction // XaLocalTransaction start a xa local transaction
@ -115,9 +105,8 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, transFunc XaLocalFunc) (r
} }
// XaGlobalTransaction start a xa global transaction // XaGlobalTransaction start a xa global transaction
func (xc *XaClient) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr error) { func (xc *XaClient) XaGlobalTransaction(gid string, transFunc XaGlobalFunc) error {
xa := Xa{IDGenerator: IDGenerator{}, Gid: GenGid(xc.Server)} xa := Xa{IDGenerator: IDGenerator{}, Gid: gid}
gid = xa.Gid
data := &M{ data := &M{
"gid": gid, "gid": gid,
"trans_type": "xa", "trans_type": "xa",
@ -125,29 +114,33 @@ func (xc *XaClient) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rer
defer func() { defer func() {
x := recover() x := recover()
if x != nil { if x != nil {
_, _ = common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort") r, err := common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort")
rerr = x.(error) if !strings.Contains(r.String(), "SUCCESS") {
logrus.Errorf("abort xa error: resp: %s err: %v", r.String(), err)
}
} }
}() }()
resp, rerr := common.RestyClient.R().SetBody(data).Post(xc.Server + "/prepare") resp, err := common.RestyClient.R().SetBody(data).Post(xc.Server + "/prepare")
e2p(rerr) rerr := CheckDtmResponse(resp, err)
if !strings.Contains(resp.String(), "SUCCESS") { if rerr != nil {
panic(fmt.Errorf("unexpected result: %s", resp.String())) return rerr
} }
rerr = transFunc(&xa) rerr = transFunc(&xa)
e2p(rerr) if rerr != nil {
resp, rerr = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit") return rerr
e2p(rerr)
if !strings.Contains(resp.String(), "SUCCESS") {
panic(fmt.Errorf("unexpected result: %s", resp.String()))
} }
return resp, err = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit")
rerr = CheckDtmResponse(resp, err)
if rerr != nil {
return rerr
}
return nil
} }
// CallBranch call a xa branch // CallBranch call a xa branch
func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
branchID := x.NewBranchID() branchID := x.NewBranchID()
return common.RestyClient.R(). resp, err := common.RestyClient.R().
SetBody(body). SetBody(body).
SetQueryParams(common.MS{ SetQueryParams(common.MS{
"gid": x.Gid, "gid": x.Gid,
@ -156,4 +149,8 @@ func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
"branch_type": "action", "branch_type": "action",
}). }).
Post(url) Post(url)
if strings.Contains(resp.String(), "FAILURE") {
return resp, fmt.Errorf("FAILURE result: %s err: %v", resp.String(), err)
}
return resp, err
} }

View File

@ -21,14 +21,14 @@ func addRoute(engine *gin.Engine) {
} }
func newGid(c *gin.Context) (interface{}, error) { func newGid(c *gin.Context) (interface{}, error) {
return M{"gid": GenGid()}, nil return M{"gid": GenGid(), "dtm_result": "SUCCESS"}, nil
} }
func prepare(c *gin.Context) (interface{}, error) { func prepare(c *gin.Context) (interface{}, error) {
t := TransFromContext(c) t := TransFromContext(c)
t.Status = "prepared" t.Status = "prepared"
t.saveNew(dbGet()) t.saveNew(dbGet())
return M{"dtm_result": "SUCCESS", "gid": t.Gid}, nil return M{"dtm_result": "SUCCESS"}, nil
} }
func submit(c *gin.Context) (interface{}, error) { func submit(c *gin.Context) (interface{}, error) {
@ -41,7 +41,7 @@ func submit(c *gin.Context) (interface{}, error) {
t.Status = "submitted" t.Status = "submitted"
t.saveNew(db) t.saveNew(db)
go t.Process(db) go t.Process(db)
return M{"dtm_result": "SUCCESS", "gid": t.Gid}, nil return M{"dtm_result": "SUCCESS"}, nil
} }
func abort(c *gin.Context) (interface{}, error) { func abort(c *gin.Context) (interface{}, error) {

View File

@ -3,9 +3,7 @@ package dtmsvr
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"strings"
"testing" "testing"
"time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -19,7 +17,7 @@ var DtmServer = examples.DtmServer
var Busi = examples.Busi var Busi = examples.Busi
var app *gin.Engine var app *gin.Engine
func init() { func TestMain(m *testing.M) {
TransProcessedTestChan = make(chan string, 1) TransProcessedTestChan = make(chan string, 1)
common.InitApp(common.GetProjectDir(), &config) common.InitApp(common.GetProjectDir(), &config)
config.Mysql["database"] = dbName config.Mysql["database"] = dbName
@ -40,34 +38,7 @@ func init() {
e2p(dbGet().Exec("truncate trans_branch").Error) e2p(dbGet().Exec("truncate trans_branch").Error)
e2p(dbGet().Exec("truncate trans_log").Error) e2p(dbGet().Exec("truncate trans_log").Error)
examples.ResetXaData() examples.ResetXaData()
} m.Run()
func TestDtmSvr(t *testing.T) {
tccBarrierDisorder(t)
tccBarrierNormal(t)
tccBarrierRollback(t)
sagaBarrierNormal(t)
sagaBarrierRollback(t)
msgNormal(t)
msgPending(t)
tccNormal(t)
tccRollback(t)
sagaNormal(t)
xaNormal(t)
xaRollback(t)
sagaCommittedPending(t)
sagaRollback(t)
// for coverage
examples.QsStartSvr()
assertSucceed(t, examples.QsFireRequest())
assertSucceed(t, examples.MsgFireRequest())
assertSucceed(t, examples.SagaBarrierFireRequest())
assertSucceed(t, examples.SagaFireRequest())
assertSucceed(t, examples.TccBarrierFireRequest())
assertSucceed(t, examples.TccFireRequest())
assertSucceed(t, examples.XaFireRequest())
} }
func TestCover(t *testing.T) { func TestCover(t *testing.T) {
@ -80,6 +51,21 @@ func TestCover(t *testing.T) {
go CronExpiredTrans(1) go CronExpiredTrans(1)
} }
func TestType(t *testing.T) {
err := common.CatchP(func() {
dtmcli.MustGenGid("http://localhost:8080/api/no")
})
assert.Error(t, err)
err = common.CatchP(func() {
resp, err := common.RestyClient.R().SetBody(common.M{
"gid": "1",
"trans_type": "msg",
}).Get("http://localhost:8080/api/dtmsvr/abort")
common.CheckRestySuccess(resp, err)
})
assert.Error(t, err)
}
func getTransStatus(gid string) string { func getTransStatus(gid string) string {
sm := TransGlobal{} sm := TransGlobal{}
dbr := dbGet().Model(&sm).Where("gid=?", gid).First(&sm) dbr := dbGet().Model(&sm).Where("gid=?", gid).First(&sm)
@ -98,177 +84,6 @@ func getBranchesStatus(gid string) []string {
return status return status
} }
func xaNormal(t *testing.T) {
xc := examples.XaClient
gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error {
req := examples.GenTransReq(30, false, false)
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
common.CheckRestySuccess(resp, err)
resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa")
common.CheckRestySuccess(resp, err)
return nil
})
e2p(err)
WaitTransProcessed(gid)
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid))
}
func xaRollback(t *testing.T) {
xc := examples.XaClient
gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error {
req := examples.GenTransReq(30, false, true)
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
common.CheckRestySuccess(resp, err)
resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa")
common.CheckRestySuccess(resp, err)
return nil
})
if err != nil {
logrus.Errorf("global transaction failed, so rollback")
}
WaitTransProcessed(gid)
assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid))
assert.Equal(t, "failed", getTransStatus(gid))
}
func tccNormal(t *testing.T) {
data := &examples.TransReq{Amount: 30}
_, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
_, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
e2p(rerr)
_, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
e2p(rerr)
return
})
e2p(err)
}
func tccBarrierNormal(t *testing.T) {
_, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
e2p(rerr)
if res1.StatusCode() != 200 {
return fmt.Errorf("bad status code: %d", res1.StatusCode())
}
res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
e2p(rerr)
if res2.StatusCode() != 200 {
return fmt.Errorf("bad status code: %d", res2.StatusCode())
}
logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String())
return
})
e2p(err)
}
func tccBarrierRollback(t *testing.T) {
gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
e2p(rerr)
if res1.StatusCode() != 200 {
return fmt.Errorf("bad status code: %d", res1.StatusCode())
}
res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
e2p(rerr)
if res2.StatusCode() != 200 {
return fmt.Errorf("bad status code: %d", res2.StatusCode())
}
if strings.Contains(res2.String(), "FAILURE") {
return fmt.Errorf("branch trans in fail")
}
logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String())
return
})
assert.Equal(t, err, fmt.Errorf("branch trans in fail"))
WaitTransProcessed(gid)
assert.Equal(t, "failed", getTransStatus(gid))
}
func tccRollback(t *testing.T) {
data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}
_, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
_, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
e2p(rerr)
_, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
e2p(rerr)
return
})
e2p(err)
}
func msgNormal(t *testing.T) {
msg := genMsg("gid-msg-normal")
msg.Submit()
assert.Equal(t, "submitted", getTransStatus(msg.Gid))
WaitTransProcessed(msg.Gid)
assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid))
assert.Equal(t, "succeed", getTransStatus(msg.Gid))
}
func msgPending(t *testing.T) {
msg := genMsg("gid-msg-normal-pending")
msg.Prepare("")
assert.Equal(t, "prepared", getTransStatus(msg.Gid))
examples.MainSwitch.CanSubmitResult.SetOnce("PENDING")
CronTransOnce(60 * time.Second)
assert.Equal(t, "prepared", getTransStatus(msg.Gid))
examples.MainSwitch.TransInResult.SetOnce("PENDING")
CronTransOnce(60 * time.Second)
assert.Equal(t, "submitted", getTransStatus(msg.Gid))
CronTransOnce(60 * time.Second)
assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid))
assert.Equal(t, "succeed", getTransStatus(msg.Gid))
}
func sagaNormal(t *testing.T) {
saga := genSaga("gid-noramlSaga", false, false)
saga.Submit()
WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
assert.Equal(t, "succeed", getTransStatus(saga.Gid))
transQuery(t, saga.Gid)
}
func sagaBarrierNormal(t *testing.T) {
req := &examples.TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmServer).
Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req).
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req)
logrus.Printf("busi trans submit")
err := saga.Submit()
e2p(err)
WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
}
func sagaRollback(t *testing.T) {
saga := genSaga("gid-rollbackSaga2", false, true)
saga.Submit()
WaitTransProcessed(saga.Gid)
assert.Equal(t, "failed", getTransStatus(saga.Gid))
assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid))
}
func sagaBarrierRollback(t *testing.T) {
saga := dtmcli.NewSaga(DtmServer).
Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", &examples.TransReq{Amount: 30}).
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", &examples.TransReq{Amount: 30, TransInResult: "FAILURE"})
logrus.Printf("busi trans submit")
err := saga.Submit()
e2p(err)
WaitTransProcessed(saga.Gid)
assert.Equal(t, "failed", getTransStatus(saga.Gid))
}
func sagaCommittedPending(t *testing.T) {
saga := genSaga("gid-committedPending", false, false)
examples.MainSwitch.TransInResult.SetOnce("PENDING")
saga.Submit()
WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid))
CronTransOnce(60 * time.Second)
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
assert.Equal(t, "succeed", getTransStatus(saga.Gid))
}
func assertSucceed(t *testing.T, gid string) { func assertSucceed(t *testing.T, gid string) {
WaitTransProcessed(gid) WaitTransProcessed(gid)
assert.Equal(t, "succeed", getTransStatus(gid)) assert.Equal(t, "succeed", getTransStatus(gid))
@ -276,22 +91,20 @@ func assertSucceed(t *testing.T, gid string) {
func genMsg(gid string) *dtmcli.Msg { func genMsg(gid string) *dtmcli.Msg {
logrus.Printf("beginning a msg test ---------------- %s", gid) logrus.Printf("beginning a msg test ---------------- %s", gid)
msg := dtmcli.NewMsg(examples.DtmServer) msg := dtmcli.NewMsg(examples.DtmServer, gid)
msg.QueryPrepared = examples.Busi + "/CanSubmit" msg.QueryPrepared = examples.Busi + "/CanSubmit"
req := examples.GenTransReq(30, false, false) req := examples.GenTransReq(30, false, false)
msg.Add(examples.Busi+"/TransOut", &req) msg.Add(examples.Busi+"/TransOut", &req)
msg.Add(examples.Busi+"/TransIn", &req) msg.Add(examples.Busi+"/TransIn", &req)
msg.Gid = gid
return msg return msg
} }
func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
logrus.Printf("beginning a saga test ---------------- %s", gid) logrus.Printf("beginning a saga test ---------------- %s", gid)
saga := dtmcli.NewSaga(examples.DtmServer) saga := dtmcli.NewSaga(examples.DtmServer, gid)
req := examples.GenTransReq(30, outFailed, inFailed) req := examples.GenTransReq(30, outFailed, inFailed)
saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req) saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req)
saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req) saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req)
saga.Gid = gid
return saga return saga
} }
@ -335,79 +148,17 @@ func TestSqlDB(t *testing.T) {
asserts.Equal(dbr.RowsAffected, int64(1)) asserts.Equal(dbr.RowsAffected, int64(1))
dbr = db.Model(&dtmcli.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtmcli.BarrierModel{}) dbr = db.Model(&dtmcli.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtmcli.BarrierModel{})
asserts.Equal(dbr.RowsAffected, int64(0)) asserts.Equal(dbr.RowsAffected, int64(0))
gid2Res := common.M{"result": "first"}
_, err = dtmcli.ThroughBarrierCall(db.ToSQLDB(), transInfo, func(db *sql.DB) (interface{}, error) { _, err = dtmcli.ThroughBarrierCall(db.ToSQLDB(), transInfo, func(db *sql.DB) (interface{}, error) {
logrus.Printf("submit gid2") logrus.Printf("submit gid2")
return nil, nil return gid2Res, nil
}) })
asserts.Nil(err) asserts.Nil(err)
dbr = db.Model(&dtmcli.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtmcli.BarrierModel{}) dbr = db.Model(&dtmcli.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtmcli.BarrierModel{})
asserts.Equal(dbr.RowsAffected, int64(1)) asserts.Equal(dbr.RowsAffected, int64(1))
} newResult, err := dtmcli.ThroughBarrierCall(db.ToSQLDB(), transInfo, func(db *sql.DB) (interface{}, error) {
logrus.Printf("submit gid2")
func tccBarrierDisorder(t *testing.T) { return common.MS{"result": "ignored"}, nil
timeoutChan := make(chan string, 2)
finishedChan := make(chan string, 2)
gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
body := &examples.TransReq{Amount: 30}
tryURL := Busi + "/TccBTransOutTry"
confirmURL := Busi + "/TccBTransOutConfirm"
cancelURL := Busi + "/TccBSleepCancel"
// 请参见子事务屏障里的时序图这里为了模拟该时序图手动拆解了callbranch
branchID := tcc.NewBranchID()
sleeped := false
app.POST(examples.BusiAPI+"/TccBSleepCancel", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
res, err := examples.TccBarrierTransOutCancel(c)
if !sleeped {
sleeped = true
logrus.Printf("sleep before cancel return")
<-timeoutChan
finishedChan <- "1"
}
return res, err
}))
// 注册子事务
r, err := common.RestyClient.R().
SetBody(&M{
"gid": tcc.Gid,
"branch_id": branchID,
"trans_type": "tcc",
"status": "prepared",
"data": string(common.MustMarshal(body)),
"try": tryURL,
"confirm": confirmURL,
"cancel": cancelURL,
}).
Post(tcc.Dtm + "/registerTccBranch")
e2p(err)
assert.True(t, strings.Contains(r.String(), "SUCCESS"))
go func() {
logrus.Printf("sleeping to wait for tcc try timeout")
<-timeoutChan
_, _ = common.RestyClient.R().
SetBody(body).
SetQueryParams(common.MS{
"dtm": tcc.Dtm,
"gid": tcc.Gid,
"branch_id": branchID,
"trans_type": "tcc",
"branch_type": "try",
}).
Post(tryURL)
finishedChan <- "1"
}()
logrus.Printf("cron to timeout and then call cancel")
go CronTransOnce(60 * time.Second)
time.Sleep(100 * time.Millisecond)
logrus.Printf("cron to timeout and then call cancelled twice")
CronTransOnce(60 * time.Second)
timeoutChan <- "wake"
timeoutChan <- "wake"
<-finishedChan
<-finishedChan
time.Sleep(100 * time.Millisecond)
return fmt.Errorf("a cancelled tcc")
}) })
assert.Error(t, err, fmt.Errorf("a cancelled tcc")) asserts.Equal(newResult, gid2Res)
assert.Equal(t, []string{"succeed", "prepared", "prepared"}, getBranchesStatus(gid))
assert.Equal(t, "failed", getTransStatus(gid))
} }

19
dtmsvr/examples_test.go Normal file
View File

@ -0,0 +1,19 @@
package dtmsvr
import (
"testing"
"github.com/yedf/dtm/examples"
)
func TestExamples(t *testing.T) {
// for coverage
examples.QsStartSvr()
assertSucceed(t, examples.QsFireRequest())
assertSucceed(t, examples.MsgFireRequest())
assertSucceed(t, examples.SagaBarrierFireRequest())
assertSucceed(t, examples.SagaFireRequest())
assertSucceed(t, examples.TccBarrierFireRequest())
assertSucceed(t, examples.TccFireRequest())
assertSucceed(t, examples.XaFireRequest())
}

39
dtmsvr/trans_msg_test.go Normal file
View File

@ -0,0 +1,39 @@
package dtmsvr
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/examples"
)
func TestMsg(t *testing.T) {
msgNormal(t)
msgPending(t)
}
func msgNormal(t *testing.T) {
msg := genMsg("gid-msg-normal")
msg.Submit()
assert.Equal(t, "submitted", getTransStatus(msg.Gid))
WaitTransProcessed(msg.Gid)
assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid))
assert.Equal(t, "succeed", getTransStatus(msg.Gid))
}
func msgPending(t *testing.T) {
msg := genMsg("gid-msg-normal-pending")
msg.Prepare("")
assert.Equal(t, "prepared", getTransStatus(msg.Gid))
examples.MainSwitch.CanSubmitResult.SetOnce("PENDING")
CronTransOnce(60 * time.Second)
assert.Equal(t, "prepared", getTransStatus(msg.Gid))
examples.MainSwitch.TransInResult.SetOnce("PENDING")
CronTransOnce(60 * time.Second)
assert.Equal(t, "submitted", getTransStatus(msg.Gid))
CronTransOnce(60 * time.Second)
assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid))
assert.Equal(t, "succeed", getTransStatus(msg.Gid))
}

View File

@ -0,0 +1,39 @@
package dtmsvr
import (
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/examples"
)
func TestSagaBarrier(t *testing.T) {
sagaBarrierNormal(t)
sagaBarrierRollback(t)
}
func sagaBarrierNormal(t *testing.T) {
req := &examples.TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmServer, "sagaBarrierNormal").
Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req).
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req)
logrus.Printf("busi trans submit")
err := saga.Submit()
e2p(err)
WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
}
func sagaBarrierRollback(t *testing.T) {
saga := dtmcli.NewSaga(DtmServer, "sagaBarrierRollback").
Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", &examples.TransReq{Amount: 30}).
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", &examples.TransReq{Amount: 30, TransInResult: "FAILURE"})
logrus.Printf("busi trans submit")
err := saga.Submit()
e2p(err)
WaitTransProcessed(saga.Gid)
assert.Equal(t, "failed", getTransStatus(saga.Gid))
}

44
dtmsvr/trans_saga_test.go Normal file
View File

@ -0,0 +1,44 @@
package dtmsvr
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/examples"
)
func TestSaga(t *testing.T) {
sagaNormal(t)
sagaCommittedPending(t)
sagaRollback(t)
}
func sagaNormal(t *testing.T) {
saga := genSaga("gid-noramlSaga", false, false)
saga.Submit()
WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
assert.Equal(t, "succeed", getTransStatus(saga.Gid))
transQuery(t, saga.Gid)
}
func sagaCommittedPending(t *testing.T) {
saga := genSaga("gid-committedPending", false, false)
examples.MainSwitch.TransInResult.SetOnce("PENDING")
saga.Submit()
WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid))
CronTransOnce(60 * time.Second)
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
assert.Equal(t, "succeed", getTransStatus(saga.Gid))
}
func sagaRollback(t *testing.T) {
saga := genSaga("gid-rollbackSaga2", false, true)
saga.Submit()
WaitTransProcessed(saga.Gid)
assert.Equal(t, "failed", getTransStatus(saga.Gid))
assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid))
}

View File

@ -0,0 +1,127 @@
package dtmsvr
import (
"fmt"
"strings"
"testing"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/examples"
)
func TestTccBarrier(t *testing.T) {
tccBarrierDisorder(t)
tccBarrierNormal(t)
tccBarrierRollback(t)
}
func tccBarrierRollback(t *testing.T) {
gid := "tccBarrierRollback"
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
assert.Contains(t, res1.String(), "SUCCESS")
_, rerr = tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
assert.Error(t, rerr)
return
})
assert.Error(t, err)
WaitTransProcessed(gid)
assert.Equal(t, "failed", getTransStatus(gid))
}
func tccBarrierNormal(t *testing.T) {
gid := "tccBarrierNormal"
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
e2p(rerr)
if res1.StatusCode() != 200 {
return fmt.Errorf("bad status code: %d", res1.StatusCode())
}
res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
e2p(rerr)
if res2.StatusCode() != 200 {
return fmt.Errorf("bad status code: %d", res2.StatusCode())
}
logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String())
return
})
e2p(err)
WaitTransProcessed(gid)
assert.Equal(t, "succeed", getTransStatus(gid))
}
func tccBarrierDisorder(t *testing.T) {
timeoutChan := make(chan string, 2)
finishedChan := make(chan string, 2)
gid := "tccBarrierDisorder"
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
body := &examples.TransReq{Amount: 30}
tryURL := Busi + "/TccBTransOutTry"
confirmURL := Busi + "/TccBTransOutConfirm"
cancelURL := Busi + "/TccBSleepCancel"
// 请参见子事务屏障里的时序图这里为了模拟该时序图手动拆解了callbranch
branchID := tcc.NewBranchID()
sleeped := false
app.POST(examples.BusiAPI+"/TccBSleepCancel", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
res, err := examples.TccBarrierTransOutCancel(c)
if !sleeped {
sleeped = true
logrus.Printf("sleep before cancel return")
<-timeoutChan
finishedChan <- "1"
}
return res, err
}))
// 注册子事务
r, err := common.RestyClient.R().
SetBody(&M{
"gid": tcc.Gid,
"branch_id": branchID,
"trans_type": "tcc",
"status": "prepared",
"data": string(common.MustMarshal(body)),
"try": tryURL,
"confirm": confirmURL,
"cancel": cancelURL,
}).
Post(tcc.Dtm + "/registerTccBranch")
e2p(err)
assert.True(t, strings.Contains(r.String(), "SUCCESS"))
go func() {
logrus.Printf("sleeping to wait for tcc try timeout")
<-timeoutChan
r, _ = common.RestyClient.R().
SetBody(body).
SetQueryParams(common.MS{
"dtm": tcc.Dtm,
"gid": tcc.Gid,
"branch_id": branchID,
"trans_type": "tcc",
"branch_type": "try",
}).
Post(tryURL)
assert.True(t, strings.Contains(r.String(), "FAILURE"))
finishedChan <- "1"
}()
logrus.Printf("cron to timeout and then call cancel")
go CronTransOnce(60 * time.Second)
time.Sleep(100 * time.Millisecond)
logrus.Printf("cron to timeout and then call cancelled twice")
CronTransOnce(60 * time.Second)
timeoutChan <- "wake"
timeoutChan <- "wake"
<-finishedChan
<-finishedChan
time.Sleep(100 * time.Millisecond)
return fmt.Errorf("a cancelled tcc")
})
assert.Error(t, err, fmt.Errorf("a cancelled tcc"))
assert.Equal(t, []string{"succeed", "prepared", "prepared"}, getBranchesStatus(gid))
assert.Equal(t, "failed", getTransStatus(gid))
}

41
dtmsvr/trans_tcc_test.go Normal file
View File

@ -0,0 +1,41 @@
package dtmsvr
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/examples"
)
func TestTcc(t *testing.T) {
tccNormal(t)
tccRollback(t)
}
func tccNormal(t *testing.T) {
data := &examples.TransReq{Amount: 30}
gid := "tccNormal"
err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
_, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
e2p(rerr)
_, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
e2p(rerr)
return
})
e2p(err)
}
func tccRollback(t *testing.T) {
gid := "tccRollback"
data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}
err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
resp, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
assert.Contains(t, resp.String(), "SUCCESS")
_, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
assert.Error(t, rerr)
return
})
assert.Error(t, err)
}

59
dtmsvr/trans_xa_test.go Normal file
View File

@ -0,0 +1,59 @@
package dtmsvr
import (
"fmt"
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/examples"
)
func TestXa(t *testing.T) {
xaLocalError(t)
xaNormal(t)
xaRollback(t)
}
func xaLocalError(t *testing.T) {
err := examples.XaClient.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) error {
return fmt.Errorf("an error")
})
assert.Error(t, err, fmt.Errorf("an error"))
}
func xaNormal(t *testing.T) {
xc := examples.XaClient
gid := "xaNormal"
err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) error {
req := examples.GenTransReq(30, false, false)
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
common.CheckRestySuccess(resp, err)
resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa")
common.CheckRestySuccess(resp, err)
return nil
})
e2p(err)
WaitTransProcessed(gid)
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid))
}
func xaRollback(t *testing.T) {
xc := examples.XaClient
gid := "xaRollback"
err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) error {
req := examples.GenTransReq(30, false, true)
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
common.CheckRestySuccess(resp, err)
resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa")
common.CheckRestySuccess(resp, err)
return nil
})
if err != nil {
logrus.Errorf("global transaction failed, so rollback")
}
WaitTransProcessed(gid)
assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid))
assert.Equal(t, "failed", getTransStatus(gid))
}

View File

@ -18,7 +18,7 @@ func MsgFireRequest() string {
TransInResult: "SUCCESS", TransInResult: "SUCCESS",
TransOutResult: "SUCCESS", TransOutResult: "SUCCESS",
} }
msg := dtmcli.NewMsg(DtmServer). msg := dtmcli.NewMsg(DtmServer, dtmcli.MustGenGid(DtmServer)).
Add(Busi+"/TransOut", req). Add(Busi+"/TransOut", req).
Add(Busi+"/TransIn", req) Add(Busi+"/TransIn", req)
err := msg.Prepare(Busi + "/TransQuery") err := msg.Prepare(Busi + "/TransQuery")

View File

@ -18,7 +18,7 @@ func SagaFireRequest() string {
TransInResult: "SUCCESS", TransInResult: "SUCCESS",
TransOutResult: "SUCCESS", TransOutResult: "SUCCESS",
} }
saga := dtmcli.NewSaga(DtmServer). saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInRevert", req) Add(Busi+"/TransIn", Busi+"/TransInRevert", req)
logrus.Printf("saga busi trans submit") logrus.Printf("saga busi trans submit")

View File

@ -14,7 +14,7 @@ import (
func SagaBarrierFireRequest() string { func SagaBarrierFireRequest() string {
logrus.Printf("a busi transaction begin") logrus.Printf("a busi transaction begin")
req := &TransReq{Amount: 30} req := &TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmServer). saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)).
Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req). Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req).
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req)
logrus.Printf("busi trans submit") logrus.Printf("busi trans submit")
@ -44,13 +44,13 @@ func sagaBarrierTransIn(c *gin.Context) (interface{}, error) {
if req.TransInResult != "" { if req.TransInResult != "" {
return req.TransInResult, nil return req.TransInResult, nil
} }
return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) {
return sagaBarrierAdjustBalance(sdb, 1, req.Amount) return sagaBarrierAdjustBalance(sdb, 1, req.Amount)
}) })
} }
func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) { func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) {
return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) {
return sagaBarrierAdjustBalance(sdb, 1, -reqFrom(c).Amount) return sagaBarrierAdjustBalance(sdb, 1, -reqFrom(c).Amount)
}) })
} }
@ -60,13 +60,13 @@ func sagaBarrierTransOut(c *gin.Context) (interface{}, error) {
if req.TransInResult != "" { if req.TransInResult != "" {
return req.TransInResult, nil return req.TransInResult, nil
} }
return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) {
return sagaBarrierAdjustBalance(sdb, 2, -req.Amount) return sagaBarrierAdjustBalance(sdb, 2, -req.Amount)
}) })
} }
func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) { func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) {
return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) {
return sagaBarrierAdjustBalance(sdb, 2, reqFrom(c).Amount) return sagaBarrierAdjustBalance(sdb, 2, reqFrom(c).Amount)
}) })
} }

View File

@ -11,16 +11,11 @@ import (
func TccSetup(app *gin.Engine) { func TccSetup(app *gin.Engine) {
app.POST(BusiAPI+"/TransInTcc", common.WrapHandler(func(c *gin.Context) (interface{}, error) { app.POST(BusiAPI+"/TransInTcc", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
tcc, err := dtmcli.TccFromReq(c) tcc, err := dtmcli.TccFromReq(c)
if err != nil { e2p(err)
return nil, err
}
req := reqFrom(c) req := reqFrom(c)
logrus.Printf("Trans in %d here, and Trans in another %d in call2 ", req.Amount/2, req.Amount/2) logrus.Printf("Trans in %d here, and Trans in another %d in call2 ", req.Amount/2, req.Amount/2)
_, rerr := tcc.CallBranch(&TransReq{Amount: req.Amount / 2}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") _, rerr := tcc.CallBranch(&TransReq{Amount: req.Amount / 2}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
if rerr != nil { e2p(rerr)
return nil, rerr
}
return M{"dtm_result": "SUCCESS"}, nil return M{"dtm_result": "SUCCESS"}, nil
})) }))
@ -29,15 +24,12 @@ func TccSetup(app *gin.Engine) {
// TccFireRequest 1 // TccFireRequest 1
func TccFireRequest() string { func TccFireRequest() string {
logrus.Printf("tcc transaction begin") logrus.Printf("tcc transaction begin")
gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { gid := dtmcli.MustGenGid(DtmServer)
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
if rerr != nil { e2p(rerr)
return
}
res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTcc", Busi+"/TransInConfirm", Busi+"/TransInRevert") res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTcc", Busi+"/TransInConfirm", Busi+"/TransInRevert")
if rerr != nil { e2p(rerr)
return
}
logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String())
return return
}) })

View File

@ -13,21 +13,12 @@ import (
// TccBarrierFireRequest 1 // TccBarrierFireRequest 1
func TccBarrierFireRequest() string { func TccBarrierFireRequest() string {
logrus.Printf("tcc transaction begin") logrus.Printf("tcc transaction begin")
gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { gid := dtmcli.MustGenGid(DtmServer)
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel") res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
if rerr != nil { common.CheckRestySuccess(res1, rerr)
return
}
if res1.StatusCode() != 200 {
return fmt.Errorf("bad status code: %d", res1.StatusCode())
}
res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel") res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
if rerr != nil { common.CheckRestySuccess(res1, rerr)
return
}
if res2.StatusCode() != 200 {
return fmt.Errorf("bad status code: %d", res2.StatusCode())
}
logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String())
return return
}) })
@ -79,19 +70,19 @@ func tccBarrierTransInTry(c *gin.Context) (interface{}, error) {
if req.TransInResult != "" { if req.TransInResult != "" {
return req.TransInResult, nil return req.TransInResult, nil
} }
return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) {
return adjustTrading(sdb, transInUID, req.Amount) return adjustTrading(sdb, transInUID, req.Amount)
}) })
} }
func tccBarrierTransInConfirm(c *gin.Context) (interface{}, error) { func tccBarrierTransInConfirm(c *gin.Context) (interface{}, error) {
return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) {
return adjustBalance(sdb, transInUID, reqFrom(c).Amount) return adjustBalance(sdb, transInUID, reqFrom(c).Amount)
}) })
} }
func tccBarrierTransInCancel(c *gin.Context) (interface{}, error) { func tccBarrierTransInCancel(c *gin.Context) (interface{}, error) {
return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) {
return adjustTrading(sdb, transInUID, -reqFrom(c).Amount) return adjustTrading(sdb, transInUID, -reqFrom(c).Amount)
}) })
} }
@ -101,19 +92,20 @@ func tccBarrierTransOutTry(c *gin.Context) (interface{}, error) {
if req.TransInResult != "" { if req.TransInResult != "" {
return req.TransInResult, nil return req.TransInResult, nil
} }
return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) {
return adjustTrading(sdb, transOutUID, -req.Amount) return adjustTrading(sdb, transOutUID, -req.Amount)
}) })
} }
func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) { func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) {
return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) {
return adjustBalance(sdb, transOutUID, -reqFrom(c).Amount) return adjustBalance(sdb, transOutUID, -reqFrom(c).Amount)
}) })
} }
// TccBarrierTransOutCancel will be use in test
func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) { func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) {
return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.MustGetTrans(c), func(sdb *sql.DB) (interface{}, error) {
return adjustTrading(sdb, transOutUID, reqFrom(c).Amount) return adjustTrading(sdb, transOutUID, reqFrom(c).Amount)
}) })
} }

View File

@ -2,6 +2,7 @@ package examples
import ( import (
"fmt" "fmt"
"strings"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -38,7 +39,8 @@ func dbGet() *common.DB {
// XaFireRequest 1 // XaFireRequest 1
func XaFireRequest() string { func XaFireRequest() string {
gid, err := XaClient.XaGlobalTransaction(func(xa *dtmcli.Xa) (rerr error) { gid := dtmcli.MustGenGid(DtmServer)
err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (rerr error) {
defer common.P2E(&rerr) defer common.P2E(&rerr)
req := GenTransReq(30, false, false) req := GenTransReq(30, false, false)
resp, err := xa.CallBranch(req, Busi+"/TransOutXa") resp, err := xa.CallBranch(req, Busi+"/TransOutXa")
@ -56,18 +58,23 @@ func XaSetup(app *gin.Engine) {
app.POST(BusiAPI+"/TransInXa", common.WrapHandler(xaTransIn)) app.POST(BusiAPI+"/TransInXa", common.WrapHandler(xaTransIn))
app.POST(BusiAPI+"/TransOutXa", common.WrapHandler(xaTransOut)) app.POST(BusiAPI+"/TransOutXa", common.WrapHandler(xaTransOut))
config.Mysql["database"] = "dtm_busi" config.Mysql["database"] = "dtm_busi"
XaClient = dtmcli.NewXaClient(DtmServer, config.Mysql, app, Busi+"/xa") var err error
XaClient, err = dtmcli.NewXaClient(DtmServer, config.Mysql, app, Busi+"/xa")
e2p(err)
} }
func xaTransIn(c *gin.Context) (interface{}, error) { func xaTransIn(c *gin.Context) (interface{}, error) {
err := XaClient.XaLocalTransaction(c, func(db *common.DB, xa *dtmcli.Xa) (rerr error) { err := XaClient.XaLocalTransaction(c, func(db *common.DB, xa *dtmcli.Xa) (rerr error) {
req := reqFrom(c) req := reqFrom(c)
if req.TransInResult != "SUCCESS" { if req.TransInResult != "SUCCESS" {
return fmt.Errorf("tranIn failed") return fmt.Errorf("tranIn FAILURE")
} }
dbr := db.Exec("update user_account set balance=balance+? where user_id=?", req.Amount, 2) dbr := db.Exec("update user_account set balance=balance+? where user_id=?", req.Amount, 2)
return dbr.Error return dbr.Error
}) })
if err != nil && strings.Contains(err.Error(), "FAILURE") {
return M{"dtm_result": "FAILURE"}, nil
}
e2p(err) e2p(err)
return M{"dtm_result": "SUCCESS"}, nil return M{"dtm_result": "SUCCESS"}, nil
} }

View File

@ -31,7 +31,7 @@ func QsStartSvr() {
func QsFireRequest() string { func QsFireRequest() string {
req := &gin.H{"amount": 30} // 微服务的载荷 req := &gin.H{"amount": 30} // 微服务的载荷
// DtmServer为DTM服务的地址 // DtmServer为DTM服务的地址
saga := dtmcli.NewSaga(DtmServer). saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)).
// 添加一个TransOut的子事务正向操作为url: qsBusi+"/TransOut" 逆向操作为url: qsBusi+"/TransOutCompensate" // 添加一个TransOut的子事务正向操作为url: qsBusi+"/TransOut" 逆向操作为url: qsBusi+"/TransOutCompensate"
Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req). Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req).
// 添加一个TransIn的子事务正向操作为url: qsBusi+"/TransOut" 逆向操作为url: qsBusi+"/TransInCompensate" // 添加一个TransIn的子事务正向操作为url: qsBusi+"/TransOut" 逆向操作为url: qsBusi+"/TransInCompensate"