diff --git a/dtmsvr/config.go b/dtmsvr/config.go index 85df1c8..3047f51 100644 --- a/dtmsvr/config.go +++ b/dtmsvr/config.go @@ -1,14 +1,14 @@ package dtmsvr type dtmsvrConfig struct { - PreparedExpire int64 // 单位秒,处于prepared中的任务,过了这个时间,查询结果还是PENDING的话,则会被cancel - JobCronInterval int64 // 单位秒 当事务等待这个时间之后,还没有变化,则进行一轮处理,包括prepared中的任务和commited的任务 - Mysql map[string]string + PreparedExpire int64 // 单位秒,处于prepared中的任务,过了这个时间,查询结果还是PENDING的话,则会被cancel + TransCronInterval int64 // 单位秒 当事务等待这个时间之后,还没有变化,则进行一轮处理,包括prepared中的任务和commited的任务 + Mysql map[string]string } var config = &dtmsvrConfig{ - PreparedExpire: 60, - JobCronInterval: 20, + PreparedExpire: 60, + TransCronInterval: 10, } var dbName = "dtm" diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 119cf62..5214f46 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -11,56 +11,55 @@ import ( func CronPrepared() { for { - CronTransOnce(time.Duration(config.JobCronInterval)*time.Second, "prepared") + CronTransOnce(time.Duration(0), "prepared") sleepCronTime() } } -func CronTransOnce(expire time.Duration, status string) bool { +func CronTransOnce(expireIn time.Duration, status string) bool { defer handlePanic() - trans := lockOneTrans(expire, status) + trans := lockOneTrans(expireIn, status) if trans == nil { return false } - trans.touch(dbGet()) - defer func() { - WaitTransProcessed(trans.Gid) - }() + defer WaitTransProcessed(trans.Gid) trans.Process(dbGet()) return true } func CronCommitted() { for { - notEmpty := CronTransOnce(time.Duration(config.JobCronInterval)*time.Second, "commitetd") + notEmpty := CronTransOnce(time.Duration(0), "commitetd") if !notEmpty { sleepCronTime() } } } -func lockOneTrans(expire time.Duration, status string) *TransGlobal { +func lockOneTrans(expireIn time.Duration, status string) *TransGlobal { trans := TransGlobal{} owner := common.GenGid() db := dbGet() dbr := db.Must().Model(&trans). - Where("update_time < date_sub(now(), interval ? second) and status=?", int(expire/time.Second), status). + Where("next_cron_time < date_add(now(), interval ? second) and status=?", int(expireIn/time.Second), status). Limit(1).Update("owner", owner) if dbr.RowsAffected == 0 { return nil } dbr = db.Must().Where("owner=?", owner).Find(&trans) + updates := trans.setNextCron(trans.NextCronInterval * 2) // 下次被cron的间隔加倍 + db.Must().Model(&trans).Select(updates).Updates(&trans) return &trans } func handlePanic() { if err := recover(); err != nil { - logrus.Printf("----panic %s handlered", err.(error).Error()) + logrus.Errorf("----panic %s handlered", err.(error).Error()) } } func sleepCronTime() { - delta := math.Min(3, float64(config.JobCronInterval)) - interval := time.Duration((float64(config.JobCronInterval) - rand.Float64()*delta) * float64(time.Second)) + delta := math.Min(3, float64(config.TransCronInterval)) + interval := time.Duration((float64(config.TransCronInterval) - rand.Float64()*delta) * float64(time.Second)) time.Sleep(interval) } diff --git a/dtmsvr/dtmsvr.sql b/dtmsvr/dtmsvr.sql index dd711e0..ff8d091 100644 --- a/dtmsvr/dtmsvr.sql +++ b/dtmsvr/dtmsvr.sql @@ -15,12 +15,15 @@ CREATE TABLE if not EXISTS `trans_global` ( `commit_time` datetime DEFAULT NULL, `finish_time` datetime DEFAULT NULL, `rollback_time` datetime DEFAULT NULL, + `next_cron_interval` int(11) default null comment '下次定时处理的间隔', + `next_cron_time` datetime default null comment '下次定时处理的时间', `owner` varchar(128) not null default '' comment '正在处理全局事务的锁定者', PRIMARY KEY (`id`), UNIQUE KEY `gid` (`gid`), key `owner`(`owner`), KEY `create_time` (`create_time`), - KEY `update_time` (`update_time`) + KEY `update_time` (`update_time`), + key `next_cron_time` (`next_cron_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; drop table IF EXISTS trans_branch; diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index 9837eef..4f3f1b5 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -152,7 +152,7 @@ func tccRollbackPending(t *testing.T) { WaitTransProcessed(tcc.Gid) assert.Equal(t, "committed", getTransStatus(tcc.Gid)) examples.TccTransInCancelResult = "" - CronTransOnce(-10*time.Second, "committed") + CronTransOnce(60*time.Second, "committed") assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid)) } @@ -170,14 +170,14 @@ func msgPending(t *testing.T) { msg.Prepare("") assert.Equal(t, "prepared", getTransStatus(msg.Gid)) examples.MsgTransQueryResult = "PENDING" - CronTransOnce(-10*time.Second, "prepared") + CronTransOnce(60*time.Second, "prepared") assert.Equal(t, "prepared", getTransStatus(msg.Gid)) examples.MsgTransQueryResult = "" examples.MsgTransInResult = "PENDING" - CronTransOnce(-10*time.Second, "prepared") + CronTransOnce(60*time.Second, "prepared") assert.Equal(t, "committed", getTransStatus(msg.Gid)) examples.MsgTransInResult = "" - CronTransOnce(-10*time.Second, "committed") + CronTransOnce(60*time.Second, "committed") assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) assert.Equal(t, "succeed", getTransStatus(msg.Gid)) } @@ -207,7 +207,7 @@ func sagaPrepareCancel(t *testing.T) { saga.Prepare(saga.QueryPrepared) examples.SagaTransQueryResult = "FAIL" config.PreparedExpire = -10 - CronTransOnce(-10*time.Second, "prepared") + CronTransOnce(60*time.Second, "prepared") examples.SagaTransQueryResult = "" config.PreparedExpire = 60 assert.Equal(t, "canceled", getTransStatus(saga.Gid)) @@ -217,10 +217,10 @@ func sagaPreparePending(t *testing.T) { saga := genSaga("gid1-preparePending", false, false) saga.Prepare(saga.QueryPrepared) examples.SagaTransQueryResult = "PENDING" - CronTransOnce(-10*time.Second, "prepared") + CronTransOnce(60*time.Second, "prepared") examples.SagaTransQueryResult = "" assert.Equal(t, "prepared", getTransStatus(saga.Gid)) - CronTransOnce(-10*time.Second, "prepared") + CronTransOnce(60*time.Second, "prepared") assert.Equal(t, "succeed", getTransStatus(saga.Gid)) } @@ -232,7 +232,7 @@ func sagaCommittedPending(t *testing.T) { WaitTransProcessed(saga.Gid) examples.SagaTransInResult = "" assert.Equal(t, []string{"prepared", "succeed", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) - CronTransOnce(-10*time.Second, "committed") + CronTransOnce(60*time.Second, "committed") assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) assert.Equal(t, "succeed", getTransStatus(saga.Gid)) } diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 3563152..105a02b 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -14,14 +14,16 @@ import ( type TransGlobal struct { common.ModelBase - Gid string `json:"gid"` - TransType string `json:"trans_type"` - Data string `json:"data"` - Status string `json:"status"` - QueryPrepared string `json:"query_prepared"` - CommitTime *time.Time - FinishTime *time.Time - RollbackTime *time.Time + Gid string `json:"gid"` + TransType string `json:"trans_type"` + Data string `json:"data"` + Status string `json:"status"` + QueryPrepared string `json:"query_prepared"` + CommitTime *time.Time + FinishTime *time.Time + RollbackTime *time.Time + NextCronInterval int64 + NextCronTime *time.Time } func (*TransGlobal) TableName() string { @@ -34,24 +36,28 @@ type TransProcessor interface { ExecBranch(db *common.DB, branch *TransBranch) } -func (t *TransGlobal) touch(db *common.DB) *gorm.DB { +func (t *TransGlobal) touch(db *common.DB, interval int64) *gorm.DB { writeTransLog(t.Gid, "touch trans", "", "", "") - return db.Model(&TransGlobal{}).Where("gid=?", t.Gid).Update("gid", t.Gid) // 更新update_time,避免被定时任务再次 + updates := t.setNextCron(interval) + return db.Model(&TransGlobal{}).Where("gid=?", t.Gid).Select(updates).Updates(t) } func (t *TransGlobal) changeStatus(db *common.DB, status string) *gorm.DB { writeTransLog(t.Gid, "change status", status, "", "") - updates := M{ - "status": status, - } - if status == "succeed" { - updates["finish_time"] = time.Now() - } else if status == "failed" { - updates["rollback_time"] = time.Now() - } - dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(updates) - checkAffected(dbr) + old := t.Status t.Status = status + updates := t.setNextCron(config.TransCronInterval) + updates = append(updates, "status") + now := time.Now() + if status == "succeed" { + t.FinishTime = &now + updates = append(updates, "finish_time") + } else if status == "failed" { + t.RollbackTime = &now + updates = append(updates, "rollback_time") + } + dbr := db.Must().Model(t).Where("status=?", old).Select(updates).Updates(t) + checkAffected(dbr) return dbr } @@ -114,7 +120,7 @@ func (t *TransGlobal) MayQueryPrepared(db *common.DB) { if status != t.Status { t.changeStatus(db, status) } else { - t.touch(db) + t.touch(db, t.NextCronInterval*2) } } else if strings.Contains(body, "SUCCESS") { t.changeStatus(db, "committed") @@ -133,16 +139,23 @@ func (trans *TransGlobal) Process(db *common.DB) { trans.getProcessor().ProcessOnce(db, branches) } +func (t *TransGlobal) setNextCron(expireIn int64) []string { + t.NextCronInterval = expireIn + next := time.Now().Add(time.Duration(config.TransCronInterval) * time.Second) + t.NextCronTime = &next + return []string{"next_cron_interval", "next_cron_time"} +} + func (t *TransGlobal) SaveNew(db *common.DB) { err := db.Transaction(func(db1 *gorm.DB) error { db := &common.DB{DB: db1} - + updates := t.setNextCron(config.TransCronInterval) writeTransLog(t.Gid, "create trans", t.Status, "", t.Data) dbr := db.Must().Clauses(clause.OnConflict{ DoNothing: true, }).Create(t) if dbr.RowsAffected == 0 && t.Status == "committed" { // 如果数据库已经存放了prepared的事务,则修改状态 - dbr = db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", t.Gid, "prepared").Update("status", t.Status) + dbr = db.Must().Model(t).Where("gid=? and status=?", t.Gid, "prepared").Select(append(updates, "status")).Updates(t) } if dbr.RowsAffected == 0 { // 未保存任何数据,直接返回 return nil diff --git a/dtmsvr/trans_msg.go b/dtmsvr/trans_msg.go index bd91201..be82917 100644 --- a/dtmsvr/trans_msg.go +++ b/dtmsvr/trans_msg.go @@ -36,9 +36,9 @@ func (t *TransMsgProcessor) ExecBranch(db *common.DB, branch *TransBranch) { resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParam("gid", branch.Gid).Post(branch.Url) e2p(err) body := resp.String() - t.touch(db) if strings.Contains(body, "SUCCESS") { branch.changeStatus(db, "succeed") + t.touch(db, config.TransCronInterval) } else { panic(fmt.Errorf("unknown response: %s, will be retried", body)) } diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_saga.go index 90f621b..3ef83ed 100644 --- a/dtmsvr/trans_saga.go +++ b/dtmsvr/trans_saga.go @@ -38,10 +38,11 @@ func (t *TransSagaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParam("gid", branch.Gid).Post(branch.Url) e2p(err) body := resp.String() - t.touch(db) if strings.Contains(body, "SUCCESS") { + t.touch(db, config.TransCronInterval) branch.changeStatus(db, "succeed") } else if branch.BranchType == "action" && strings.Contains(body, "FAIL") { + t.touch(db, config.TransCronInterval) branch.changeStatus(db, "failed") } else { panic(fmt.Errorf("unknown response: %s, will be retried", body)) diff --git a/dtmsvr/trans_tcc.go b/dtmsvr/trans_tcc.go index b6f5cb1..afd95a8 100644 --- a/dtmsvr/trans_tcc.go +++ b/dtmsvr/trans_tcc.go @@ -38,10 +38,11 @@ func (t *TransTccProcessor) ExecBranch(db *common.DB, branch *TransBranch) { resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParam("gid", branch.Gid).Post(branch.Url) e2p(err) body := resp.String() - t.touch(db) if strings.Contains(body, "SUCCESS") { + t.touch(db, config.TransCronInterval) branch.changeStatus(db, "succeed") } else if branch.BranchType == "try" && strings.Contains(body, "FAIL") { + t.touch(db, config.TransCronInterval) branch.changeStatus(db, "failed") } else { panic(fmt.Errorf("unknown response: %s, will be retried", body)) diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_xa.go index 3dd7c45..99c7a5f 100644 --- a/dtmsvr/trans_xa.go +++ b/dtmsvr/trans_xa.go @@ -26,8 +26,8 @@ func (t *TransXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { }).Post(branch.Url) e2p(err) body := resp.String() - t.touch(db) if strings.Contains(body, "SUCCESS") { + t.touch(db, config.TransCronInterval) branch.changeStatus(db, "succeed") } else { panic(fmt.Errorf("bad response: %s", body))