diff --git a/conf.sample.yml b/conf.sample.yml index fcb73fb..c3b8b7a 100644 --- a/conf.sample.yml +++ b/conf.sample.yml @@ -4,5 +4,4 @@ Mysql: password: 'my-secret-pw' port: '3306' -PreparedExpire: 90 # 单位秒,处于prepared中的任务,过了这个时间,查询结果还是PENDING的话,则会被cancel TransCronInterval: 10 # 单位秒 当事务等待这个时间之后,还没有变化,则进行一轮重试处理,包括prepared中的任务和commited的任务 diff --git a/dtmcli/message.go b/dtmcli/message.go index d1d4aea..75307f1 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -27,6 +27,7 @@ type MsgStep struct { func NewMsg(server string) *Msg { return &Msg{ MsgData: MsgData{ + Gid: common.GenGid(), TransType: "msg", }, Server: server, @@ -65,6 +66,5 @@ func (s *Msg) Prepare(queryPrepared string) error { if resp.StatusCode() != 200 { return fmt.Errorf("prepare failed: %v", resp.Body()) } - s.Gid = jsonitor.Get(resp.Body(), "gid").ToString() return nil } diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 6ca0218..daed4f3 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -52,7 +52,7 @@ func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, can return common.RestyClient.R(). SetBody(&M{ "gid": t.Gid, - "branch": common.GenGid(), + "branch_id": common.GenGid(), "trans_type": "tcc", "status": "prepared", "data": string(common.MustMarshal(body)), @@ -60,5 +60,5 @@ func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, can "confirm": confirmUrl, "cancel": cancelUrl, }). - Post(t.Dtm + "/registerXaBranch") + Post(t.Dtm + "/registerTccBranch") } diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 034c528..8b6a689 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -33,9 +33,9 @@ func NewXa(server string, mysqlConf map[string]string, app *gin.Engine, callback e2p(err) app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) { type CallbackReq struct { - Gid string `json:"gid"` - Branch string `json:"branch"` - Action string `json:"action"` + Gid string `json:"gid"` + BranchID string `json:"branch_id"` + Action string `json:"action"` } req := CallbackReq{} b, err := c.GetRawData() @@ -44,9 +44,9 @@ func NewXa(server string, mysqlConf map[string]string, app *gin.Engine, callback tx, my := common.DbAlone(xa.Conf) defer my.Close() if req.Action == "commit" { - tx.Must().Exec(fmt.Sprintf("xa commit '%s'", req.Branch)) + tx.Must().Exec(fmt.Sprintf("xa commit '%s'", req.BranchID)) } else if req.Action == "rollback" { - tx.Must().Exec(fmt.Sprintf("xa rollback '%s'", req.Branch)) + tx.Must().Exec(fmt.Sprintf("xa rollback '%s'", req.BranchID)) } else { panic(fmt.Errorf("unknown action: %s", req.Action)) } @@ -57,21 +57,21 @@ func NewXa(server string, mysqlConf map[string]string, app *gin.Engine, callback func (xa *Xa) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error) { defer common.P2E(&rerr) - branch := common.GenGid() + branchID := common.GenGid() tx, my := common.DbAlone(xa.Conf) defer func() { my.Close() }() - tx.Must().Exec(fmt.Sprintf("XA start '%s'", branch)) + tx.Must().Exec(fmt.Sprintf("XA start '%s'", branchID)) err := transFunc(tx) e2p(err) resp, err := common.RestyClient.R(). - SetBody(&M{"gid": gid, "branch": branch, "trans_type": "xa", "status": "prepared", "url": xa.CallbackUrl}). + SetBody(&M{"gid": gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xa.CallbackUrl}). Post(xa.Server + "/registerXaBranch") e2p(err) if !strings.Contains(resp.String(), "SUCCESS") { e2p(fmt.Errorf("unknown server response: %s", resp.String())) } - tx.Must().Exec(fmt.Sprintf("XA end '%s'", branch)) - tx.Must().Exec(fmt.Sprintf("XA prepare '%s'", branch)) + tx.Must().Exec(fmt.Sprintf("XA end '%s'", branchID)) + tx.Must().Exec(fmt.Sprintf("XA prepare '%s'", branchID)) return nil } diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 1ad9453..e128984 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -39,10 +39,9 @@ func Abort(c *gin.Context) (interface{}, error) { db := dbGet() m := TransFromContext(c) m = TransFromDb(db, m.Gid) - if m.TransType != "xa" || m.Status != "prepared" { - return nil, fmt.Errorf("unkown trans data. type: %s status: %s for gid: %s", m.TransType, m.Status, m.Gid) + if m.TransType != "xa" && m.TransType != "tcc" || m.Status != "prepared" { + return nil, fmt.Errorf("unexpected trans data. type: %s status: %s for gid: %s", m.TransType, m.Status, m.Gid) } - // 当前xa trans的状态为prepared,直接处理,则是回滚 go m.Process(db) return M{"message": "SUCCESS"}, nil } @@ -59,6 +58,8 @@ func RegisterXaBranch(c *gin.Context) (interface{}, error) { DoNothing: true, }).Create(branches) e2p(err) + global := TransGlobal{Gid: branch.Gid} + global.touch(db, config.TransCronInterval) return M{"message": "SUCCESS"}, nil } @@ -67,10 +68,10 @@ func RegisterTccBranch(c *gin.Context) (interface{}, error) { err := c.BindJSON(&data) e2p(err) branch := TransBranch{ - Gid: data["gid"], - Branch: data["branch_id"], - Status: data["status"], - Data: data["data"], + Gid: data["gid"], + BranchID: data["branch_id"], + Status: data["status"], + Data: data["data"], } branches := []TransBranch{branch, branch, branch} @@ -83,6 +84,8 @@ func RegisterTccBranch(c *gin.Context) (interface{}, error) { DoNothing: true, }).Create(branches) e2p(err) + global := TransGlobal{Gid: branch.Gid} + global.touch(dbGet(), config.TransCronInterval) return M{"message": "SUCCESS"}, nil } diff --git a/dtmsvr/config.go b/dtmsvr/config.go index 3047f51..8367059 100644 --- a/dtmsvr/config.go +++ b/dtmsvr/config.go @@ -1,13 +1,11 @@ package dtmsvr type dtmsvrConfig struct { - PreparedExpire int64 // 单位秒,处于prepared中的任务,过了这个时间,查询结果还是PENDING的话,则会被cancel TransCronInterval int64 // 单位秒 当事务等待这个时间之后,还没有变化,则进行一轮处理,包括prepared中的任务和commited的任务 Mysql map[string]string } var config = &dtmsvrConfig{ - PreparedExpire: 60, TransCronInterval: 10, } diff --git a/dtmsvr/dtmsvr.sql b/dtmsvr/dtmsvr.sql index d14deb9..bfdf033 100644 --- a/dtmsvr/dtmsvr.sql +++ b/dtmsvr/dtmsvr.sql @@ -32,7 +32,7 @@ CREATE TABLE IF NOT EXISTS `trans_branch` ( `gid` varchar(128) NOT NULL COMMENT '事务全局id', `url` varchar(128) NOT NULL COMMENT '动作关联的url', `data` TEXT COMMENT '请求所携带的数据', - `branch` VARCHAR(128) NOT NULL COMMENT '事务分支名称', + `branch_id` VARCHAR(128) NOT NULL COMMENT '事务分支名称', `branch_type` varchar(45) NOT NULL COMMENT '事务分支类型 saga_action | saga_compensate | xa', `status` varchar(45) NOT NULL COMMENT '步骤的状态 submitted | finished | rollbacked', `finish_time` datetime DEFAULT NULL, @@ -40,7 +40,7 @@ CREATE TABLE IF NOT EXISTS `trans_branch` ( `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`), - UNIQUE KEY `gid` (`gid`,`branch`, `branch_type`), + UNIQUE KEY `gid` (`gid`,`branch_id`, `branch_type`), KEY `create_time` (`create_time`), KEY `update_time` (`update_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; @@ -49,7 +49,7 @@ drop table IF EXISTS trans_log; CREATE TABLE IF NOT EXISTS `trans_log` ( `id` int(11) NOT NULL AUTO_INCREMENT, `gid` varchar(128) NOT NULL COMMENT '事务全局id', - `branch` varchar(128) DEFAULT NULL COMMENT '事务分支', + `branch_id` varchar(128) DEFAULT NULL COMMENT '事务分支', `action` varchar(45) DEFAULT NULL COMMENT '行为', `old_status` varchar(45) NOT NULL DEFAULT '' COMMENT '旧状态', `new_status` varchar(45) NOT NULL COMMENT '新状态', diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index 5f98155..75d1afb 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -27,7 +27,6 @@ var myinit int = func() int { func TestViper(t *testing.T) { assert.Equal(t, true, viper.Get("mysql") != nil) - assert.Equal(t, int64(90), config.PreparedExpire) } func TestDtmSvr(t *testing.T) { @@ -50,9 +49,9 @@ func TestDtmSvr(t *testing.T) { msgPending(t) msgNormal(t) - sagaNormal(t) tccNormal(t) tccRollback(t) + sagaNormal(t) xaNormal(t) xaRollback(t) sagaCommittedPending(t) @@ -155,7 +154,7 @@ func tccRollback(t *testing.T) { e2p(err) } func msgNormal(t *testing.T) { - msg := genMsg("gid-normal-msg") + msg := genMsg("gid-msg-normal") msg.Submit() assert.Equal(t, "submitted", getTransStatus(msg.Gid)) WaitTransProcessed(msg.Gid) @@ -164,7 +163,7 @@ func msgNormal(t *testing.T) { } func msgPending(t *testing.T) { - msg := genMsg("gid-normal-pending") + msg := genMsg("gid-msg-normal-pending") msg.Prepare("") assert.Equal(t, "prepared", getTransStatus(msg.Gid)) examples.MainSwitch.CanSubmitResult.SetOnce("PENDING") diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index cce5a48..4eee811 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -65,7 +65,7 @@ type TransBranch struct { Gid string Url string Data string - Branch string + BranchID string `json:"branch_id"` BranchType string Status string FinishTime *time.Time @@ -77,7 +77,7 @@ func (*TransBranch) TableName() string { } func (t *TransBranch) changeStatus(db *common.DB, status string) *gorm.DB { - writeTransLog(t.Gid, "branch change", status, t.Branch, "") + writeTransLog(t.Gid, "branch change", status, t.BranchID, "") dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(M{ "status": status, "finish_time": time.Now(), diff --git a/dtmsvr/trans_msg.go b/dtmsvr/trans_msg.go index b2ad62a..91388e5 100644 --- a/dtmsvr/trans_msg.go +++ b/dtmsvr/trans_msg.go @@ -3,9 +3,7 @@ package dtmsvr import ( "fmt" "strings" - "time" - "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -24,7 +22,7 @@ func (t *TransMsgProcessor) GenBranches() []TransBranch { for _, step := range steps { branches = append(branches, TransBranch{ Gid: t.Gid, - Branch: common.GenGid(), + BranchID: common.GenGid(), Data: step["data"].(string), Url: step["action"].(string), BranchType: "action", @@ -53,17 +51,10 @@ func (t *TransGlobal) mayQueryPrepared(db *common.DB) { resp, err := common.RestyClient.R().SetQueryParam("gid", t.Gid).Get(t.QueryPrepared) e2p(err) body := resp.String() - if strings.Contains(body, "FAIL") { - preparedExpire := time.Now().Add(time.Duration(-config.PreparedExpire) * time.Second) - logrus.Printf("create time: %s prepared expire: %s ", t.CreateTime.Local(), preparedExpire.Local()) - status := common.If(t.CreateTime.Before(preparedExpire), "canceled", "prepared").(string) - if status != t.Status { - t.changeStatus(db, status) - } else { - t.touch(db, t.NextCronInterval*2) - } - } else if strings.Contains(body, "SUCCESS") { + if strings.Contains(body, "SUCCESS") { t.changeStatus(db, "submitted") + } else { + t.touch(db, t.NextCronInterval*2) } } diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_saga.go index e8de14b..ac69e86 100644 --- a/dtmsvr/trans_saga.go +++ b/dtmsvr/trans_saga.go @@ -24,7 +24,7 @@ func (t *TransSagaProcessor) GenBranches() []TransBranch { for _, branchType := range []string{"compensate", "action"} { branches = append(branches, TransBranch{ Gid: t.Gid, - Branch: branch, + BranchID: branch, Data: step["data"].(string), Url: step[branchType].(string), BranchType: branchType, diff --git a/dtmsvr/trans_tcc.go b/dtmsvr/trans_tcc.go index 5445885..0f0e27c 100644 --- a/dtmsvr/trans_tcc.go +++ b/dtmsvr/trans_tcc.go @@ -16,23 +16,7 @@ func init() { } func (t *TransTccProcessor) GenBranches() []TransBranch { - branches := []TransBranch{} - steps := []M{} - common.MustUnmarshalString(t.Data, &steps) - for _, step := range steps { - branch := common.GenGid() - for _, branchType := range []string{"cancel", "confirm", "try"} { - branches = append(branches, TransBranch{ - Gid: t.Gid, - Branch: branch, - Data: step["data"].(string), - Url: step[branchType].(string), - BranchType: branchType, - Status: "prepared", - }) - } - } - return branches + return []TransBranch{} } func (t *TransTccProcessor) ExecBranch(db *common.DB, branch *TransBranch) { @@ -51,33 +35,15 @@ func (t *TransTccProcessor) ExecBranch(db *common.DB, branch *TransBranch) { } func (t *TransTccProcessor) ProcessOnce(db *common.DB, branches []TransBranch) { - if t.Status != "submitted" { + if t.Status == "succeed" || t.Status == "failed" { return } - current := 0 // 当前正在处理的步骤 - // 先处理一轮正常try状态 - for ; current < len(branches); current++ { - branch := &branches[current] - if branch.BranchType != "try" || branch.Status == "succeed" { - continue - } - if branch.BranchType == "try" && branch.Status == "prepared" { - t.ExecBranch(db, branch) - if branch.Status != "succeed" { - break - } - } else { - break + branchType := common.If(t.Status == "submitted", "confirm", "cancel").(string) + for current := len(branches) - 1; current >= -1; current-- { + if current == -1 { // 已全部处理完 + t.changeStatus(db, common.If(t.Status == "submitted", "succeed", "failed").(string)) + } else if branches[current].BranchType == branchType { + t.ExecBranch(db, &branches[current]) } } - // 如果try全部成功,则处理confirm分支,否则处理cancel分支 - currentType := common.If(current == len(branches), "confirm", "cancel") - for current--; current >= 0; current-- { - branch := &branches[current] - if branch.BranchType != currentType || branch.Status != "prepared" { - continue - } - t.ExecBranch(db, branch) - } - t.changeStatus(db, common.If(currentType == "confirm", "succeed", "failed").(string)) } diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_xa.go index b0ccbfc..ed7d279 100644 --- a/dtmsvr/trans_xa.go +++ b/dtmsvr/trans_xa.go @@ -20,9 +20,9 @@ func (t *TransXaProcessor) GenBranches() []TransBranch { } func (t *TransXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { resp, err := common.RestyClient.R().SetBody(M{ - "branch": branch.Branch, - "action": common.If(t.Status == "prepared", "rollback", "commit"), - "gid": branch.Gid, + "branch_id": branch.BranchID, + "action": common.If(t.Status == "prepared", "rollback", "commit"), + "gid": branch.Gid, }).Post(branch.Url) e2p(err) body := resp.String() diff --git a/dtmsvr/utils.go b/dtmsvr/utils.go index 92ce212..ad2b90a 100644 --- a/dtmsvr/utils.go +++ b/dtmsvr/utils.go @@ -14,6 +14,7 @@ func dbGet() *common.DB { return common.DbGet(config.Mysql) } func writeTransLog(gid string, action string, status string, branch string, detail string) { + return db := dbGet() if detail == "" { detail = "{}" diff --git a/examples/main_tcc.go b/examples/main_tcc.go index b75c1f7..711a716 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -26,7 +26,7 @@ func TccSetup(app *gin.Engine) { return nil, err } req := reqFrom(c) - logrus.Printf("Trans in %f here, and Trans in another %f in call2 ", req.Amount/2, req.Amount/2) + logrus.Printf("Trans in %d here, and Trans in another %d in call2 ", req.Amount/2, req.Amount/2) _, rerr := tcc.CallBranch(&TransReq{Amount: req.Amount / 2}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") if rerr != nil { return nil, rerr