From 8865ed713602d7b885c0bc928164a32af9a4499f Mon Sep 17 00:00:00 2001 From: yedongfu Date: Thu, 27 May 2021 18:54:18 +0800 Subject: [PATCH] before use touch --- dtmsvr/trans.go | 304 ++++++++++++++++++++++++++++++++++++++++++++++++ tcc.go | 77 ++++++++++++ 2 files changed, 381 insertions(+) create mode 100644 dtmsvr/trans.go create mode 100644 tcc.go diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go new file mode 100644 index 0000000..966e343 --- /dev/null +++ b/dtmsvr/trans.go @@ -0,0 +1,304 @@ +package dtmsvr + +import ( + "fmt" + "strings" + "time" + + "github.com/yedf/dtm/common" +) + +type Trans interface { + GetDataBranches() []TransBranchModel + ProcessOnce(db *common.MyDb, branches []TransBranchModel) error +} + +func GetTrans(trans *TransGlobalModel) Trans { + if trans.TransType == "saga" { + return &TransSaga{TransGlobalModel: trans} + } else if trans.TransType == "tcc" { + return &TransTcc{TransGlobalModel: trans} + } else if trans.TransType == "xa" { + return &TransXa{TransGlobalModel: trans} + } + return nil +} + +type TransSaga struct { + *TransGlobalModel +} + +func (t *TransSaga) GetDataBranches() []TransBranchModel { + nsteps := []TransBranchModel{} + steps := []M{} + common.MustUnmarshalString(t.Data, &steps) + for _, step := range steps { + for _, branchType := range []string{"compensate", "action"} { + nsteps = append(nsteps, TransBranchModel{ + Gid: t.Gid, + Branch: fmt.Sprintf("%d", len(nsteps)+1), + Data: step["data"].(string), + Url: step[branchType].(string), + BranchType: branchType, + Status: "prepared", + }) + } + } + return nsteps +} + +func (t *TransSaga) ProcessOnce(db *common.MyDb, branches []TransBranchModel) error { + gid := t.Gid + current := 0 // 当前正在处理的步骤 + for ; current < len(branches); current++ { + step := branches[current] + if step.BranchType == "compensate" && step.Status == "prepared" || step.BranchType == "action" && step.Status == "finished" { + continue + } + if step.BranchType == "action" && step.Status == "prepared" { + resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) + if err != nil { + return err + } + body := resp.String() + + db.Must().Model(&TransGlobalModel{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 + if strings.Contains(body, "SUCCESS") { + writeTransLog(gid, "step finished", "finished", step.Branch, "") + dbr := db.Must().Model(&step).Where("status=?", "prepared").Updates(M{ + "status": "finished", + "finish_time": time.Now(), + }) + checkAffected(dbr) + } else if strings.Contains(body, "FAIL") { + writeTransLog(gid, "step rollbacked", "rollbacked", step.Branch, "") + dbr := db.Must().Model(&step).Where("status=?", "prepared").Updates(M{ + "status": "rollbacked", + "rollback_time": time.Now(), + }) + checkAffected(dbr) + break + } else { + return fmt.Errorf("unknown response: %s, will be retried", body) + } + } + } + if current == len(branches) { // saga 事务完成 + writeTransLog(gid, "saga finished", "finished", "", "") + dbr := db.Must().Model(&TransGlobalModel{}).Where("gid=? and status=?", gid, "committed").Updates(M{ + "status": "finished", + "finish_time": time.Now(), + }) + checkAffected(dbr) + return nil + } + for current = current - 1; current >= 0; current-- { + step := branches[current] + if step.BranchType != "compensate" || step.Status != "prepared" { + continue + } + resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) + if err != nil { + return err + } + body := resp.String() + if strings.Contains(body, "SUCCESS") { + writeTransLog(gid, "step rollbacked", "rollbacked", step.Branch, "") + dbr := db.Must().Model(&step).Where("status=?", step.Status).Updates(M{ + "status": "rollbacked", + "rollback_time": time.Now(), + }) + checkAffected(dbr) + } else { + return fmt.Errorf("expect compensate return SUCCESS") + } + } + if current != -1 { + return fmt.Errorf("saga current not -1") + } + writeTransLog(gid, "saga rollbacked", "rollbacked", "", "") + dbr := db.Must().Model(&TransGlobalModel{}).Where("status=? and gid=?", "committed", gid).Updates(M{ + "status": "rollbacked", + "rollback_time": time.Now(), + }) + checkAffected(dbr) + return nil +} + +type TransTcc struct { + *TransGlobalModel +} + +func (t *TransTcc) GetDataBranches() []TransBranchModel { + nsteps := []TransBranchModel{} + steps := []M{} + common.MustUnmarshalString(t.Data, &steps) + for _, step := range steps { + for _, branchType := range []string{"rollback", "commit", "prepare"} { + nsteps = append(nsteps, TransBranchModel{ + Gid: t.Gid, + Branch: fmt.Sprintf("%d", len(nsteps)+1), + Data: step["data"].(string), + Url: step[branchType].(string), + BranchType: branchType, + Status: "prepared", + }) + } + } + return nsteps +} + +func (t *TransTcc) ProcessOnce(db *common.MyDb, branches []TransBranchModel) error { + gid := t.Gid + current := 0 // 当前正在处理的步骤 + for ; current < len(branches); current++ { + step := branches[current] + if step.BranchType == "prepare" && step.Status == "finished" || step.BranchType != "commit" && step.Status == "prepared" { + continue + } + if step.BranchType == "prepare" && step.Status == "prepared" { + resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) + if err != nil { + return err + } + body := resp.String() + db.Must().Model(&TransGlobalModel{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 + if strings.Contains(body, "SUCCESS") { + writeTransLog(gid, "step finished", "finished", step.Branch, "") + dbr := db.Must().Model(&step).Where("status=?", "prepared").Updates(M{ + "status": "finished", + "finish_time": time.Now(), + }) + checkAffected(dbr) + } else if strings.Contains(body, "FAIL") { + writeTransLog(gid, "step rollbacked", "rollbacked", step.Branch, "") + dbr := db.Must().Model(&step).Where("status=?", "prepared").Updates(M{ + "status": "rollbacked", + "rollback_time": time.Now(), + }) + checkAffected(dbr) + break + } else { + return fmt.Errorf("unknown response: %s, will be retried", body) + } + } + } + ////////////////////////////////////////////////// + if current == len(branches) { // tcc 事务完成 + writeTransLog(gid, "saga finished", "finished", "", "") + dbr := db.Must().Model(&TransGlobalModel{}).Where("gid=? and status=?", gid, "committed").Updates(M{ + "status": "finished", + "finish_time": time.Now(), + }) + checkAffected(dbr) + return nil + } + for current = current - 1; current >= 0; current-- { + step := branches[current] + if step.BranchType != "compensate" || step.Status != "prepared" { + continue + } + resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) + if err != nil { + return err + } + body := resp.String() + if strings.Contains(body, "SUCCESS") { + writeTransLog(gid, "step rollbacked", "rollbacked", step.Branch, "") + dbr := db.Must().Model(&step).Where("status=?", step.Status).Updates(M{ + "status": "rollbacked", + "rollback_time": time.Now(), + }) + checkAffected(dbr) + } else { + return fmt.Errorf("expect compensate return SUCCESS") + } + } + if current != -1 { + return fmt.Errorf("saga current not -1") + } + writeTransLog(gid, "saga rollbacked", "rollbacked", "", "") + dbr := db.Must().Model(&TransGlobalModel{}).Where("status=? and gid=?", "committed", gid).Updates(M{ + "status": "rollbacked", + "rollback_time": time.Now(), + }) + checkAffected(dbr) + return nil +} + +type TransXa struct { + *TransGlobalModel +} + +func (t *TransXa) GetDataBranches() []TransBranchModel { + return []TransBranchModel{} +} + +func (t *TransXa) ProcessOnce(db *common.MyDb, branches []TransBranchModel) error { + gid := t.Gid + if t.Status == "finished" { + return nil + } + if t.Status == "committed" { + for _, branch := range branches { + if branch.Status == "finished" { + continue + } + db.Must().Model(&TransGlobalModel{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 + resp, err := common.RestyClient.R().SetBody(M{ + "branch": branch.Branch, + "action": "commit", + "gid": branch.Gid, + }).Post(branch.Url) + if err != nil { + return err + } + body := resp.String() + if !strings.Contains(body, "SUCCESS") { + return fmt.Errorf("bad response: %s", body) + } + writeTransLog(gid, "step finished", "finished", branch.Branch, "") + db.Must().Model(&branch).Where("status=?", "prepared").Updates(M{ + "status": "finished", + "finish_time": time.Now(), + }) + } + writeTransLog(gid, "xa finished", "finished", "", "") + db.Must().Model(&TransGlobalModel{}).Where("gid=? and status=?", gid, "committed").Updates(M{ + "status": "finished", + "finish_time": time.Now(), + }) + } else if t.Status == "prepared" { // 未commit直接处理的情况为回滚场景 + for _, branch := range branches { + if branch.Status == "rollbacked" { + continue + } + db.Must().Model(&TransGlobalModel{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 + resp, err := common.RestyClient.R().SetBody(M{ + "branch": branch.Branch, + "action": "rollback", + "gid": branch.Gid, + }).Post(branch.Url) + if err != nil { + return err + } + body := resp.String() + if !strings.Contains(body, "SUCCESS") { + return fmt.Errorf("bad response: %s", body) + } + writeTransLog(gid, "step rollbacked", "rollbacked", branch.Branch, "") + db.Must().Model(&branch).Where("status=?", "prepared").Updates(M{ + "status": "rollbacked", + "finish_time": time.Now(), + }) + } + writeTransLog(gid, "xa rollbacked", "rollbacked", "", "") + db.Must().Model(&TransGlobalModel{}).Where("gid=? and status=?", gid, "prepared").Updates(M{ + "status": "rollbacked", + "finish_time": time.Now(), + }) + } else { + return fmt.Errorf("bad trans status: %s", t.Status) + } + return nil +} diff --git a/tcc.go b/tcc.go new file mode 100644 index 0000000..94ed8a1 --- /dev/null +++ b/tcc.go @@ -0,0 +1,77 @@ +package dtm + +import ( + "encoding/json" + "fmt" + + "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" +) + +type Tcc struct { + TccData + Server string +} + +type TccData struct { + Gid string `json:"gid"` + TransType string `json:"trans_type"` + Steps []TccStep `json:"steps"` + QueryPrepared string `json:"query_prepared"` +} +type TccStep struct { + Prepare string `json:"prepare"` + Commit string `json:"commit"` + Rollback string `json:"rollback"` + Data string `json:"data"` +} + +func TccNew(server string, gid string, queryPrepared string) *Saga { + return &Saga{ + SagaData: SagaData{ + Gid: gid, + TransType: "tcc", + QueryPrepared: queryPrepared, + }, + Server: server, + } +} +func (s *Tcc) Add(prepare string, commit string, rollback string, data interface{}) error { + logrus.Printf("tcc %s Add %s %s %s %v", s.Gid, prepare, commit, rollback, data) + d, err := json.Marshal(data) + if err != nil { + return err + } + step := TccStep{ + Prepare: prepare, + Commit: commit, + Rollback: rollback, + Data: string(d), + } + s.Steps = append(s.Steps, step) + return nil +} + +func (s *Tcc) Prepare() error { + logrus.Printf("preparing %s body: %v", s.Gid, &s.TccData) + resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/prepare", s.Server)) + if err != nil { + return err + } + if resp.StatusCode() != 200 { + return fmt.Errorf("prepare failed: %v", resp.Body()) + } + return nil +} + +func (s *Tcc) Commit() error { + logrus.Printf("committing %s body: %v", s.Gid, &s.TccData) + resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/commit", s.Server)) + if err != nil { + return err + } + if resp.StatusCode() != 200 { + return fmt.Errorf("commit failed: %v", resp.Body()) + } + return nil +}