From 9f8ee90998866501849f6b415052bfd7fb3f2889 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Thu, 27 May 2021 22:19:20 +0800 Subject: [PATCH] rename --- dtmsvr/api.go | 8 +++--- dtmsvr/config.go | 8 ++++-- dtmsvr/cron.go | 58 ++++++++++++++++++++++++++++---------- dtmsvr/dtmsvr_test.go | 12 ++++---- dtmsvr/service.go | 20 +++++-------- dtmsvr/trans.go | 65 ++++++++++++++++++------------------------- dtmsvr/types.go | 27 ++++++++++++------ 7 files changed, 111 insertions(+), 87 deletions(-) diff --git a/dtmsvr/api.go b/dtmsvr/api.go index df9a0d0..a826622 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -34,7 +34,7 @@ func Commit(c *gin.Context) (interface{}, error) { func Rollback(c *gin.Context) (interface{}, error) { m := getTransFromContext(c) - trans := TransGlobalModel{} + trans := TransGlobal{} dbGet().Must().Model(&m).First(&trans) // 当前xa trans的状态为prepared,直接处理,则是回滚 go ProcessTrans(&trans) @@ -42,7 +42,7 @@ func Rollback(c *gin.Context) (interface{}, error) { } func Branch(c *gin.Context) (interface{}, error) { - branch := TransBranchModel{} + branch := TransBranch{} err := c.BindJSON(&branch) e2p(err) db := dbGet() @@ -52,7 +52,7 @@ func Branch(c *gin.Context) (interface{}, error) { return M{"message": "SUCCESS"}, nil } -func getTransFromContext(c *gin.Context) *TransGlobalModel { +func getTransFromContext(c *gin.Context) *TransGlobal { data := M{} b, err := c.GetRawData() e2p(err) @@ -61,7 +61,7 @@ func getTransFromContext(c *gin.Context) *TransGlobalModel { if data["trans_type"].(string) == "saga" { data["data"] = common.MustMarshalString(data["steps"]) } - m := TransGlobalModel{} + m := TransGlobal{} common.MustRemarshal(data, &m) return &m } diff --git a/dtmsvr/config.go b/dtmsvr/config.go index 0420f7d..01fdda4 100644 --- a/dtmsvr/config.go +++ b/dtmsvr/config.go @@ -1,10 +1,12 @@ package dtmsvr type dtmsvrConfig struct { - PreparedExpire int64 `json:"prepare_expire"` // 单位秒,当prepared的状态超过该时间,才能够转变成canceled,避免cancel了之后,才进入prepared - Mysql map[string]string + PreparedExpire int64 // 单位秒,处于prepared中的任务,过了这个时间,查询结果还是PENDING的话,则会被cancel + JobCronInterval int64 // 单位秒 当事务等待这个时间之后,还没有变化,则进行一轮处理,包括prepared中的任务和commited的任务 + Mysql map[string]string } var config = &dtmsvrConfig{ - PreparedExpire: 60, + PreparedExpire: 60, + JobCronInterval: 20, } diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 5a032a2..207860a 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -2,6 +2,8 @@ package dtmsvr import ( "fmt" + "math" + "math/rand" "strings" "time" @@ -11,8 +13,8 @@ import ( func CronPreparedOnce(expire time.Duration) { db := dbGet() - ss := []TransGlobalModel{} - db.Must().Model(&TransGlobalModel{}).Where("update_time < date_sub(now(), interval ? second)", int(expire/time.Second)).Where("status = ?", "prepared").Find(&ss) + ss := []TransGlobal{} + db.Must().Model(&TransGlobal{}).Where("update_time < date_sub(now(), interval ? second)", int(expire/time.Second)).Where("status = ?", "prepared").Find(&ss) writeTransLog("", "saga fetch prepared", fmt.Sprint(len(ss)), "", "") if len(ss) == 0 { return @@ -39,34 +41,60 @@ func CronPreparedOnce(expire time.Duration) { func CronPrepared() { for { defer handlePanic() - CronPreparedOnce(10 * time.Second) + CronTransOnce(time.Duration(config.JobCronInterval)*time.Second, "prepared") + sleepCronTime() } } -func CronCommittedOnce(expire time.Duration) { +func CronTransOnce(expire time.Duration, status string) bool { + trans := lockOneTrans(expire, status) + if trans == nil { + return false + } + trans.touch(dbGet()) + branches := []TransBranch{} db := dbGet() - ss := []TransGlobalModel{} - db.Must().Model(&TransGlobalModel{}).Where("update_time < date_sub(now(), interval ? second)", int(expire/time.Second)).Where("status = ?", "committed").Find(&ss) - writeTransLog("", "saga fetch committed", fmt.Sprint(len(ss)), "", "") - if len(ss) == 0 { - return - } - for _, sm := range ss { - writeTransLog(sm.Gid, "saga touch committed", "", "", "") - db.Must().Model(&sm).Update("id", sm.ID) - ProcessTrans(&sm) + db.Must().Where("gid=?", trans.Gid).Order("id asc").Find(&branches) + trans.getProcessor().ProcessOnce(db, branches) + if TransProcessedTestChan != nil { + TransProcessedTestChan <- trans.Gid } + return true } func CronCommitted() { for { defer handlePanic() - CronCommittedOnce(10 * time.Second) + processed := CronTransOnce(time.Duration(config.JobCronInterval)*time.Second, "commitetd") + if !processed { + sleepCronTime() + } } } +func lockOneTrans(expire time.Duration, status string) *TransGlobal { + trans := TransGlobal{} + owner := common.GenGid() + db := dbGet() + dbr := db.Must().Model(&trans). + Where("update_time < date_sub(now(), interval ? second) and satus=?", int(expire/time.Second), status). + Limit(1).Update("owner", owner) + if dbr.RowsAffected == 0 { + return nil + } + dbr = db.Must().Where("owner=?", owner).Find(&trans) + return &trans +} + func handlePanic() { if err := recover(); err != nil { logrus.Printf("----panic %s handlered", err.(error).Error()) + time.Sleep(3 * time.Second) // 出错后睡眠3s,避免无限循环 } } + +func sleepCronTime() { + delta := math.Min(3, float64(config.JobCronInterval)) + interval := time.Duration(rand.Float64() * delta * float64(time.Second)) + time.Sleep(interval) +} diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index 7a3ba5f..137acca 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -50,7 +50,7 @@ func TestCover(t *testing.T) { db := dbGet() db.NoMust() CronPreparedOnce(0) - CronCommittedOnce(0) + CronTransOnce(0, "committed") defer handlePanic() checkAffected(db.DB) } @@ -58,16 +58,16 @@ func TestCover(t *testing.T) { // 测试使用的全局对象 var initdb = dbGet() -func getSagaModel(gid string) *TransGlobalModel { - sm := TransGlobalModel{} +func getSagaModel(gid string) *TransGlobal { + sm := TransGlobal{} dbr := dbGet().Model(&sm).Where("gid=?", gid).First(&sm) e2p(dbr.Error) return &sm } func getBranchesStatus(gid string) []string { - steps := []TransBranchModel{} - dbr := dbGet().Model(&TransBranchModel{}).Where("gid=?", gid).Find(&steps) + steps := []TransBranch{} + dbr := dbGet().Model(&TransBranch{}).Where("gid=?", gid).Find(&steps) e2p(dbr.Error) status := []string{} for _, step := range steps { @@ -172,7 +172,7 @@ func sagaCommittedPending(t *testing.T) { WaitTransProcessed(saga.Gid) examples.TransInResult = "" assert.Equal(t, []string{"prepared", "finished", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) - CronCommittedOnce(-10 * time.Second) + CronTransOnce(-10*time.Second, "committed") WaitTransProcessed(saga.Gid) assert.Equal(t, []string{"prepared", "finished", "prepared", "finished"}, getBranchesStatus(saga.Gid)) assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) diff --git a/dtmsvr/service.go b/dtmsvr/service.go index a2e4332..df659eb 100644 --- a/dtmsvr/service.go +++ b/dtmsvr/service.go @@ -7,7 +7,7 @@ import ( "gorm.io/gorm/clause" ) -func saveCommitted(m *TransGlobalModel) { +func saveCommitted(m *TransGlobal) { db := dbGet() m.Status = "committed" err := db.Transaction(func(db1 *gorm.DB) error { @@ -20,7 +20,7 @@ func saveCommitted(m *TransGlobalModel) { writeTransLog(m.Gid, "change status", m.Status, "", "") db.Must().Model(m).Where("status=?", "prepared").Update("status", "committed") } - nsteps := GetTrans(m).GetDataBranches() + nsteps := m.getProcessor().GenBranches() if len(nsteps) > 0 { writeTransLog(m.Gid, "save steps", m.Status, "", common.MustMarshalString(nsteps)) db.Must().Clauses(clause.OnConflict{ @@ -42,18 +42,12 @@ func WaitTransProcessed(gid string) { } } -func ProcessTrans(trans *TransGlobalModel) { - err := innerProcessTrans(trans) - if err != nil { - logrus.Errorf("process trans ignore error: %s", err.Error()) - } +func ProcessTrans(trans *TransGlobal) { + branches := []TransBranch{} + db := dbGet() + db.Must().Where("gid=?", trans.Gid).Order("id asc").Find(&branches) + trans.getProcessor().ProcessOnce(db, branches) if TransProcessedTestChan != nil { TransProcessedTestChan <- trans.Gid } } -func innerProcessTrans(trans *TransGlobalModel) (rerr error) { - branches := []TransBranchModel{} - db := dbGet() - db.Must().Where("gid=?", trans.Gid).Order("id asc").Find(&branches) - return GetTrans(trans).ProcessOnce(db, branches) -} diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 07f0905..9cb9b64 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -8,33 +8,22 @@ import ( "github.com/yedf/dtm/common" ) -type Trans interface { - GetDataBranches() []TransBranchModel - ProcessOnce(db *common.MyDb, branches []TransBranchModel) error +type TransProcessor interface { + GenBranches() []TransBranch + ProcessOnce(db *common.MyDb, branches []TransBranch) error } -func GetTrans(trans *TransGlobalModel) Trans { - if trans.TransType == "saga" { - return &TransSaga{TransGlobalModel: trans} - } else if trans.TransType == "tcc" { - return &TransTcc{TransGlobalModel: trans} - } else if trans.TransType == "xa" { - return &TransXa{TransGlobalModel: trans} - } - return nil +type TransSagaProcessor struct { + *TransGlobal } -type TransSaga struct { - *TransGlobalModel -} - -func (t *TransSaga) GetDataBranches() []TransBranchModel { - nsteps := []TransBranchModel{} +func (t *TransSagaProcessor) GenBranches() []TransBranch { + nsteps := []TransBranch{} steps := []M{} common.MustUnmarshalString(t.Data, &steps) for _, step := range steps { for _, branchType := range []string{"compensate", "action"} { - nsteps = append(nsteps, TransBranchModel{ + nsteps = append(nsteps, TransBranch{ Gid: t.Gid, Branch: fmt.Sprintf("%d", len(nsteps)+1), Data: step["data"].(string), @@ -47,7 +36,7 @@ func (t *TransSaga) GetDataBranches() []TransBranchModel { return nsteps } -func (t *TransSaga) ProcessOnce(db *common.MyDb, branches []TransBranchModel) error { +func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) error { current := 0 // 当前正在处理的步骤 for ; current < len(branches); current++ { step := branches[current] @@ -99,17 +88,17 @@ func (t *TransSaga) ProcessOnce(db *common.MyDb, branches []TransBranchModel) er return nil } -type TransTcc struct { - *TransGlobalModel +type TransTccProcessor struct { + *TransGlobal } -func (t *TransTcc) GetDataBranches() []TransBranchModel { - nsteps := []TransBranchModel{} +func (t *TransTccProcessor) GenBranches() []TransBranch { + nsteps := []TransBranch{} steps := []M{} common.MustUnmarshalString(t.Data, &steps) for _, step := range steps { for _, branchType := range []string{"rollback", "commit", "prepare"} { - nsteps = append(nsteps, TransBranchModel{ + nsteps = append(nsteps, TransBranch{ Gid: t.Gid, Branch: fmt.Sprintf("%d", len(nsteps)+1), Data: step["data"].(string), @@ -122,7 +111,7 @@ func (t *TransTcc) GetDataBranches() []TransBranchModel { return nsteps } -func (t *TransTcc) ProcessOnce(db *common.MyDb, branches []TransBranchModel) error { +func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) error { gid := t.Gid current := 0 // 当前正在处理的步骤 for ; current < len(branches); current++ { @@ -136,7 +125,7 @@ func (t *TransTcc) ProcessOnce(db *common.MyDb, branches []TransBranchModel) err return err } body := resp.String() - db.Must().Model(&TransGlobalModel{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 + db.Must().Model(&TransGlobal{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 if strings.Contains(body, "SUCCESS") { writeTransLog(gid, "step finished", "finished", step.Branch, "") dbr := db.Must().Model(&step).Where("status=?", "prepared").Updates(M{ @@ -160,7 +149,7 @@ func (t *TransTcc) ProcessOnce(db *common.MyDb, branches []TransBranchModel) err ////////////////////////////////////////////////// if current == len(branches) { // tcc 事务完成 writeTransLog(gid, "saga finished", "finished", "", "") - dbr := db.Must().Model(&TransGlobalModel{}).Where("gid=? and status=?", gid, "committed").Updates(M{ + dbr := db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", gid, "committed").Updates(M{ "status": "finished", "finish_time": time.Now(), }) @@ -192,7 +181,7 @@ func (t *TransTcc) ProcessOnce(db *common.MyDb, branches []TransBranchModel) err return fmt.Errorf("saga current not -1") } writeTransLog(gid, "saga rollbacked", "rollbacked", "", "") - dbr := db.Must().Model(&TransGlobalModel{}).Where("status=? and gid=?", "committed", gid).Updates(M{ + dbr := db.Must().Model(&TransGlobal{}).Where("status=? and gid=?", "committed", gid).Updates(M{ "status": "rollbacked", "rollback_time": time.Now(), }) @@ -200,15 +189,15 @@ func (t *TransTcc) ProcessOnce(db *common.MyDb, branches []TransBranchModel) err return nil } -type TransXa struct { - *TransGlobalModel +type TransXaProcessor struct { + *TransGlobal } -func (t *TransXa) GetDataBranches() []TransBranchModel { - return []TransBranchModel{} +func (t *TransXaProcessor) GenBranches() []TransBranch { + return []TransBranch{} } -func (t *TransXa) ProcessOnce(db *common.MyDb, branches []TransBranchModel) error { +func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) error { gid := t.Gid if t.Status == "finished" { return nil @@ -218,7 +207,7 @@ func (t *TransXa) ProcessOnce(db *common.MyDb, branches []TransBranchModel) erro if branch.Status == "finished" { continue } - db.Must().Model(&TransGlobalModel{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 + db.Must().Model(&TransGlobal{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 resp, err := common.RestyClient.R().SetBody(M{ "branch": branch.Branch, "action": "commit", @@ -238,7 +227,7 @@ func (t *TransXa) ProcessOnce(db *common.MyDb, branches []TransBranchModel) erro }) } writeTransLog(gid, "xa finished", "finished", "", "") - db.Must().Model(&TransGlobalModel{}).Where("gid=? and status=?", gid, "committed").Updates(M{ + db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", gid, "committed").Updates(M{ "status": "finished", "finish_time": time.Now(), }) @@ -247,7 +236,7 @@ func (t *TransXa) ProcessOnce(db *common.MyDb, branches []TransBranchModel) erro if branch.Status == "rollbacked" { continue } - db.Must().Model(&TransGlobalModel{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 + db.Must().Model(&TransGlobal{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 resp, err := common.RestyClient.R().SetBody(M{ "branch": branch.Branch, "action": "rollback", @@ -267,7 +256,7 @@ func (t *TransXa) ProcessOnce(db *common.MyDb, branches []TransBranchModel) erro }) } writeTransLog(gid, "xa rollbacked", "rollbacked", "", "") - db.Must().Model(&TransGlobalModel{}).Where("gid=? and status=?", gid, "prepared").Updates(M{ + db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", gid, "prepared").Updates(M{ "status": "rollbacked", "finish_time": time.Now(), }) diff --git a/dtmsvr/types.go b/dtmsvr/types.go index 8d50d4c..dad515b 100644 --- a/dtmsvr/types.go +++ b/dtmsvr/types.go @@ -13,7 +13,7 @@ type M = map[string]interface{} var p2e = common.P2E var e2p = common.E2P -type TransGlobalModel struct { +type TransGlobal struct { common.ModelBase Gid string `json:"gid"` TransType string `json:"trans_type"` @@ -25,16 +25,16 @@ type TransGlobalModel struct { RollbackTime *time.Time } -func (*TransGlobalModel) TableName() string { +func (*TransGlobal) TableName() string { return "trans_global" } -func (t *TransGlobalModel) touch(db *common.MyDb) *gorm.DB { +func (t *TransGlobal) touch(db *common.MyDb) *gorm.DB { writeTransLog(t.Gid, "touch trans", "", "", "") - return db.Model(&TransGlobalModel{}).Where("gid=?", t.Gid).Update("gid", t.Gid) // 更新update_time,避免被定时任务再次 + return db.Model(&TransGlobal{}).Where("gid=?", t.Gid).Update("gid", t.Gid) // 更新update_time,避免被定时任务再次 } -func (t *TransGlobalModel) saveStatus(db *common.MyDb, status string) *gorm.DB { +func (t *TransGlobal) saveStatus(db *common.MyDb, status string) *gorm.DB { writeTransLog(t.Gid, "step change", status, "", "") dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(M{ "status": status, @@ -45,7 +45,7 @@ func (t *TransGlobalModel) saveStatus(db *common.MyDb, status string) *gorm.DB { return dbr } -type TransBranchModel struct { +type TransBranch struct { common.ModelBase Gid string Url string @@ -57,11 +57,11 @@ type TransBranchModel struct { RollbackTime *time.Time } -func (*TransBranchModel) TableName() string { +func (*TransBranch) TableName() string { return "trans_branch" } -func (t *TransBranchModel) saveStatus(db *common.MyDb, status string) *gorm.DB { +func (t *TransBranch) saveStatus(db *common.MyDb, status string) *gorm.DB { writeTransLog(t.Gid, "step change", status, t.Branch, "") dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(M{ "status": status, @@ -77,3 +77,14 @@ func checkAffected(db1 *gorm.DB) { panic(fmt.Errorf("duplicate updating")) } } + +func (trans *TransGlobal) getProcessor() TransProcessor { + if trans.TransType == "saga" { + return &TransSagaProcessor{TransGlobal: trans} + } else if trans.TransType == "tcc" { + return &TransTccProcessor{TransGlobal: trans} + } else if trans.TransType == "xa" { + return &TransXaProcessor{TransGlobal: trans} + } + return nil +}