gid generation decoupled

This commit is contained in:
yedf2 2021-07-23 10:42:15 +08:00
parent 796a136c35
commit 1961ad806d
16 changed files with 74 additions and 78 deletions

View File

@ -30,12 +30,7 @@ type MsgStep struct {
}
// NewMsg create new msg
func NewMsg(server string) *Msg {
return NewMsg2(server, GenGid(server))
}
// NewMsg2 create new msg with specified gid
func NewMsg2(server string, gid string) *Msg {
func NewMsg(server string, gid string) *Msg {
return &Msg{
MsgData: MsgData{
Gid: gid,

View File

@ -30,12 +30,7 @@ type SagaStep struct {
}
// NewSaga create a saga
func NewSaga(server string) *Saga {
return NewSaga2(server, GenGid(server))
}
// NewSaga2 create a saga
func NewSaga2(server string, gid string) *Saga {
func NewSaga(server string, gid string) *Saga {
return &Saga{
SagaData: SagaData{
Gid: gid,

View File

@ -21,20 +21,15 @@ type Tcc struct {
type TccGlobalFunc func(tcc *Tcc) error
// TccGlobalTransaction begin a tcc global transaction
func TccGlobalTransaction(dtm string, tccFunc TccGlobalFunc) (gid string, rerr error) {
return TccGlobalTransaction2(dtm, GenGid(dtm), tccFunc)
}
// TccGlobalTransaction2 begin a tcc global transaction
func TccGlobalTransaction2(dtm string, gidIn string, tccFunc TccGlobalFunc) (gid string, rerr error) {
gid = gidIn
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
data := &M{
"gid": gid,
"trans_type": "tcc",
}
defer func() {
var err error
if x := recover(); x != nil || rerr != nil {
var x interface{}
if x = recover(); x != nil || rerr != nil {
_, err = common.RestyClient.R().SetBody(data).Post(dtm + "/abort")
} else {
_, err = common.RestyClient.R().SetBody(data).Post(dtm + "/submit")
@ -42,6 +37,9 @@ func TccGlobalTransaction2(dtm string, gidIn string, tccFunc TccGlobalFunc) (gid
if err != nil {
logrus.Errorf("submitting or abort global transaction error: %v", err)
}
if x != nil {
panic(x)
}
}()
tcc := &Tcc{Dtm: dtm, Gid: gid}
resp, rerr := common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare")
@ -50,6 +48,7 @@ func TccGlobalTransaction2(dtm string, gidIn string, tccFunc TccGlobalFunc) (gid
}
if !strings.Contains(resp.String(), "SUCCESS") {
rerr = fmt.Errorf("bad response: %s", resp.String())
return
}
rerr = tccFunc(tcc)
return
@ -89,7 +88,7 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
if !strings.Contains(resp.String(), "SUCCESS") {
return nil, fmt.Errorf("registerTccBranch failed: %s", resp.String())
}
return common.RestyClient.R().
r, err := common.RestyClient.R().
SetBody(body).
SetQueryParams(common.MS{
"dtm": t.Dtm,
@ -99,4 +98,8 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
"branch_type": "try",
}).
Post(tryURL)
if err == nil && strings.Contains(r.String(), "FAILURE") {
return r, fmt.Errorf("branch return failure: %s", r.String())
}
return r, err
}

View File

@ -7,6 +7,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
)
@ -115,14 +116,8 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, transFunc XaLocalFunc) (r
}
// XaGlobalTransaction start a xa global transaction
func (xc *XaClient) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr error) {
return xc.XaGlobalTransaction2(GenGid(xc.Server), transFunc)
}
// XaGlobalTransaction2 start a xa global transaction with gid=ginIn
func (xc *XaClient) XaGlobalTransaction2(gidIn string, transFunc XaGlobalFunc) (gid string, rerr error) {
xa := Xa{IDGenerator: IDGenerator{}, Gid: gidIn}
gid = xa.Gid
func (xc *XaClient) XaGlobalTransaction(gid string, transFunc XaGlobalFunc) error {
xa := Xa{IDGenerator: IDGenerator{}, Gid: gid}
data := &M{
"gid": gid,
"trans_type": "xa",
@ -130,29 +125,31 @@ func (xc *XaClient) XaGlobalTransaction2(gidIn string, transFunc XaGlobalFunc) (
defer func() {
x := recover()
if x != nil {
_, _ = common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort")
rerr = x.(error)
r, err := common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort")
if !strings.Contains(r.String(), "SUCCESS") {
logrus.Errorf("abort xa error: resp: %s err: %v", r.String(), err)
}
}
}()
resp, rerr := common.RestyClient.R().SetBody(data).Post(xc.Server + "/prepare")
e2p(rerr)
if !strings.Contains(resp.String(), "SUCCESS") {
panic(fmt.Errorf("unexpected result: %s", resp.String()))
return fmt.Errorf("unexpected result: %s", resp.String())
}
rerr = transFunc(&xa)
e2p(rerr)
resp, rerr = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit")
e2p(rerr)
if !strings.Contains(resp.String(), "SUCCESS") {
panic(fmt.Errorf("unexpected result: %s", resp.String()))
if rerr != nil {
return rerr
}
return
resp, rerr = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit")
if !strings.Contains(resp.String(), "SUCCESS") {
return fmt.Errorf("unexpected result: %s err: %v", resp.String(), rerr)
}
return nil
}
// CallBranch call a xa branch
func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
branchID := x.NewBranchID()
return common.RestyClient.R().
resp, err := common.RestyClient.R().
SetBody(body).
SetQueryParams(common.MS{
"gid": x.Gid,
@ -161,4 +158,8 @@ func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
"branch_type": "action",
}).
Post(url)
if strings.Contains(resp.String(), "FAILURE") {
return resp, fmt.Errorf("unexpected result: %s err: %v", resp.String(), err)
}
return resp, err
}

View File

@ -76,22 +76,20 @@ func assertSucceed(t *testing.T, gid string) {
func genMsg(gid string) *dtmcli.Msg {
logrus.Printf("beginning a msg test ---------------- %s", gid)
msg := dtmcli.NewMsg(examples.DtmServer)
msg := dtmcli.NewMsg(examples.DtmServer, gid)
msg.QueryPrepared = examples.Busi + "/CanSubmit"
req := examples.GenTransReq(30, false, false)
msg.Add(examples.Busi+"/TransOut", &req)
msg.Add(examples.Busi+"/TransIn", &req)
msg.Gid = gid
return msg
}
func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
logrus.Printf("beginning a saga test ---------------- %s", gid)
saga := dtmcli.NewSaga(examples.DtmServer)
saga := dtmcli.NewSaga(examples.DtmServer, gid)
req := examples.GenTransReq(30, outFailed, inFailed)
saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req)
saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req)
saga.Gid = gid
return saga
}

View File

@ -17,7 +17,7 @@ func TestSagaBarrier(t *testing.T) {
func sagaBarrierNormal(t *testing.T) {
req := &examples.TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmServer).
saga := dtmcli.NewSaga(DtmServer, "sagaBarrierNormal").
Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req).
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req)
logrus.Printf("busi trans submit")
@ -28,7 +28,7 @@ func sagaBarrierNormal(t *testing.T) {
}
func sagaBarrierRollback(t *testing.T) {
saga := dtmcli.NewSaga(DtmServer).
saga := dtmcli.NewSaga(DtmServer, "sagaBarrierRollback").
Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", &examples.TransReq{Amount: 30}).
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", &examples.TransReq{Amount: 30, TransInResult: "FAILURE"})
logrus.Printf("busi trans submit")

View File

@ -22,30 +22,22 @@ func TestTccBarrier(t *testing.T) {
}
func tccBarrierRollback(t *testing.T) {
gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
gid := "tccBarrierRollback"
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
e2p(rerr)
if res1.StatusCode() != 200 {
return fmt.Errorf("bad status code: %d", res1.StatusCode())
}
res2, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
e2p(rerr)
if res2.StatusCode() != 200 {
return fmt.Errorf("bad status code: %d", res2.StatusCode())
}
if strings.Contains(res2.String(), "FAILURE") {
return fmt.Errorf("branch trans in fail")
}
logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String())
assert.Contains(t, res1.String(), "SUCCESS")
_, rerr = tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
assert.Error(t, rerr)
return
})
assert.Equal(t, err, fmt.Errorf("branch trans in fail"))
assert.Error(t, err)
WaitTransProcessed(gid)
assert.Equal(t, "failed", getTransStatus(gid))
}
func tccBarrierNormal(t *testing.T) {
_, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
gid := "tccBarrierNormal"
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
e2p(rerr)
if res1.StatusCode() != 200 {
@ -60,12 +52,15 @@ func tccBarrierNormal(t *testing.T) {
return
})
e2p(err)
WaitTransProcessed(gid)
assert.Equal(t, "succeed", getTransStatus(gid))
}
func tccBarrierDisorder(t *testing.T) {
timeoutChan := make(chan string, 2)
finishedChan := make(chan string, 2)
gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
gid := "tccBarrierDisorder"
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
body := &examples.TransReq{Amount: 30}
tryURL := Busi + "/TccBTransOutTry"
confirmURL := Busi + "/TccBTransOutConfirm"

View File

@ -3,6 +3,7 @@ package dtmsvr
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/examples"
)
@ -15,7 +16,8 @@ func TestTcc(t *testing.T) {
func tccNormal(t *testing.T) {
data := &examples.TransReq{Amount: 30}
_, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
gid := "tccNormal"
err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
_, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
e2p(rerr)
_, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
@ -26,13 +28,14 @@ func tccNormal(t *testing.T) {
}
func tccRollback(t *testing.T) {
gid := "tccRollback"
data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}
_, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
_, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
e2p(rerr)
err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
resp, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
assert.Contains(t, resp.String(), "SUCCESS")
_, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
e2p(rerr)
assert.Error(t, rerr)
return
})
e2p(err)
assert.Error(t, err)
}

View File

@ -18,7 +18,8 @@ func TestXa(t *testing.T) {
func xaNormal(t *testing.T) {
xc := examples.XaClient
gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error {
gid := "xaNormal"
err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) error {
req := examples.GenTransReq(30, false, false)
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
common.CheckRestySuccess(resp, err)
@ -33,7 +34,8 @@ func xaNormal(t *testing.T) {
func xaRollback(t *testing.T) {
xc := examples.XaClient
gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error {
gid := "xaRollback"
err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) error {
req := examples.GenTransReq(30, false, true)
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
common.CheckRestySuccess(resp, err)

View File

@ -18,7 +18,7 @@ func MsgFireRequest() string {
TransInResult: "SUCCESS",
TransOutResult: "SUCCESS",
}
msg := dtmcli.NewMsg(DtmServer).
msg := dtmcli.NewMsg(DtmServer, dtmcli.GenGid(DtmServer)).
Add(Busi+"/TransOut", req).
Add(Busi+"/TransIn", req)
err := msg.Prepare(Busi + "/TransQuery")

View File

@ -18,7 +18,7 @@ func SagaFireRequest() string {
TransInResult: "SUCCESS",
TransOutResult: "SUCCESS",
}
saga := dtmcli.NewSaga(DtmServer).
saga := dtmcli.NewSaga(DtmServer, dtmcli.GenGid(DtmServer)).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInRevert", req)
logrus.Printf("saga busi trans submit")

View File

@ -14,7 +14,7 @@ import (
func SagaBarrierFireRequest() string {
logrus.Printf("a busi transaction begin")
req := &TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmServer).
saga := dtmcli.NewSaga(DtmServer, dtmcli.GenGid(DtmServer)).
Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req).
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req)
logrus.Printf("busi trans submit")

View File

@ -29,7 +29,8 @@ func TccSetup(app *gin.Engine) {
// TccFireRequest 1
func TccFireRequest() string {
logrus.Printf("tcc transaction begin")
gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
gid := dtmcli.GenGid(DtmServer)
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
if rerr != nil {
return

View File

@ -13,7 +13,8 @@ import (
// TccBarrierFireRequest 1
func TccBarrierFireRequest() string {
logrus.Printf("tcc transaction begin")
gid, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
gid := dtmcli.GenGid(DtmServer)
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) {
res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
if rerr != nil {
return
@ -112,6 +113,7 @@ func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) {
})
}
// TccBarrierTransOutCancel will be use in test
func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) {
return dtmcli.ThroughBarrierCall(dbGet().ToSQLDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) {
return adjustTrading(sdb, transOutUID, reqFrom(c).Amount)

View File

@ -38,7 +38,8 @@ func dbGet() *common.DB {
// XaFireRequest 1
func XaFireRequest() string {
gid, err := XaClient.XaGlobalTransaction(func(xa *dtmcli.Xa) (rerr error) {
gid := dtmcli.GenGid(DtmServer)
err := XaClient.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (rerr error) {
defer common.P2E(&rerr)
req := GenTransReq(30, false, false)
resp, err := xa.CallBranch(req, Busi+"/TransOutXa")

View File

@ -31,7 +31,7 @@ func QsStartSvr() {
func QsFireRequest() string {
req := &gin.H{"amount": 30} // 微服务的载荷
// DtmServer为DTM服务的地址
saga := dtmcli.NewSaga(DtmServer).
saga := dtmcli.NewSaga(DtmServer, dtmcli.GenGid(DtmServer)).
// 添加一个TransOut的子事务正向操作为url: qsBusi+"/TransOut" 逆向操作为url: qsBusi+"/TransOutCompensate"
Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req).
// 添加一个TransIn的子事务正向操作为url: qsBusi+"/TransOut" 逆向操作为url: qsBusi+"/TransInCompensate"