From 291cf0bb28f91152c57be37ccaed24ff8d75926b Mon Sep 17 00:00:00 2001 From: yedongfu Date: Thu, 27 May 2021 23:25:28 +0800 Subject: [PATCH] test pass --- dtmsvr/trans.go | 85 +++++++------------------------------------------ dtmsvr/types.go | 17 ++++++---- 2 files changed, 22 insertions(+), 80 deletions(-) diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 089f51d..c682f9e 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -3,7 +3,6 @@ package dtmsvr import ( "fmt" "strings" - "time" "github.com/yedf/dtm/common" ) @@ -62,7 +61,7 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch } } if current == len(branches) { // saga 事务完成 - t.saveStatus(db.Must(), "finished") + t.changeStatus(db.Must(), "finished") return nil } for current = current - 1; current >= 0; current-- { @@ -84,7 +83,7 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch if current != -1 { return fmt.Errorf("saga current not -1") } - t.saveStatus(db.Must(), "rollbacked") + t.changeStatus(db.Must(), "rollbacked") return nil } @@ -112,7 +111,6 @@ func (t *TransTccProcessor) GenBranches() []TransBranch { } func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) error { - gid := t.Gid current := 0 // 当前正在处理的步骤 for ; current < len(branches); current++ { step := branches[current] @@ -125,21 +123,11 @@ func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) return err } body := resp.String() - db.Must().Model(&TransGlobal{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 + t.touch(db) if strings.Contains(body, "SUCCESS") { - writeTransLog(gid, "step finished", "finished", step.Branch, "") - dbr := db.Must().Model(&step).Where("status=?", "prepared").Updates(M{ - "status": "finished", - "finish_time": time.Now(), - }) - checkAffected(dbr) + step.changeStatus(db, "finished") } else if strings.Contains(body, "FAIL") { - writeTransLog(gid, "step rollbacked", "rollbacked", step.Branch, "") - dbr := db.Must().Model(&step).Where("status=?", "prepared").Updates(M{ - "status": "rollbacked", - "rollback_time": time.Now(), - }) - checkAffected(dbr) + step.changeStatus(db, "rollbacked") break } else { return fmt.Errorf("unknown response: %s, will be retried", body) @@ -148,44 +136,9 @@ func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) } ////////////////////////////////////////////////// if current == len(branches) { // tcc 事务完成 - writeTransLog(gid, "saga finished", "finished", "", "") - dbr := db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", gid, "committed").Updates(M{ - "status": "finished", - "finish_time": time.Now(), - }) - checkAffected(dbr) + t.changeStatus(db, "finished") return nil } - for current = current - 1; current >= 0; current-- { - step := branches[current] - if step.BranchType != "compensate" || step.Status != "prepared" { - continue - } - resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) - if err != nil { - return err - } - body := resp.String() - if strings.Contains(body, "SUCCESS") { - writeTransLog(gid, "step rollbacked", "rollbacked", step.Branch, "") - dbr := db.Must().Model(&step).Where("status=?", step.Status).Updates(M{ - "status": "rollbacked", - "rollback_time": time.Now(), - }) - checkAffected(dbr) - } else { - return fmt.Errorf("expect compensate return SUCCESS") - } - } - if current != -1 { - return fmt.Errorf("saga current not -1") - } - writeTransLog(gid, "saga rollbacked", "rollbacked", "", "") - dbr := db.Must().Model(&TransGlobal{}).Where("status=? and gid=?", "committed", gid).Updates(M{ - "status": "rollbacked", - "rollback_time": time.Now(), - }) - checkAffected(dbr) return nil } @@ -207,7 +160,7 @@ func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) if branch.Status == "finished" { continue } - db.Must().Model(&TransGlobal{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 + t.touch(db) // 更新update_time,避免被定时任务再次 resp, err := common.RestyClient.R().SetBody(M{ "branch": branch.Branch, "action": "commit", @@ -220,17 +173,9 @@ func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) if !strings.Contains(body, "SUCCESS") { return fmt.Errorf("bad response: %s", body) } - writeTransLog(gid, "step finished", "finished", branch.Branch, "") - db.Must().Model(&branch).Where("status=?", "prepared").Updates(M{ - "status": "finished", - "finish_time": time.Now(), - }) + branch.changeStatus(db, "finished") } - writeTransLog(gid, "xa finished", "finished", "", "") - db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", gid, "committed").Updates(M{ - "status": "finished", - "finish_time": time.Now(), - }) + t.changeStatus(db, "finished") } else if t.Status == "prepared" { // 未commit直接处理的情况为回滚场景 for _, branch := range branches { if branch.Status == "rollbacked" { @@ -249,17 +194,9 @@ func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) if !strings.Contains(body, "SUCCESS") { return fmt.Errorf("bad response: %s", body) } - writeTransLog(gid, "step rollbacked", "rollbacked", branch.Branch, "") - db.Must().Model(&branch).Where("status=?", "prepared").Updates(M{ - "status": "rollbacked", - "finish_time": time.Now(), - }) + branch.changeStatus(db, "rollbacked") } - writeTransLog(gid, "xa rollbacked", "rollbacked", "", "") - db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", gid, "prepared").Updates(M{ - "status": "rollbacked", - "finish_time": time.Now(), - }) + t.changeStatus(db, "rollbacked") } else { return fmt.Errorf("bad trans status: %s", t.Status) } diff --git a/dtmsvr/types.go b/dtmsvr/types.go index e80af74..3fd09e8 100644 --- a/dtmsvr/types.go +++ b/dtmsvr/types.go @@ -37,12 +37,17 @@ func (t *TransGlobal) touch(db *common.MyDb) *gorm.DB { return db.Model(&TransGlobal{}).Where("gid=?", t.Gid).Update("gid", t.Gid) // 更新update_time,避免被定时任务再次 } -func (t *TransGlobal) saveStatus(db *common.MyDb, status string) *gorm.DB { - writeTransLog(t.Gid, "step change", status, "", "") - dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(M{ - "status": status, - "finish_time": time.Now(), - }) +func (t *TransGlobal) changeStatus(db *common.MyDb, status string) *gorm.DB { + writeTransLog(t.Gid, "change status", status, "", "") + updates := M{ + "status": status, + } + if status == "finished" { + updates["finish_time"] = time.Now() + } else if status == "rollbacked" { + updates["rollback_time"] = time.Now() + } + dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(updates) checkAffected(dbr) t.Status = status return dbr