dtm/dtmsvr/cron.go
2021-08-11 17:31:28 +08:00

70 lines
2.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package dtmsvr
import (
"fmt"
"math"
"math/rand"
"runtime/debug"
"time"
"github.com/yedf/dtm/dtmcli"
)
// CronTransOnce cron expired trans. use expireIn as expire time
func CronTransOnce(expireIn time.Duration) bool {
defer handlePanic(nil)
trans := lockOneTrans(expireIn)
if trans == nil {
return false
}
if TransProcessedTestChan != nil {
defer WaitTransProcessed(trans.Gid)
}
trans.Process(dbGet(), true)
return true
}
// CronExpiredTrans cron expired trans, num == -1 indicate for ever
func CronExpiredTrans(num int) {
for i := 0; i < num || num == -1; i++ {
hasTrans := CronTransOnce(time.Duration(0))
if !hasTrans && num != 1 {
sleepCronTime(0)
}
}
}
func lockOneTrans(expireIn time.Duration) *TransGlobal {
trans := TransGlobal{}
owner := GenGid()
db := dbGet()
// 这里next_cron_time需要限定范围否则数据量累计之后会导致查询变慢
// 限定update_time < now - 3否则会出现刚被这个应用取出又被另一个取出
dbr := db.Must().Model(&trans).
Where("next_cron_time < date_add(now(), interval ? second) and next_cron_time > date_add(now(), interval -3600 second) and update_time < date_add(now(), interval ? second) and status in ('prepared', 'aborting', 'submitted')", int(expireIn/time.Second), -3+int(expireIn/time.Second)).
Limit(1).Update("owner", owner)
if dbr.RowsAffected == 0 {
return nil
}
dbr = db.Must().Where("owner=?", owner).Find(&trans)
updates := trans.setNextCron(trans.NextCronInterval * 2) // 下次被cron的间隔加倍
db.Must().Model(&trans).Select(updates).Updates(&trans)
return &trans
}
func handlePanic(perr *error) {
if err := recover(); err != nil {
dtmcli.LogRedf("----recovered panic %v\n%s", err, string(debug.Stack()))
if perr != nil {
*perr = fmt.Errorf("dtm panic: %v", err)
}
}
}
func sleepCronTime(milli int) {
delta := math.Min(3, float64(config.TransCronInterval))
interval := time.Duration((float64(config.TransCronInterval) - rand.Float64()*delta) * float64(time.Second))
dtmcli.Logf("sleeping for %v pass in %d milli", interval, milli)
time.Sleep(dtmcli.If(milli == 0, interval, time.Duration(milli*int(time.Millisecond))).(time.Duration))
}