test passed

This commit is contained in:
yedongfu 2021-07-04 09:36:16 +08:00
parent 7b43124235
commit 09a97e7e20
15 changed files with 49 additions and 92 deletions

View File

@ -4,5 +4,4 @@ Mysql:
password: 'my-secret-pw'
port: '3306'
PreparedExpire: 90 # 单位秒处于prepared中的任务过了这个时间查询结果还是PENDING的话则会被cancel
TransCronInterval: 10 # 单位秒 当事务等待这个时间之后还没有变化则进行一轮重试处理包括prepared中的任务和commited的任务

View File

@ -27,6 +27,7 @@ type MsgStep struct {
func NewMsg(server string) *Msg {
return &Msg{
MsgData: MsgData{
Gid: common.GenGid(),
TransType: "msg",
},
Server: server,
@ -65,6 +66,5 @@ func (s *Msg) Prepare(queryPrepared string) error {
if resp.StatusCode() != 200 {
return fmt.Errorf("prepare failed: %v", resp.Body())
}
s.Gid = jsonitor.Get(resp.Body(), "gid").ToString()
return nil
}

View File

@ -52,7 +52,7 @@ func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, can
return common.RestyClient.R().
SetBody(&M{
"gid": t.Gid,
"branch": common.GenGid(),
"branch_id": common.GenGid(),
"trans_type": "tcc",
"status": "prepared",
"data": string(common.MustMarshal(body)),
@ -60,5 +60,5 @@ func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, can
"confirm": confirmUrl,
"cancel": cancelUrl,
}).
Post(t.Dtm + "/registerXaBranch")
Post(t.Dtm + "/registerTccBranch")
}

View File

@ -34,7 +34,7 @@ func NewXa(server string, mysqlConf map[string]string, app *gin.Engine, callback
app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) {
type CallbackReq struct {
Gid string `json:"gid"`
Branch string `json:"branch"`
BranchID string `json:"branch_id"`
Action string `json:"action"`
}
req := CallbackReq{}
@ -44,9 +44,9 @@ func NewXa(server string, mysqlConf map[string]string, app *gin.Engine, callback
tx, my := common.DbAlone(xa.Conf)
defer my.Close()
if req.Action == "commit" {
tx.Must().Exec(fmt.Sprintf("xa commit '%s'", req.Branch))
tx.Must().Exec(fmt.Sprintf("xa commit '%s'", req.BranchID))
} else if req.Action == "rollback" {
tx.Must().Exec(fmt.Sprintf("xa rollback '%s'", req.Branch))
tx.Must().Exec(fmt.Sprintf("xa rollback '%s'", req.BranchID))
} else {
panic(fmt.Errorf("unknown action: %s", req.Action))
}
@ -57,21 +57,21 @@ func NewXa(server string, mysqlConf map[string]string, app *gin.Engine, callback
func (xa *Xa) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error) {
defer common.P2E(&rerr)
branch := common.GenGid()
branchID := common.GenGid()
tx, my := common.DbAlone(xa.Conf)
defer func() { my.Close() }()
tx.Must().Exec(fmt.Sprintf("XA start '%s'", branch))
tx.Must().Exec(fmt.Sprintf("XA start '%s'", branchID))
err := transFunc(tx)
e2p(err)
resp, err := common.RestyClient.R().
SetBody(&M{"gid": gid, "branch": branch, "trans_type": "xa", "status": "prepared", "url": xa.CallbackUrl}).
SetBody(&M{"gid": gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xa.CallbackUrl}).
Post(xa.Server + "/registerXaBranch")
e2p(err)
if !strings.Contains(resp.String(), "SUCCESS") {
e2p(fmt.Errorf("unknown server response: %s", resp.String()))
}
tx.Must().Exec(fmt.Sprintf("XA end '%s'", branch))
tx.Must().Exec(fmt.Sprintf("XA prepare '%s'", branch))
tx.Must().Exec(fmt.Sprintf("XA end '%s'", branchID))
tx.Must().Exec(fmt.Sprintf("XA prepare '%s'", branchID))
return nil
}

View File

@ -39,10 +39,9 @@ func Abort(c *gin.Context) (interface{}, error) {
db := dbGet()
m := TransFromContext(c)
m = TransFromDb(db, m.Gid)
if m.TransType != "xa" || m.Status != "prepared" {
return nil, fmt.Errorf("unkown trans data. type: %s status: %s for gid: %s", m.TransType, m.Status, m.Gid)
if m.TransType != "xa" && m.TransType != "tcc" || m.Status != "prepared" {
return nil, fmt.Errorf("unexpected trans data. type: %s status: %s for gid: %s", m.TransType, m.Status, m.Gid)
}
// 当前xa trans的状态为prepared直接处理则是回滚
go m.Process(db)
return M{"message": "SUCCESS"}, nil
}
@ -59,6 +58,8 @@ func RegisterXaBranch(c *gin.Context) (interface{}, error) {
DoNothing: true,
}).Create(branches)
e2p(err)
global := TransGlobal{Gid: branch.Gid}
global.touch(db, config.TransCronInterval)
return M{"message": "SUCCESS"}, nil
}
@ -68,7 +69,7 @@ func RegisterTccBranch(c *gin.Context) (interface{}, error) {
e2p(err)
branch := TransBranch{
Gid: data["gid"],
Branch: data["branch_id"],
BranchID: data["branch_id"],
Status: data["status"],
Data: data["data"],
}
@ -83,6 +84,8 @@ func RegisterTccBranch(c *gin.Context) (interface{}, error) {
DoNothing: true,
}).Create(branches)
e2p(err)
global := TransGlobal{Gid: branch.Gid}
global.touch(dbGet(), config.TransCronInterval)
return M{"message": "SUCCESS"}, nil
}

View File

@ -1,13 +1,11 @@
package dtmsvr
type dtmsvrConfig struct {
PreparedExpire int64 // 单位秒处于prepared中的任务过了这个时间查询结果还是PENDING的话则会被cancel
TransCronInterval int64 // 单位秒 当事务等待这个时间之后还没有变化则进行一轮处理包括prepared中的任务和commited的任务
Mysql map[string]string
}
var config = &dtmsvrConfig{
PreparedExpire: 60,
TransCronInterval: 10,
}

View File

@ -32,7 +32,7 @@ CREATE TABLE IF NOT EXISTS `trans_branch` (
`gid` varchar(128) NOT NULL COMMENT '事务全局id',
`url` varchar(128) NOT NULL COMMENT '动作关联的url',
`data` TEXT COMMENT '请求所携带的数据',
`branch` VARCHAR(128) NOT NULL COMMENT '事务分支名称',
`branch_id` VARCHAR(128) NOT NULL COMMENT '事务分支名称',
`branch_type` varchar(45) NOT NULL COMMENT '事务分支类型 saga_action | saga_compensate | xa',
`status` varchar(45) NOT NULL COMMENT '步骤的状态 submitted | finished | rollbacked',
`finish_time` datetime DEFAULT NULL,
@ -40,7 +40,7 @@ CREATE TABLE IF NOT EXISTS `trans_branch` (
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `gid` (`gid`,`branch`, `branch_type`),
UNIQUE KEY `gid` (`gid`,`branch_id`, `branch_type`),
KEY `create_time` (`create_time`),
KEY `update_time` (`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
@ -49,7 +49,7 @@ drop table IF EXISTS trans_log;
CREATE TABLE IF NOT EXISTS `trans_log` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`gid` varchar(128) NOT NULL COMMENT '事务全局id',
`branch` varchar(128) DEFAULT NULL COMMENT '事务分支',
`branch_id` varchar(128) DEFAULT NULL COMMENT '事务分支',
`action` varchar(45) DEFAULT NULL COMMENT '行为',
`old_status` varchar(45) NOT NULL DEFAULT '' COMMENT '旧状态',
`new_status` varchar(45) NOT NULL COMMENT '新状态',

View File

@ -27,7 +27,6 @@ var myinit int = func() int {
func TestViper(t *testing.T) {
assert.Equal(t, true, viper.Get("mysql") != nil)
assert.Equal(t, int64(90), config.PreparedExpire)
}
func TestDtmSvr(t *testing.T) {
@ -50,9 +49,9 @@ func TestDtmSvr(t *testing.T) {
msgPending(t)
msgNormal(t)
sagaNormal(t)
tccNormal(t)
tccRollback(t)
sagaNormal(t)
xaNormal(t)
xaRollback(t)
sagaCommittedPending(t)
@ -155,7 +154,7 @@ func tccRollback(t *testing.T) {
e2p(err)
}
func msgNormal(t *testing.T) {
msg := genMsg("gid-normal-msg")
msg := genMsg("gid-msg-normal")
msg.Submit()
assert.Equal(t, "submitted", getTransStatus(msg.Gid))
WaitTransProcessed(msg.Gid)
@ -164,7 +163,7 @@ func msgNormal(t *testing.T) {
}
func msgPending(t *testing.T) {
msg := genMsg("gid-normal-pending")
msg := genMsg("gid-msg-normal-pending")
msg.Prepare("")
assert.Equal(t, "prepared", getTransStatus(msg.Gid))
examples.MainSwitch.CanSubmitResult.SetOnce("PENDING")

View File

@ -65,7 +65,7 @@ type TransBranch struct {
Gid string
Url string
Data string
Branch string
BranchID string `json:"branch_id"`
BranchType string
Status string
FinishTime *time.Time
@ -77,7 +77,7 @@ func (*TransBranch) TableName() string {
}
func (t *TransBranch) changeStatus(db *common.DB, status string) *gorm.DB {
writeTransLog(t.Gid, "branch change", status, t.Branch, "")
writeTransLog(t.Gid, "branch change", status, t.BranchID, "")
dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(M{
"status": status,
"finish_time": time.Now(),

View File

@ -3,9 +3,7 @@ package dtmsvr
import (
"fmt"
"strings"
"time"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
)
@ -24,7 +22,7 @@ func (t *TransMsgProcessor) GenBranches() []TransBranch {
for _, step := range steps {
branches = append(branches, TransBranch{
Gid: t.Gid,
Branch: common.GenGid(),
BranchID: common.GenGid(),
Data: step["data"].(string),
Url: step["action"].(string),
BranchType: "action",
@ -53,18 +51,11 @@ func (t *TransGlobal) mayQueryPrepared(db *common.DB) {
resp, err := common.RestyClient.R().SetQueryParam("gid", t.Gid).Get(t.QueryPrepared)
e2p(err)
body := resp.String()
if strings.Contains(body, "FAIL") {
preparedExpire := time.Now().Add(time.Duration(-config.PreparedExpire) * time.Second)
logrus.Printf("create time: %s prepared expire: %s ", t.CreateTime.Local(), preparedExpire.Local())
status := common.If(t.CreateTime.Before(preparedExpire), "canceled", "prepared").(string)
if status != t.Status {
t.changeStatus(db, status)
if strings.Contains(body, "SUCCESS") {
t.changeStatus(db, "submitted")
} else {
t.touch(db, t.NextCronInterval*2)
}
} else if strings.Contains(body, "SUCCESS") {
t.changeStatus(db, "submitted")
}
}
func (t *TransMsgProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {

View File

@ -24,7 +24,7 @@ func (t *TransSagaProcessor) GenBranches() []TransBranch {
for _, branchType := range []string{"compensate", "action"} {
branches = append(branches, TransBranch{
Gid: t.Gid,
Branch: branch,
BranchID: branch,
Data: step["data"].(string),
Url: step[branchType].(string),
BranchType: branchType,

View File

@ -16,23 +16,7 @@ func init() {
}
func (t *TransTccProcessor) GenBranches() []TransBranch {
branches := []TransBranch{}
steps := []M{}
common.MustUnmarshalString(t.Data, &steps)
for _, step := range steps {
branch := common.GenGid()
for _, branchType := range []string{"cancel", "confirm", "try"} {
branches = append(branches, TransBranch{
Gid: t.Gid,
Branch: branch,
Data: step["data"].(string),
Url: step[branchType].(string),
BranchType: branchType,
Status: "prepared",
})
}
}
return branches
return []TransBranch{}
}
func (t *TransTccProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
@ -51,33 +35,15 @@ func (t *TransTccProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
}
func (t *TransTccProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {
if t.Status != "submitted" {
if t.Status == "succeed" || t.Status == "failed" {
return
}
current := 0 // 当前正在处理的步骤
// 先处理一轮正常try状态
for ; current < len(branches); current++ {
branch := &branches[current]
if branch.BranchType != "try" || branch.Status == "succeed" {
continue
}
if branch.BranchType == "try" && branch.Status == "prepared" {
t.ExecBranch(db, branch)
if branch.Status != "succeed" {
break
}
} else {
break
branchType := common.If(t.Status == "submitted", "confirm", "cancel").(string)
for current := len(branches) - 1; current >= -1; current-- {
if current == -1 { // 已全部处理完
t.changeStatus(db, common.If(t.Status == "submitted", "succeed", "failed").(string))
} else if branches[current].BranchType == branchType {
t.ExecBranch(db, &branches[current])
}
}
// 如果try全部成功则处理confirm分支否则处理cancel分支
currentType := common.If(current == len(branches), "confirm", "cancel")
for current--; current >= 0; current-- {
branch := &branches[current]
if branch.BranchType != currentType || branch.Status != "prepared" {
continue
}
t.ExecBranch(db, branch)
}
t.changeStatus(db, common.If(currentType == "confirm", "succeed", "failed").(string))
}

View File

@ -20,7 +20,7 @@ func (t *TransXaProcessor) GenBranches() []TransBranch {
}
func (t *TransXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
resp, err := common.RestyClient.R().SetBody(M{
"branch": branch.Branch,
"branch_id": branch.BranchID,
"action": common.If(t.Status == "prepared", "rollback", "commit"),
"gid": branch.Gid,
}).Post(branch.Url)

View File

@ -14,6 +14,7 @@ func dbGet() *common.DB {
return common.DbGet(config.Mysql)
}
func writeTransLog(gid string, action string, status string, branch string, detail string) {
return
db := dbGet()
if detail == "" {
detail = "{}"

View File

@ -26,7 +26,7 @@ func TccSetup(app *gin.Engine) {
return nil, err
}
req := reqFrom(c)
logrus.Printf("Trans in %f here, and Trans in another %f in call2 ", req.Amount/2, req.Amount/2)
logrus.Printf("Trans in %d here, and Trans in another %d in call2 ", req.Amount/2, req.Amount/2)
_, rerr := tcc.CallBranch(&TransReq{Amount: req.Amount / 2}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
if rerr != nil {
return nil, rerr