next cron time will double

This commit is contained in:
yedongfu 2021-06-28 15:35:38 +08:00
parent 5a1e20abe9
commit 099242a867
9 changed files with 71 additions and 54 deletions

View File

@ -1,14 +1,14 @@
package dtmsvr
type dtmsvrConfig struct {
PreparedExpire int64 // 单位秒处于prepared中的任务过了这个时间查询结果还是PENDING的话则会被cancel
JobCronInterval int64 // 单位秒 当事务等待这个时间之后还没有变化则进行一轮处理包括prepared中的任务和commited的任务
Mysql map[string]string
PreparedExpire int64 // 单位秒处于prepared中的任务过了这个时间查询结果还是PENDING的话则会被cancel
TransCronInterval int64 // 单位秒 当事务等待这个时间之后还没有变化则进行一轮处理包括prepared中的任务和commited的任务
Mysql map[string]string
}
var config = &dtmsvrConfig{
PreparedExpire: 60,
JobCronInterval: 20,
PreparedExpire: 60,
TransCronInterval: 10,
}
var dbName = "dtm"

View File

@ -11,56 +11,55 @@ import (
func CronPrepared() {
for {
CronTransOnce(time.Duration(config.JobCronInterval)*time.Second, "prepared")
CronTransOnce(time.Duration(0), "prepared")
sleepCronTime()
}
}
func CronTransOnce(expire time.Duration, status string) bool {
func CronTransOnce(expireIn time.Duration, status string) bool {
defer handlePanic()
trans := lockOneTrans(expire, status)
trans := lockOneTrans(expireIn, status)
if trans == nil {
return false
}
trans.touch(dbGet())
defer func() {
WaitTransProcessed(trans.Gid)
}()
defer WaitTransProcessed(trans.Gid)
trans.Process(dbGet())
return true
}
func CronCommitted() {
for {
notEmpty := CronTransOnce(time.Duration(config.JobCronInterval)*time.Second, "commitetd")
notEmpty := CronTransOnce(time.Duration(0), "commitetd")
if !notEmpty {
sleepCronTime()
}
}
}
func lockOneTrans(expire time.Duration, status string) *TransGlobal {
func lockOneTrans(expireIn time.Duration, status string) *TransGlobal {
trans := TransGlobal{}
owner := common.GenGid()
db := dbGet()
dbr := db.Must().Model(&trans).
Where("update_time < date_sub(now(), interval ? second) and status=?", int(expire/time.Second), status).
Where("next_cron_time < date_add(now(), interval ? second) and status=?", int(expireIn/time.Second), status).
Limit(1).Update("owner", owner)
if dbr.RowsAffected == 0 {
return nil
}
dbr = db.Must().Where("owner=?", owner).Find(&trans)
updates := trans.setNextCron(trans.NextCronInterval * 2) // 下次被cron的间隔加倍
db.Must().Model(&trans).Select(updates).Updates(&trans)
return &trans
}
func handlePanic() {
if err := recover(); err != nil {
logrus.Printf("----panic %s handlered", err.(error).Error())
logrus.Errorf("----panic %s handlered", err.(error).Error())
}
}
func sleepCronTime() {
delta := math.Min(3, float64(config.JobCronInterval))
interval := time.Duration((float64(config.JobCronInterval) - rand.Float64()*delta) * float64(time.Second))
delta := math.Min(3, float64(config.TransCronInterval))
interval := time.Duration((float64(config.TransCronInterval) - rand.Float64()*delta) * float64(time.Second))
time.Sleep(interval)
}

View File

@ -15,12 +15,15 @@ CREATE TABLE if not EXISTS `trans_global` (
`commit_time` datetime DEFAULT NULL,
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`next_cron_interval` int(11) default null comment '下次定时处理的间隔',
`next_cron_time` datetime default null comment '下次定时处理的时间',
`owner` varchar(128) not null default '' comment '正在处理全局事务的锁定者',
PRIMARY KEY (`id`),
UNIQUE KEY `gid` (`gid`),
key `owner`(`owner`),
KEY `create_time` (`create_time`),
KEY `update_time` (`update_time`)
KEY `update_time` (`update_time`),
key `next_cron_time` (`next_cron_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
drop table IF EXISTS trans_branch;

View File

@ -152,7 +152,7 @@ func tccRollbackPending(t *testing.T) {
WaitTransProcessed(tcc.Gid)
assert.Equal(t, "committed", getTransStatus(tcc.Gid))
examples.TccTransInCancelResult = ""
CronTransOnce(-10*time.Second, "committed")
CronTransOnce(60*time.Second, "committed")
assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid))
}
@ -170,14 +170,14 @@ func msgPending(t *testing.T) {
msg.Prepare("")
assert.Equal(t, "prepared", getTransStatus(msg.Gid))
examples.MsgTransQueryResult = "PENDING"
CronTransOnce(-10*time.Second, "prepared")
CronTransOnce(60*time.Second, "prepared")
assert.Equal(t, "prepared", getTransStatus(msg.Gid))
examples.MsgTransQueryResult = ""
examples.MsgTransInResult = "PENDING"
CronTransOnce(-10*time.Second, "prepared")
CronTransOnce(60*time.Second, "prepared")
assert.Equal(t, "committed", getTransStatus(msg.Gid))
examples.MsgTransInResult = ""
CronTransOnce(-10*time.Second, "committed")
CronTransOnce(60*time.Second, "committed")
assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid))
assert.Equal(t, "succeed", getTransStatus(msg.Gid))
}
@ -207,7 +207,7 @@ func sagaPrepareCancel(t *testing.T) {
saga.Prepare(saga.QueryPrepared)
examples.SagaTransQueryResult = "FAIL"
config.PreparedExpire = -10
CronTransOnce(-10*time.Second, "prepared")
CronTransOnce(60*time.Second, "prepared")
examples.SagaTransQueryResult = ""
config.PreparedExpire = 60
assert.Equal(t, "canceled", getTransStatus(saga.Gid))
@ -217,10 +217,10 @@ func sagaPreparePending(t *testing.T) {
saga := genSaga("gid1-preparePending", false, false)
saga.Prepare(saga.QueryPrepared)
examples.SagaTransQueryResult = "PENDING"
CronTransOnce(-10*time.Second, "prepared")
CronTransOnce(60*time.Second, "prepared")
examples.SagaTransQueryResult = ""
assert.Equal(t, "prepared", getTransStatus(saga.Gid))
CronTransOnce(-10*time.Second, "prepared")
CronTransOnce(60*time.Second, "prepared")
assert.Equal(t, "succeed", getTransStatus(saga.Gid))
}
@ -232,7 +232,7 @@ func sagaCommittedPending(t *testing.T) {
WaitTransProcessed(saga.Gid)
examples.SagaTransInResult = ""
assert.Equal(t, []string{"prepared", "succeed", "prepared", "prepared"}, getBranchesStatus(saga.Gid))
CronTransOnce(-10*time.Second, "committed")
CronTransOnce(60*time.Second, "committed")
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
assert.Equal(t, "succeed", getTransStatus(saga.Gid))
}

View File

@ -14,14 +14,16 @@ import (
type TransGlobal struct {
common.ModelBase
Gid string `json:"gid"`
TransType string `json:"trans_type"`
Data string `json:"data"`
Status string `json:"status"`
QueryPrepared string `json:"query_prepared"`
CommitTime *time.Time
FinishTime *time.Time
RollbackTime *time.Time
Gid string `json:"gid"`
TransType string `json:"trans_type"`
Data string `json:"data"`
Status string `json:"status"`
QueryPrepared string `json:"query_prepared"`
CommitTime *time.Time
FinishTime *time.Time
RollbackTime *time.Time
NextCronInterval int64
NextCronTime *time.Time
}
func (*TransGlobal) TableName() string {
@ -34,24 +36,28 @@ type TransProcessor interface {
ExecBranch(db *common.DB, branch *TransBranch)
}
func (t *TransGlobal) touch(db *common.DB) *gorm.DB {
func (t *TransGlobal) touch(db *common.DB, interval int64) *gorm.DB {
writeTransLog(t.Gid, "touch trans", "", "", "")
return db.Model(&TransGlobal{}).Where("gid=?", t.Gid).Update("gid", t.Gid) // 更新update_time避免被定时任务再次
updates := t.setNextCron(interval)
return db.Model(&TransGlobal{}).Where("gid=?", t.Gid).Select(updates).Updates(t)
}
func (t *TransGlobal) changeStatus(db *common.DB, status string) *gorm.DB {
writeTransLog(t.Gid, "change status", status, "", "")
updates := M{
"status": status,
}
if status == "succeed" {
updates["finish_time"] = time.Now()
} else if status == "failed" {
updates["rollback_time"] = time.Now()
}
dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(updates)
checkAffected(dbr)
old := t.Status
t.Status = status
updates := t.setNextCron(config.TransCronInterval)
updates = append(updates, "status")
now := time.Now()
if status == "succeed" {
t.FinishTime = &now
updates = append(updates, "finish_time")
} else if status == "failed" {
t.RollbackTime = &now
updates = append(updates, "rollback_time")
}
dbr := db.Must().Model(t).Where("status=?", old).Select(updates).Updates(t)
checkAffected(dbr)
return dbr
}
@ -114,7 +120,7 @@ func (t *TransGlobal) MayQueryPrepared(db *common.DB) {
if status != t.Status {
t.changeStatus(db, status)
} else {
t.touch(db)
t.touch(db, t.NextCronInterval*2)
}
} else if strings.Contains(body, "SUCCESS") {
t.changeStatus(db, "committed")
@ -133,16 +139,23 @@ func (trans *TransGlobal) Process(db *common.DB) {
trans.getProcessor().ProcessOnce(db, branches)
}
func (t *TransGlobal) setNextCron(expireIn int64) []string {
t.NextCronInterval = expireIn
next := time.Now().Add(time.Duration(config.TransCronInterval) * time.Second)
t.NextCronTime = &next
return []string{"next_cron_interval", "next_cron_time"}
}
func (t *TransGlobal) SaveNew(db *common.DB) {
err := db.Transaction(func(db1 *gorm.DB) error {
db := &common.DB{DB: db1}
updates := t.setNextCron(config.TransCronInterval)
writeTransLog(t.Gid, "create trans", t.Status, "", t.Data)
dbr := db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(t)
if dbr.RowsAffected == 0 && t.Status == "committed" { // 如果数据库已经存放了prepared的事务则修改状态
dbr = db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", t.Gid, "prepared").Update("status", t.Status)
dbr = db.Must().Model(t).Where("gid=? and status=?", t.Gid, "prepared").Select(append(updates, "status")).Updates(t)
}
if dbr.RowsAffected == 0 { // 未保存任何数据,直接返回
return nil

View File

@ -36,9 +36,9 @@ func (t *TransMsgProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParam("gid", branch.Gid).Post(branch.Url)
e2p(err)
body := resp.String()
t.touch(db)
if strings.Contains(body, "SUCCESS") {
branch.changeStatus(db, "succeed")
t.touch(db, config.TransCronInterval)
} else {
panic(fmt.Errorf("unknown response: %s, will be retried", body))
}

View File

@ -38,10 +38,11 @@ func (t *TransSagaProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParam("gid", branch.Gid).Post(branch.Url)
e2p(err)
body := resp.String()
t.touch(db)
if strings.Contains(body, "SUCCESS") {
t.touch(db, config.TransCronInterval)
branch.changeStatus(db, "succeed")
} else if branch.BranchType == "action" && strings.Contains(body, "FAIL") {
t.touch(db, config.TransCronInterval)
branch.changeStatus(db, "failed")
} else {
panic(fmt.Errorf("unknown response: %s, will be retried", body))

View File

@ -38,10 +38,11 @@ func (t *TransTccProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParam("gid", branch.Gid).Post(branch.Url)
e2p(err)
body := resp.String()
t.touch(db)
if strings.Contains(body, "SUCCESS") {
t.touch(db, config.TransCronInterval)
branch.changeStatus(db, "succeed")
} else if branch.BranchType == "try" && strings.Contains(body, "FAIL") {
t.touch(db, config.TransCronInterval)
branch.changeStatus(db, "failed")
} else {
panic(fmt.Errorf("unknown response: %s, will be retried", body))

View File

@ -26,8 +26,8 @@ func (t *TransXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
}).Post(branch.Url)
e2p(err)
body := resp.String()
t.touch(db)
if strings.Contains(body, "SUCCESS") {
t.touch(db, config.TransCronInterval)
branch.changeStatus(db, "succeed")
} else {
panic(fmt.Errorf("bad response: %s", body))