test pass

This commit is contained in:
yedongfu 2021-05-28 11:03:20 +08:00
parent 291cf0bb28
commit 2332d19f24
6 changed files with 88 additions and 105 deletions

View File

@ -39,6 +39,12 @@ func P2E(perr *error) {
} }
} }
func PanicIf(cond bool, err error) {
if cond {
panic(err)
}
}
func GenGid() string { func GenGid() string {
return gNode.Generate().Base58() return gNode.Generate().Base58()
} }

View File

@ -1,73 +1,39 @@
package dtmsvr package dtmsvr
import ( import (
"fmt"
"math" "math"
"math/rand" "math/rand"
"strings"
"time" "time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
) )
func CronPreparedOnce(expire time.Duration) {
db := dbGet()
ss := []TransGlobal{}
db.Must().Model(&TransGlobal{}).Where("update_time < date_sub(now(), interval ? second)", int(expire/time.Second)).Where("status = ?", "prepared").Find(&ss)
writeTransLog("", "saga fetch prepared", fmt.Sprint(len(ss)), "", "")
if len(ss) == 0 {
return
}
for _, sm := range ss {
writeTransLog(sm.Gid, "saga touch prepared", "", "", "")
db.Must().Model(&sm).Update("id", sm.ID)
resp, err := common.RestyClient.R().SetQueryParam("gid", sm.Gid).Get(sm.QueryPrepared)
e2p(err)
body := resp.String()
if strings.Contains(body, "FAIL") {
preparedExpire := time.Now().Add(time.Duration(-config.PreparedExpire) * time.Second)
logrus.Printf("create time: %s prepared expire: %s ", sm.CreateTime.Local(), preparedExpire.Local())
status := common.If(sm.CreateTime.Before(preparedExpire), "canceled", "prepared").(string)
writeTransLog(sm.Gid, "saga canceled", status, "", "")
db.Must().Model(&sm).Where("status = ?", "prepared").Update("status", status)
} else if strings.Contains(body, "SUCCESS") {
sm.Status = "committed"
sm.SaveNew(db)
sm.Process(db)
}
}
}
func CronPrepared() { func CronPrepared() {
for { for {
defer handlePanic()
CronTransOnce(time.Duration(config.JobCronInterval)*time.Second, "prepared") CronTransOnce(time.Duration(config.JobCronInterval)*time.Second, "prepared")
sleepCronTime() sleepCronTime()
} }
} }
func CronTransOnce(expire time.Duration, status string) bool { func CronTransOnce(expire time.Duration, status string) bool {
defer handlePanic()
trans := lockOneTrans(expire, status) trans := lockOneTrans(expire, status)
if trans == nil { if trans == nil {
return false return false
} }
trans.touch(dbGet()) trans.touch(dbGet())
branches := []TransBranch{} defer func() {
db := dbGet() WaitTransProcessed(trans.Gid)
db.Must().Where("gid=?", trans.Gid).Order("id asc").Find(&branches) }()
trans.getProcessor().ProcessOnce(db, branches) trans.Process(dbGet())
if TransProcessedTestChan != nil {
TransProcessedTestChan <- trans.Gid
}
return true return true
} }
func CronCommitted() { func CronCommitted() {
for { for {
defer handlePanic() notEmpty := CronTransOnce(time.Duration(config.JobCronInterval)*time.Second, "commitetd")
processed := CronTransOnce(time.Duration(config.JobCronInterval)*time.Second, "commitetd") if !notEmpty {
if !processed {
sleepCronTime() sleepCronTime()
} }
} }
@ -90,7 +56,6 @@ func lockOneTrans(expire time.Duration, status string) *TransGlobal {
func handlePanic() { func handlePanic() {
if err := recover(); err != nil { if err := recover(); err != nil {
logrus.Printf("----panic %s handlered", err.(error).Error()) logrus.Printf("----panic %s handlered", err.(error).Error())
time.Sleep(3 * time.Second) // 出错后睡眠3s避免无限循环
} }
} }

View File

@ -37,11 +37,11 @@ func TestDtmSvr(t *testing.T) {
e2p(dbGet().Exec("truncate trans_log").Error) e2p(dbGet().Exec("truncate trans_log").Error)
examples.ResetXaData() examples.ResetXaData()
sagaCommittedPending(t)
sagaPreparePending(t)
xaRollback(t) xaRollback(t)
xaNormal(t) xaNormal(t)
sagaPreparePending(t)
sagaPrepareCancel(t) sagaPrepareCancel(t)
sagaCommittedPending(t)
sagaNormal(t) sagaNormal(t)
sagaRollback(t) sagaRollback(t)
} }
@ -49,7 +49,7 @@ func TestDtmSvr(t *testing.T) {
func TestCover(t *testing.T) { func TestCover(t *testing.T) {
db := dbGet() db := dbGet()
db.NoMust() db.NoMust()
CronPreparedOnce(0) CronTransOnce(0, "prepared")
CronTransOnce(0, "committed") CronTransOnce(0, "committed")
defer handlePanic() defer handlePanic()
checkAffected(db.DB) checkAffected(db.DB)
@ -146,7 +146,7 @@ func sagaPrepareCancel(t *testing.T) {
saga.Prepare() saga.Prepare()
examples.TransQueryResult = "FAIL" examples.TransQueryResult = "FAIL"
config.PreparedExpire = -10 config.PreparedExpire = -10
CronPreparedOnce(-10 * time.Second) CronTransOnce(-10*time.Second, "prepared")
examples.TransQueryResult = "" examples.TransQueryResult = ""
config.PreparedExpire = 60 config.PreparedExpire = 60
assert.Equal(t, "canceled", getSagaModel(saga.Gid).Status) assert.Equal(t, "canceled", getSagaModel(saga.Gid).Status)
@ -156,11 +156,10 @@ func sagaPreparePending(t *testing.T) {
saga := genSaga("gid1-preparePending", false, false) saga := genSaga("gid1-preparePending", false, false)
saga.Prepare() saga.Prepare()
examples.TransQueryResult = "PENDING" examples.TransQueryResult = "PENDING"
CronPreparedOnce(-10 * time.Second) CronTransOnce(-10*time.Second, "prepared")
examples.TransQueryResult = "" examples.TransQueryResult = ""
assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status)
CronPreparedOnce(-10 * time.Second) CronTransOnce(-10*time.Second, "prepared")
WaitTransProcessed(saga.Gid)
assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) assert.Equal(t, "finished", getSagaModel(saga.Gid).Status)
} }
@ -173,7 +172,6 @@ func sagaCommittedPending(t *testing.T) {
examples.TransInResult = "" examples.TransInResult = ""
assert.Equal(t, []string{"prepared", "finished", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) assert.Equal(t, []string{"prepared", "finished", "prepared", "prepared"}, getBranchesStatus(saga.Gid))
CronTransOnce(-10*time.Second, "committed") CronTransOnce(-10*time.Second, "committed")
WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{"prepared", "finished", "prepared", "finished"}, getBranchesStatus(saga.Gid)) assert.Equal(t, []string{"prepared", "finished", "prepared", "finished"}, getBranchesStatus(saga.Gid))
assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) assert.Equal(t, "finished", getSagaModel(saga.Gid).Status)
} }

View File

@ -7,9 +7,11 @@ import (
var TransProcessedTestChan chan string = nil // 用于测试时,通知处理结束 var TransProcessedTestChan chan string = nil // 用于测试时,通知处理结束
func WaitTransProcessed(gid string) { func WaitTransProcessed(gid string) {
logrus.Printf("waiting for gid %s", gid)
id := <-TransProcessedTestChan id := <-TransProcessedTestChan
for id != gid { for id != gid {
logrus.Errorf("-------id %s not match gid %s", id, gid) logrus.Errorf("-------id %s not match gid %s", id, gid)
id = <-TransProcessedTestChan id = <-TransProcessedTestChan
} }
logrus.Printf("finish for gid %s", gid)
} }

View File

@ -3,13 +3,16 @@ package dtmsvr
import ( import (
"fmt" "fmt"
"strings" "strings"
"time"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
) )
type TransProcessor interface { type TransProcessor interface {
GenBranches() []TransBranch GenBranches() []TransBranch
ProcessOnce(db *common.MyDb, branches []TransBranch) error ProcessOnce(db *common.MyDb, branches []TransBranch)
ExecBranch(db *common.MyDb, branch *TransBranch) string
} }
type TransSagaProcessor struct { type TransSagaProcessor struct {
@ -35,7 +38,30 @@ func (t *TransSagaProcessor) GenBranches() []TransBranch {
return nsteps return nsteps
} }
func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) error { func (t *TransSagaProcessor) ExecBranch(db *common.MyDb, branche *TransBranch) string {
return ""
}
func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) {
if t.Status == "prepared" {
resp, err := common.RestyClient.R().SetQueryParam("gid", t.Gid).Get(t.QueryPrepared)
e2p(err)
body := resp.String()
if strings.Contains(body, "FAIL") {
preparedExpire := time.Now().Add(time.Duration(-config.PreparedExpire) * time.Second)
logrus.Printf("create time: %s prepared expire: %s ", t.CreateTime.Local(), preparedExpire.Local())
status := common.If(t.CreateTime.Before(preparedExpire), "canceled", "prepared").(string)
if status != t.Status {
t.changeStatus(db, status)
}
return
} else if strings.Contains(body, "SUCCESS") {
t.Status = "committed"
t.SaveNew(db)
} else {
panic(fmt.Errorf("unknown result, will be retried: %s", body))
}
}
current := 0 // 当前正在处理的步骤 current := 0 // 当前正在处理的步骤
for ; current < len(branches); current++ { for ; current < len(branches); current++ {
step := branches[current] step := branches[current]
@ -44,9 +70,7 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch
} }
if step.BranchType == "action" && step.Status == "prepared" { if step.BranchType == "action" && step.Status == "prepared" {
resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
if err != nil { e2p(err)
return err
}
body := resp.String() body := resp.String()
t.touch(db.Must()) t.touch(db.Must())
@ -56,13 +80,13 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch
step.changeStatus(db.Must(), "rollbacked") step.changeStatus(db.Must(), "rollbacked")
break break
} else { } else {
return fmt.Errorf("unknown response: %s, will be retried", body) panic(fmt.Errorf("unknown response: %s, will be retried", body))
} }
} }
} }
if current == len(branches) { // saga 事务完成 if current == len(branches) { // saga 事务完成
t.changeStatus(db.Must(), "finished") t.changeStatus(db.Must(), "finished")
return nil return
} }
for current = current - 1; current >= 0; current-- { for current = current - 1; current >= 0; current-- {
step := branches[current] step := branches[current]
@ -70,21 +94,18 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch
continue continue
} }
resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
if err != nil { e2p(err)
return err
}
body := resp.String() body := resp.String()
if strings.Contains(body, "SUCCESS") { if strings.Contains(body, "SUCCESS") {
step.changeStatus(db.Must(), "rollbacked") step.changeStatus(db.Must(), "rollbacked")
} else { } else {
return fmt.Errorf("expect compensate return SUCCESS") panic(fmt.Errorf("expect compensate return SUCCESS"))
} }
} }
if current != -1 { if current != -1 {
return fmt.Errorf("saga current not -1") panic(fmt.Errorf("saga current not -1"))
} }
t.changeStatus(db.Must(), "rollbacked") t.changeStatus(db.Must(), "rollbacked")
return nil
} }
type TransTccProcessor struct { type TransTccProcessor struct {
@ -110,7 +131,11 @@ func (t *TransTccProcessor) GenBranches() []TransBranch {
return nsteps return nsteps
} }
func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) error { func (t *TransTccProcessor) ExecBranch(db *common.MyDb, branche *TransBranch) string {
return ""
}
func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) {
current := 0 // 当前正在处理的步骤 current := 0 // 当前正在处理的步骤
for ; current < len(branches); current++ { for ; current < len(branches); current++ {
step := branches[current] step := branches[current]
@ -119,9 +144,7 @@ func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch)
} }
if step.BranchType == "prepare" && step.Status == "prepared" { if step.BranchType == "prepare" && step.Status == "prepared" {
resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
if err != nil { e2p(err)
return err
}
body := resp.String() body := resp.String()
t.touch(db) t.touch(db)
if strings.Contains(body, "SUCCESS") { if strings.Contains(body, "SUCCESS") {
@ -130,16 +153,14 @@ func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch)
step.changeStatus(db, "rollbacked") step.changeStatus(db, "rollbacked")
break break
} else { } else {
return fmt.Errorf("unknown response: %s, will be retried", body) panic(fmt.Errorf("unknown response: %s, will be retried", body))
} }
} }
} }
////////////////////////////////////////////////// //////////////////////////////////////////////////
if current == len(branches) { // tcc 事务完成 if current == len(branches) { // tcc 事务完成
t.changeStatus(db, "finished") t.changeStatus(db, "finished")
return nil
} }
return nil
} }
type TransXaProcessor struct { type TransXaProcessor struct {
@ -149,31 +170,32 @@ type TransXaProcessor struct {
func (t *TransXaProcessor) GenBranches() []TransBranch { func (t *TransXaProcessor) GenBranches() []TransBranch {
return []TransBranch{} return []TransBranch{}
} }
func (t *TransXaProcessor) ExecBranch(db *common.MyDb, branch *TransBranch) string {
resp, err := common.RestyClient.R().SetBody(M{
"branch": branch.Branch,
"action": common.If(t.Status == "prepared", "rollback", "commit"),
"gid": branch.Gid,
}).Post(branch.Url)
e2p(err)
body := resp.String()
if !strings.Contains(body, "SUCCESS") {
panic(fmt.Errorf("bad response: %s", body))
}
branch.changeStatus(db, common.If(t.Status == "prepared", "rollbacked", "finished").(string))
return "SUCCESS"
}
func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) error { func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) {
gid := t.Gid
if t.Status == "finished" { if t.Status == "finished" {
return nil return
} }
if t.Status == "committed" { if t.Status == "committed" {
for _, branch := range branches { for _, branch := range branches {
if branch.Status == "finished" { if branch.Status == "finished" {
continue continue
} }
_ = t.ExecBranch(db, &branch)
t.touch(db) // 更新update_time避免被定时任务再次 t.touch(db) // 更新update_time避免被定时任务再次
resp, err := common.RestyClient.R().SetBody(M{
"branch": branch.Branch,
"action": "commit",
"gid": branch.Gid,
}).Post(branch.Url)
if err != nil {
return err
}
body := resp.String()
if !strings.Contains(body, "SUCCESS") {
return fmt.Errorf("bad response: %s", body)
}
branch.changeStatus(db, "finished")
} }
t.changeStatus(db, "finished") t.changeStatus(db, "finished")
} else if t.Status == "prepared" { // 未commit直接处理的情况为回滚场景 } else if t.Status == "prepared" { // 未commit直接处理的情况为回滚场景
@ -181,24 +203,11 @@ func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch)
if branch.Status == "rollbacked" { if branch.Status == "rollbacked" {
continue continue
} }
db.Must().Model(&TransGlobal{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time避免被定时任务再次 _ = t.ExecBranch(db, &branch)
resp, err := common.RestyClient.R().SetBody(M{ t.touch(db)
"branch": branch.Branch,
"action": "rollback",
"gid": branch.Gid,
}).Post(branch.Url)
if err != nil {
return err
}
body := resp.String()
if !strings.Contains(body, "SUCCESS") {
return fmt.Errorf("bad response: %s", body)
}
branch.changeStatus(db, "rollbacked")
} }
t.changeStatus(db, "rollbacked") t.changeStatus(db, "rollbacked")
} else { } else {
return fmt.Errorf("bad trans status: %s", t.Status) e2p(fmt.Errorf("bad trans status: %s", t.Status))
} }
return nil
} }

View File

@ -98,12 +98,15 @@ func (trans *TransGlobal) getProcessor() TransProcessor {
} }
func (trans *TransGlobal) Process(db *common.MyDb) { func (trans *TransGlobal) Process(db *common.MyDb) {
branches := []TransBranch{} defer handlePanic()
db.Must().Where("gid=?", trans.Gid).Order("id asc").Find(&branches) defer func() {
trans.getProcessor().ProcessOnce(db, branches)
if TransProcessedTestChan != nil { if TransProcessedTestChan != nil {
TransProcessedTestChan <- trans.Gid TransProcessedTestChan <- trans.Gid
} }
}()
branches := []TransBranch{}
db.Must().Where("gid=?", trans.Gid).Order("id asc").Find(&branches)
trans.getProcessor().ProcessOnce(db, branches)
} }
func (t *TransGlobal) SaveNew(db *common.MyDb) { func (t *TransGlobal) SaveNew(db *common.MyDb) {