From 8a72a4d629ed541e3a45a8dc9b88c4d5af7f2a9e Mon Sep 17 00:00:00 2001 From: yedongfu Date: Thu, 27 May 2021 19:01:04 +0800 Subject: [PATCH] saga use save status ok --- dtmsvr/trans.go | 38 ++++++-------------------------------- dtmsvr/types.go | 11 +++++++++++ 2 files changed, 17 insertions(+), 32 deletions(-) diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 966e343..07f0905 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -48,7 +48,6 @@ func (t *TransSaga) GetDataBranches() []TransBranchModel { } func (t *TransSaga) ProcessOnce(db *common.MyDb, branches []TransBranchModel) error { - gid := t.Gid current := 0 // 当前正在处理的步骤 for ; current < len(branches); current++ { step := branches[current] @@ -62,21 +61,11 @@ func (t *TransSaga) ProcessOnce(db *common.MyDb, branches []TransBranchModel) er } body := resp.String() - db.Must().Model(&TransGlobalModel{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 + t.touch(db.Must()) if strings.Contains(body, "SUCCESS") { - writeTransLog(gid, "step finished", "finished", step.Branch, "") - dbr := db.Must().Model(&step).Where("status=?", "prepared").Updates(M{ - "status": "finished", - "finish_time": time.Now(), - }) - checkAffected(dbr) + step.saveStatus(db.Must(), "finished") } else if strings.Contains(body, "FAIL") { - writeTransLog(gid, "step rollbacked", "rollbacked", step.Branch, "") - dbr := db.Must().Model(&step).Where("status=?", "prepared").Updates(M{ - "status": "rollbacked", - "rollback_time": time.Now(), - }) - checkAffected(dbr) + step.saveStatus(db.Must(), "rollbacked") break } else { return fmt.Errorf("unknown response: %s, will be retried", body) @@ -84,12 +73,7 @@ func (t *TransSaga) ProcessOnce(db *common.MyDb, branches []TransBranchModel) er } } if current == len(branches) { // saga 事务完成 - writeTransLog(gid, "saga finished", "finished", "", "") - dbr := db.Must().Model(&TransGlobalModel{}).Where("gid=? and status=?", gid, "committed").Updates(M{ - "status": "finished", - "finish_time": time.Now(), - }) - checkAffected(dbr) + t.saveStatus(db.Must(), "finished") return nil } for current = current - 1; current >= 0; current-- { @@ -103,12 +87,7 @@ func (t *TransSaga) ProcessOnce(db *common.MyDb, branches []TransBranchModel) er } body := resp.String() if strings.Contains(body, "SUCCESS") { - writeTransLog(gid, "step rollbacked", "rollbacked", step.Branch, "") - dbr := db.Must().Model(&step).Where("status=?", step.Status).Updates(M{ - "status": "rollbacked", - "rollback_time": time.Now(), - }) - checkAffected(dbr) + step.saveStatus(db.Must(), "rollbacked") } else { return fmt.Errorf("expect compensate return SUCCESS") } @@ -116,12 +95,7 @@ func (t *TransSaga) ProcessOnce(db *common.MyDb, branches []TransBranchModel) er if current != -1 { return fmt.Errorf("saga current not -1") } - writeTransLog(gid, "saga rollbacked", "rollbacked", "", "") - dbr := db.Must().Model(&TransGlobalModel{}).Where("status=? and gid=?", "committed", gid).Updates(M{ - "status": "rollbacked", - "rollback_time": time.Now(), - }) - checkAffected(dbr) + t.saveStatus(db.Must(), "rollbacked") return nil } diff --git a/dtmsvr/types.go b/dtmsvr/types.go index 606a886..e87fe3a 100644 --- a/dtmsvr/types.go +++ b/dtmsvr/types.go @@ -31,6 +31,17 @@ func (t *TransGlobalModel) touch(db *common.MyDb) *gorm.DB { return db.Model(&TransGlobalModel{}).Where("gid=?", t.Gid).Update("gid", t.Gid) // 更新update_time,避免被定时任务再次 } +func (t *TransGlobalModel) saveStatus(db *common.MyDb, status string) *gorm.DB { + writeTransLog(t.Gid, "step change", status, "", "") + dbr := db.Must().Model(t).Where("status=?", t.Status).Updates(M{ + "status": status, + "finish_time": time.Now(), + }) + checkAffected(dbr) + t.Status = status + return dbr +} + type TransBranchModel struct { common.ModelBase Gid string