tcc pass compile

This commit is contained in:
yedongfu 2021-07-03 22:07:45 +08:00
parent 2a3297a3c5
commit ed4db3b854
7 changed files with 130 additions and 90 deletions

View File

@ -11,6 +11,9 @@ import (
"gorm.io/gorm" "gorm.io/gorm"
) )
type M = map[string]interface{}
type MS = map[string]string
type ModelBase struct { type ModelBase struct {
ID uint ID uint
CreateTime *time.Time `gorm:"autoCreateTime"` CreateTime *time.Time `gorm:"autoCreateTime"`

View File

@ -23,8 +23,6 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
) )
type M = map[string]interface{}
func P2E(perr *error) { func P2E(perr *error) {
if x := recover(); x != nil { if x := recover(); x != nil {
if e, ok := x.(error); ok { if e, ok := x.(error); ok {
@ -86,10 +84,6 @@ func MustAtoi(s string) int {
return r return r
} }
func GenGid() string {
return getOneHexIp() + "-" + gNode.Generate().Base58()
}
var gNode *snowflake.Node = nil var gNode *snowflake.Node = nil
func init() { func init() {
@ -98,6 +92,10 @@ func init() {
gNode = node gNode = node
} }
func GenGid() string {
return getOneHexIp() + "-" + gNode.Generate().Base58()
}
func OrString(ss ...string) string { func OrString(ss ...string) string {
for _, s := range ss { for _, s := range ss {
if s != "" { if s != "" {

View File

@ -3,57 +3,62 @@ package dtmcli
import ( import (
"fmt" "fmt"
jsonitor "github.com/json-iterator/go" "github.com/gin-gonic/gin"
"github.com/sirupsen/logrus" "github.com/go-resty/resty/v2"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
) )
type Tcc struct { type Tcc struct {
TccData Dtm string
Server string Gid string
} }
type TccData struct { type TccGlobalFunc func(tcc *Tcc) error
Gid string `json:"gid"`
TransType string `json:"trans_type"` func TccGlobalTransaction(dtm string, tccFunc TccGlobalFunc) (gid string, rerr error) {
Steps []TccStep `json:"steps"` gid = common.GenGid()
data := &M{
"gid": gid,
"trans_type": "tcc",
} }
type TccStep struct { defer func() {
Try string `json:"try"` if x := recover(); x != nil {
Confirm string `json:"confirm"` _, rerr = common.RestyClient.R().SetBody(data).Post(dtm + "/abort")
Cancel string `json:"cancel"` } else {
Data string `json:"data"` _, rerr = common.RestyClient.R().SetBody(data).Post(dtm + "/submit")
}
}()
tcc := &Tcc{Dtm: dtm, Gid: gid}
_, rerr = common.RestyClient.R().SetBody(data).Post(tcc.Dtm + "/prepare")
if rerr != nil {
return
}
rerr = tccFunc(tcc)
return
} }
func NewTcc(server string) *Tcc { func TccFromReq(c *gin.Context) (*Tcc, error) {
return &Tcc{ tcc := &Tcc{
TccData: TccData{ Dtm: c.Query("dtm"),
TransType: "tcc", Gid: c.Query("gid"),
},
Server: server,
} }
if tcc.Dtm == "" || tcc.Gid == "" {
return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s", tcc.Dtm, tcc.Gid)
} }
func (s *Tcc) Add(try string, confirm string, cancel string, data interface{}) *Tcc { return tcc, nil
logrus.Printf("tcc %s Add %s %s %s %v", s.Gid, try, confirm, cancel, data)
step := TccStep{
Try: try,
Confirm: confirm,
Cancel: cancel,
Data: common.MustMarshalString(data),
}
s.Steps = append(s.Steps, step)
return s
} }
func (s *Tcc) Submit() error { func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, cancelUrl string) (*resty.Response, error) {
logrus.Printf("committing %s body: %v", s.Gid, &s.TccData) return common.RestyClient.R().
resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/submit", s.Server)) SetBody(&M{
if err != nil { "gid": t.Gid,
return err "branch": common.GenGid(),
} "trans_type": "tcc",
if resp.StatusCode() != 200 { "status": "prepared",
return fmt.Errorf("submit failed: %v", resp.Body()) "data": string(common.MustMarshal(body)),
} "try": tryUrl,
s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() "confirm": confirmUrl,
return nil "cancel": cancelUrl,
}).
Post(t.Dtm + "/registerXaBranch")
} }

View File

@ -65,7 +65,7 @@ func (xa *Xa) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error)
e2p(err) e2p(err)
resp, err := common.RestyClient.R(). resp, err := common.RestyClient.R().
SetBody(&M{"gid": gid, "branch": branch, "trans_type": "xa", "status": "prepared", "url": xa.CallbackUrl}). SetBody(&M{"gid": gid, "branch": branch, "trans_type": "xa", "status": "prepared", "url": xa.CallbackUrl}).
Post(xa.Server + "/branch") Post(xa.Server + "/registerXaBranch")
e2p(err) e2p(err)
if !strings.Contains(resp.String(), "SUCCESS") { if !strings.Contains(resp.String(), "SUCCESS") {
e2p(fmt.Errorf("unknown server response: %s", resp.String())) e2p(fmt.Errorf("unknown server response: %s", resp.String()))

View File

@ -13,7 +13,8 @@ import (
func AddRoute(engine *gin.Engine) { func AddRoute(engine *gin.Engine) {
engine.POST("/api/dtmsvr/prepare", common.WrapHandler(Prepare)) engine.POST("/api/dtmsvr/prepare", common.WrapHandler(Prepare))
engine.POST("/api/dtmsvr/submit", common.WrapHandler(Submit)) engine.POST("/api/dtmsvr/submit", common.WrapHandler(Submit))
engine.POST("/api/dtmsvr/branch", common.WrapHandler(Branch)) engine.POST("/api/dtmsvr/registerXaBranch", common.WrapHandler(RegisterXaBranch))
engine.POST("/api/dtmsvr/registerTccBranch", common.WrapHandler(RegisterTccBranch))
engine.POST("/api/dtmsvr/abort", common.WrapHandler(Abort)) engine.POST("/api/dtmsvr/abort", common.WrapHandler(Abort))
engine.GET("/api/dtmsvr/query", common.WrapHandler(Query)) engine.GET("/api/dtmsvr/query", common.WrapHandler(Query))
} }
@ -46,7 +47,7 @@ func Abort(c *gin.Context) (interface{}, error) {
return M{"message": "SUCCESS"}, nil return M{"message": "SUCCESS"}, nil
} }
func Branch(c *gin.Context) (interface{}, error) { func RegisterXaBranch(c *gin.Context) (interface{}, error) {
branch := TransBranch{} branch := TransBranch{}
err := c.BindJSON(&branch) err := c.BindJSON(&branch)
e2p(err) e2p(err)
@ -61,6 +62,30 @@ func Branch(c *gin.Context) (interface{}, error) {
return M{"message": "SUCCESS"}, nil return M{"message": "SUCCESS"}, nil
} }
func RegisterTccBranch(c *gin.Context) (interface{}, error) {
data := common.MS{}
err := c.BindJSON(&data)
e2p(err)
branch := TransBranch{
Gid: data["gid"],
Branch: data["branch_id"],
Status: data["status"],
Data: data["data"],
}
branches := []*TransBranch{&branch, &branch, &branch}
for i, b := range []string{"cancel", "confirm", "try"} {
branches[i].BranchType = b
branches[i].Url = data[b]
}
dbGet().Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(branches)
e2p(err)
return M{"message": "SUCCESS"}, nil
}
func Query(c *gin.Context) (interface{}, error) { func Query(c *gin.Context) (interface{}, error) {
gid := c.Query("gid") gid := c.Query("gid")
if gid == "" { if gid == "" {

View File

@ -14,6 +14,9 @@ import (
"github.com/yedf/dtm/examples" "github.com/yedf/dtm/examples"
) )
var DtmServer = examples.DtmServer
var Busi = examples.Busi
var myinit int = func() int { var myinit int = func() int {
common.InitApp(common.GetProjectDir(), &config) common.InitApp(common.GetProjectDir(), &config)
config.Mysql["database"] = dbName config.Mysql["database"] = dbName
@ -50,7 +53,6 @@ func TestDtmSvr(t *testing.T) {
sagaNormal(t) sagaNormal(t)
tccNormal(t) tccNormal(t)
tccRollback(t) tccRollback(t)
tccRollbackPending(t)
xaNormal(t) xaNormal(t)
xaRollback(t) xaRollback(t)
sagaCommittedPending(t) sagaCommittedPending(t)
@ -131,28 +133,27 @@ func xaRollback(t *testing.T) {
} }
func tccNormal(t *testing.T) { func tccNormal(t *testing.T) {
tcc := genTcc("gid-tcc-normal", false, false) data := &examples.TransReq{Amount: 30}
tcc.Submit() _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
assert.Equal(t, "submitted", getTransStatus(tcc.Gid)) _, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
WaitTransProcessed(tcc.Gid) e2p(rerr)
assert.Equal(t, []string{"prepared", "succeed", "succeed", "prepared", "succeed", "succeed"}, getBranchesStatus(tcc.Gid)) _, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
e2p(rerr)
return
})
e2p(err)
} }
func tccRollback(t *testing.T) { func tccRollback(t *testing.T) {
tcc := genTcc("gid-tcc-rollback", false, true) data := &examples.TransReq{Amount: 30, TransInResult: "FAIL"}
tcc.Submit() _, err := dtmcli.TccGlobalTransaction(examples.DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
WaitTransProcessed(tcc.Gid) _, rerr = tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid)) e2p(rerr)
_, rerr = tcc.CallBranch(data, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
e2p(rerr)
return
})
e2p(err)
} }
func tccRollbackPending(t *testing.T) {
tcc := genTcc("gid-tcc-rollback-pending", false, true)
examples.MainSwitch.TransInRevertResult.SetOnce("PENDING")
tcc.Submit()
WaitTransProcessed(tcc.Gid)
// assert.Equal(t, "submitted", getTransStatus(tcc.Gid))
CronTransOnce(60*time.Second, "submitted")
assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid))
}
func msgNormal(t *testing.T) { func msgNormal(t *testing.T) {
msg := genMsg("gid-normal-msg") msg := genMsg("gid-normal-msg")
msg.Submit() msg.Submit()
@ -226,16 +227,6 @@ func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
return saga return saga
} }
func genTcc(gid string, outFailed bool, inFailed bool) *dtmcli.Tcc {
logrus.Printf("beginning a tcc test ---------------- %s", gid)
tcc := dtmcli.NewTcc(examples.DtmServer)
req := examples.GenTransReq(30, outFailed, inFailed)
tcc.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutConfirm", examples.Busi+"/TransOutRevert", &req)
tcc.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInConfirm", examples.Busi+"/TransInRevert", &req)
tcc.Gid = gid
return tcc
}
func transQuery(t *testing.T, gid string) { func transQuery(t *testing.T, gid string) {
resp, err := common.RestyClient.R().SetQueryParam("gid", gid).Get(examples.DtmServer + "/query") resp, err := common.RestyClient.R().SetQueryParam("gid", gid).Get(examples.DtmServer + "/query")
e2p(err) e2p(err)

View File

@ -5,6 +5,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli"
) )
@ -19,19 +20,36 @@ func TccMain() {
} }
func TccSetup(app *gin.Engine) { func TccSetup(app *gin.Engine) {
app.POST(BusiApi+"/TransInTcc", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
tcc, err := dtmcli.TccFromReq(c)
if err != nil {
return nil, err
}
req := reqFrom(c)
logrus.Printf("Trans in %f here, and Trans in another %f in call2 ", req.Amount/2, req.Amount/2)
_, rerr := tcc.CallBranch(&TransReq{Amount: req.Amount / 2}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
if rerr != nil {
return nil, rerr
}
return M{"result": "SUCCESS"}, nil
}))
} }
func TccFireRequest() { func TccFireRequest() {
logrus.Printf("tcc transaction begin") logrus.Printf("tcc transaction begin")
req := &TransReq{ _, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) {
Amount: 30, res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
TransInResult: "SUCCESS", if rerr != nil {
TransOutResult: "SUCCESS", return
} }
tcc := dtmcli.NewTcc(DtmServer). res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTcc", Busi+"/TransInConfirm", Busi+"/TransInRevert")
Add(Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert", req). if rerr != nil {
Add(Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransOutRevert", req) return
logrus.Printf("tcc trans submit") }
err := tcc.Submit() logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String())
return
})
e2p(err) e2p(err)
} }