From 2332d19f24859513a7b3b5656eb6b592cd478f47 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Fri, 28 May 2021 11:03:20 +0800 Subject: [PATCH] test pass --- common/utils.go | 6 +++ dtmsvr/cron.go | 49 +++--------------- dtmsvr/dtmsvr_test.go | 14 +++--- dtmsvr/service.go | 2 + dtmsvr/trans.go | 113 +++++++++++++++++++++++------------------- dtmsvr/types.go | 9 ++-- 6 files changed, 88 insertions(+), 105 deletions(-) diff --git a/common/utils.go b/common/utils.go index f8e24ab..932935f 100644 --- a/common/utils.go +++ b/common/utils.go @@ -39,6 +39,12 @@ func P2E(perr *error) { } } +func PanicIf(cond bool, err error) { + if cond { + panic(err) + } +} + func GenGid() string { return gNode.Generate().Base58() } diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 7acbbf5..7a391a8 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -1,73 +1,39 @@ package dtmsvr import ( - "fmt" "math" "math/rand" - "strings" "time" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) -func CronPreparedOnce(expire time.Duration) { - db := dbGet() - ss := []TransGlobal{} - db.Must().Model(&TransGlobal{}).Where("update_time < date_sub(now(), interval ? second)", int(expire/time.Second)).Where("status = ?", "prepared").Find(&ss) - writeTransLog("", "saga fetch prepared", fmt.Sprint(len(ss)), "", "") - if len(ss) == 0 { - return - } - for _, sm := range ss { - writeTransLog(sm.Gid, "saga touch prepared", "", "", "") - db.Must().Model(&sm).Update("id", sm.ID) - resp, err := common.RestyClient.R().SetQueryParam("gid", sm.Gid).Get(sm.QueryPrepared) - e2p(err) - body := resp.String() - if strings.Contains(body, "FAIL") { - preparedExpire := time.Now().Add(time.Duration(-config.PreparedExpire) * time.Second) - logrus.Printf("create time: %s prepared expire: %s ", sm.CreateTime.Local(), preparedExpire.Local()) - status := common.If(sm.CreateTime.Before(preparedExpire), "canceled", "prepared").(string) - writeTransLog(sm.Gid, "saga canceled", status, "", "") - db.Must().Model(&sm).Where("status = ?", "prepared").Update("status", status) - } else if strings.Contains(body, "SUCCESS") { - sm.Status = "committed" - sm.SaveNew(db) - sm.Process(db) - } - } -} - func CronPrepared() { for { - defer handlePanic() CronTransOnce(time.Duration(config.JobCronInterval)*time.Second, "prepared") sleepCronTime() } } func CronTransOnce(expire time.Duration, status string) bool { + defer handlePanic() trans := lockOneTrans(expire, status) if trans == nil { return false } trans.touch(dbGet()) - branches := []TransBranch{} - db := dbGet() - db.Must().Where("gid=?", trans.Gid).Order("id asc").Find(&branches) - trans.getProcessor().ProcessOnce(db, branches) - if TransProcessedTestChan != nil { - TransProcessedTestChan <- trans.Gid - } + defer func() { + WaitTransProcessed(trans.Gid) + }() + trans.Process(dbGet()) return true } func CronCommitted() { for { - defer handlePanic() - processed := CronTransOnce(time.Duration(config.JobCronInterval)*time.Second, "commitetd") - if !processed { + notEmpty := CronTransOnce(time.Duration(config.JobCronInterval)*time.Second, "commitetd") + if !notEmpty { sleepCronTime() } } @@ -90,7 +56,6 @@ func lockOneTrans(expire time.Duration, status string) *TransGlobal { func handlePanic() { if err := recover(); err != nil { logrus.Printf("----panic %s handlered", err.(error).Error()) - time.Sleep(3 * time.Second) // 出错后睡眠3s,避免无限循环 } } diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index 137acca..1d6d07b 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -37,11 +37,11 @@ func TestDtmSvr(t *testing.T) { e2p(dbGet().Exec("truncate trans_log").Error) examples.ResetXaData() + sagaCommittedPending(t) + sagaPreparePending(t) xaRollback(t) xaNormal(t) - sagaPreparePending(t) sagaPrepareCancel(t) - sagaCommittedPending(t) sagaNormal(t) sagaRollback(t) } @@ -49,7 +49,7 @@ func TestDtmSvr(t *testing.T) { func TestCover(t *testing.T) { db := dbGet() db.NoMust() - CronPreparedOnce(0) + CronTransOnce(0, "prepared") CronTransOnce(0, "committed") defer handlePanic() checkAffected(db.DB) @@ -146,7 +146,7 @@ func sagaPrepareCancel(t *testing.T) { saga.Prepare() examples.TransQueryResult = "FAIL" config.PreparedExpire = -10 - CronPreparedOnce(-10 * time.Second) + CronTransOnce(-10*time.Second, "prepared") examples.TransQueryResult = "" config.PreparedExpire = 60 assert.Equal(t, "canceled", getSagaModel(saga.Gid).Status) @@ -156,11 +156,10 @@ func sagaPreparePending(t *testing.T) { saga := genSaga("gid1-preparePending", false, false) saga.Prepare() examples.TransQueryResult = "PENDING" - CronPreparedOnce(-10 * time.Second) + CronTransOnce(-10*time.Second, "prepared") examples.TransQueryResult = "" assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status) - CronPreparedOnce(-10 * time.Second) - WaitTransProcessed(saga.Gid) + CronTransOnce(-10*time.Second, "prepared") assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) } @@ -173,7 +172,6 @@ func sagaCommittedPending(t *testing.T) { examples.TransInResult = "" assert.Equal(t, []string{"prepared", "finished", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) CronTransOnce(-10*time.Second, "committed") - WaitTransProcessed(saga.Gid) assert.Equal(t, []string{"prepared", "finished", "prepared", "finished"}, getBranchesStatus(saga.Gid)) assert.Equal(t, "finished", getSagaModel(saga.Gid).Status) } diff --git a/dtmsvr/service.go b/dtmsvr/service.go index bbcd652..cb2b85e 100644 --- a/dtmsvr/service.go +++ b/dtmsvr/service.go @@ -7,9 +7,11 @@ import ( var TransProcessedTestChan chan string = nil // 用于测试时,通知处理结束 func WaitTransProcessed(gid string) { + logrus.Printf("waiting for gid %s", gid) id := <-TransProcessedTestChan for id != gid { logrus.Errorf("-------id %s not match gid %s", id, gid) id = <-TransProcessedTestChan } + logrus.Printf("finish for gid %s", gid) } diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index c682f9e..73a7656 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -3,13 +3,16 @@ package dtmsvr import ( "fmt" "strings" + "time" + "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) type TransProcessor interface { GenBranches() []TransBranch - ProcessOnce(db *common.MyDb, branches []TransBranch) error + ProcessOnce(db *common.MyDb, branches []TransBranch) + ExecBranch(db *common.MyDb, branch *TransBranch) string } type TransSagaProcessor struct { @@ -35,7 +38,30 @@ func (t *TransSagaProcessor) GenBranches() []TransBranch { return nsteps } -func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) error { +func (t *TransSagaProcessor) ExecBranch(db *common.MyDb, branche *TransBranch) string { + return "" +} + +func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) { + if t.Status == "prepared" { + resp, err := common.RestyClient.R().SetQueryParam("gid", t.Gid).Get(t.QueryPrepared) + e2p(err) + body := resp.String() + if strings.Contains(body, "FAIL") { + preparedExpire := time.Now().Add(time.Duration(-config.PreparedExpire) * time.Second) + logrus.Printf("create time: %s prepared expire: %s ", t.CreateTime.Local(), preparedExpire.Local()) + status := common.If(t.CreateTime.Before(preparedExpire), "canceled", "prepared").(string) + if status != t.Status { + t.changeStatus(db, status) + } + return + } else if strings.Contains(body, "SUCCESS") { + t.Status = "committed" + t.SaveNew(db) + } else { + panic(fmt.Errorf("unknown result, will be retried: %s", body)) + } + } current := 0 // 当前正在处理的步骤 for ; current < len(branches); current++ { step := branches[current] @@ -44,9 +70,7 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch } if step.BranchType == "action" && step.Status == "prepared" { resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) - if err != nil { - return err - } + e2p(err) body := resp.String() t.touch(db.Must()) @@ -56,13 +80,13 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch step.changeStatus(db.Must(), "rollbacked") break } else { - return fmt.Errorf("unknown response: %s, will be retried", body) + panic(fmt.Errorf("unknown response: %s, will be retried", body)) } } } if current == len(branches) { // saga 事务完成 t.changeStatus(db.Must(), "finished") - return nil + return } for current = current - 1; current >= 0; current-- { step := branches[current] @@ -70,21 +94,18 @@ func (t *TransSagaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch continue } resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) - if err != nil { - return err - } + e2p(err) body := resp.String() if strings.Contains(body, "SUCCESS") { step.changeStatus(db.Must(), "rollbacked") } else { - return fmt.Errorf("expect compensate return SUCCESS") + panic(fmt.Errorf("expect compensate return SUCCESS")) } } if current != -1 { - return fmt.Errorf("saga current not -1") + panic(fmt.Errorf("saga current not -1")) } t.changeStatus(db.Must(), "rollbacked") - return nil } type TransTccProcessor struct { @@ -110,7 +131,11 @@ func (t *TransTccProcessor) GenBranches() []TransBranch { return nsteps } -func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) error { +func (t *TransTccProcessor) ExecBranch(db *common.MyDb, branche *TransBranch) string { + return "" +} + +func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) { current := 0 // 当前正在处理的步骤 for ; current < len(branches); current++ { step := branches[current] @@ -119,9 +144,7 @@ func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) } if step.BranchType == "prepare" && step.Status == "prepared" { resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) - if err != nil { - return err - } + e2p(err) body := resp.String() t.touch(db) if strings.Contains(body, "SUCCESS") { @@ -130,16 +153,14 @@ func (t *TransTccProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) step.changeStatus(db, "rollbacked") break } else { - return fmt.Errorf("unknown response: %s, will be retried", body) + panic(fmt.Errorf("unknown response: %s, will be retried", body)) } } } ////////////////////////////////////////////////// if current == len(branches) { // tcc 事务完成 t.changeStatus(db, "finished") - return nil } - return nil } type TransXaProcessor struct { @@ -149,31 +170,32 @@ type TransXaProcessor struct { func (t *TransXaProcessor) GenBranches() []TransBranch { return []TransBranch{} } +func (t *TransXaProcessor) ExecBranch(db *common.MyDb, branch *TransBranch) string { + resp, err := common.RestyClient.R().SetBody(M{ + "branch": branch.Branch, + "action": common.If(t.Status == "prepared", "rollback", "commit"), + "gid": branch.Gid, + }).Post(branch.Url) + e2p(err) + body := resp.String() + if !strings.Contains(body, "SUCCESS") { + panic(fmt.Errorf("bad response: %s", body)) + } + branch.changeStatus(db, common.If(t.Status == "prepared", "rollbacked", "finished").(string)) + return "SUCCESS" +} -func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) error { - gid := t.Gid +func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) { if t.Status == "finished" { - return nil + return } if t.Status == "committed" { for _, branch := range branches { if branch.Status == "finished" { continue } + _ = t.ExecBranch(db, &branch) t.touch(db) // 更新update_time,避免被定时任务再次 - resp, err := common.RestyClient.R().SetBody(M{ - "branch": branch.Branch, - "action": "commit", - "gid": branch.Gid, - }).Post(branch.Url) - if err != nil { - return err - } - body := resp.String() - if !strings.Contains(body, "SUCCESS") { - return fmt.Errorf("bad response: %s", body) - } - branch.changeStatus(db, "finished") } t.changeStatus(db, "finished") } else if t.Status == "prepared" { // 未commit直接处理的情况为回滚场景 @@ -181,24 +203,11 @@ func (t *TransXaProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) if branch.Status == "rollbacked" { continue } - db.Must().Model(&TransGlobal{}).Where("gid=?", gid).Update("gid", gid) // 更新update_time,避免被定时任务再次 - resp, err := common.RestyClient.R().SetBody(M{ - "branch": branch.Branch, - "action": "rollback", - "gid": branch.Gid, - }).Post(branch.Url) - if err != nil { - return err - } - body := resp.String() - if !strings.Contains(body, "SUCCESS") { - return fmt.Errorf("bad response: %s", body) - } - branch.changeStatus(db, "rollbacked") + _ = t.ExecBranch(db, &branch) + t.touch(db) } t.changeStatus(db, "rollbacked") } else { - return fmt.Errorf("bad trans status: %s", t.Status) + e2p(fmt.Errorf("bad trans status: %s", t.Status)) } - return nil } diff --git a/dtmsvr/types.go b/dtmsvr/types.go index 3fd09e8..af23051 100644 --- a/dtmsvr/types.go +++ b/dtmsvr/types.go @@ -98,12 +98,15 @@ func (trans *TransGlobal) getProcessor() TransProcessor { } func (trans *TransGlobal) Process(db *common.MyDb) { + defer handlePanic() + defer func() { + if TransProcessedTestChan != nil { + TransProcessedTestChan <- trans.Gid + } + }() branches := []TransBranch{} db.Must().Where("gid=?", trans.Gid).Order("id asc").Find(&branches) trans.getProcessor().ProcessOnce(db, branches) - if TransProcessedTestChan != nil { - TransProcessedTestChan <- trans.Gid - } } func (t *TransGlobal) SaveNew(db *common.MyDb) {