add prepare cancel case

This commit is contained in:
yedongfu 2021-05-19 11:29:38 +08:00
parent 694c07fbef
commit 60d8272cd4
5 changed files with 97 additions and 50 deletions

View File

@ -21,22 +21,84 @@ func TestViper(t *testing.T) {
assert.Equal(t, "test_val", viper.GetString("test")) assert.Equal(t, "test_val", viper.GetString("test"))
} }
var myinit int = func() int {
dtmsvr.LoadConfig()
return 0
}()
// 测试使用的全局对象
var rabbit = dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq)
var queprepared = rabbit.QueueNew(dtmsvr.RabbitmqConstPrepared)
var quecommited = rabbit.QueueNew(dtmsvr.RabbitmqConstCommited)
var db = dtmsvr.DbGet()
func getSagaModel(gid string) *dtmsvr.SagaModel {
sm := dtmsvr.SagaModel{}
dbr := db.Model(&sm).Where("gid=?", gid).First(&sm)
common.PanicIfError(dbr.Error)
return &sm
}
func getSagaStepStatus(gid string) []string {
steps := []dtmsvr.SagaStepModel{}
dbr := db.Model(&dtmsvr.SagaStepModel{}).Where("gid=?", gid).Find(&steps)
common.PanicIfError(dbr.Error)
status := []string{}
for _, step := range steps {
status = append(status, step.Status)
}
return status
}
func noramlSaga(t *testing.T) {
saga := genSaga("gid-normal", false, false)
saga.Prepare()
queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg)
assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status)
saga.Commit()
quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg)
assert.Equal(t, "finished", getSagaModel(saga.Gid).Status)
assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid))
}
func rollbackSaga2(t *testing.T) {
saga := genSaga("gid-rollback2", false, true)
saga.Commit()
quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg)
saga.Prepare()
queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg)
assert.Equal(t, "rollbacked", getSagaModel(saga.Gid).Status)
assert.Equal(t, []string{"rollbacked", "finished", "rollbacked", "rollbacked"}, getSagaStepStatus(saga.Gid))
}
func prepareCancel(t *testing.T) {
saga := genSaga("gid1-trans-cancel", false, true)
saga.Prepare()
queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg)
dtmsvr.CronPreparedOne(-1 * time.Second)
assert.Equal(t, "canceled", getSagaModel(saga.Gid).Status)
}
func preparePending(t *testing.T) {
saga := genSaga("gid1-trans-pending", false, true)
saga.Prepare()
queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg)
dtmsvr.CronPreparedOne(-1 * time.Second)
assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status)
}
func TestDtmSvr(t *testing.T) { func TestDtmSvr(t *testing.T) {
// 清理数据 // 清理数据
rabbit := dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq)
queprepared := rabbit.QueueNew(dtmsvr.RabbitmqConstPrepared)
for i := 0; i < queprepared.Queue.Messages; i++ { for i := 0; i < queprepared.Queue.Messages; i++ {
queprepared.WaitAndHandleOne(func(data M) { queprepared.WaitAndHandleOne(func(data M) {
logrus.Printf("ignoring prepared queue data before test") logrus.Printf("ignoring prepared queue data before test")
}) })
} }
quecommited := rabbit.QueueNew(dtmsvr.RabbitmqConstCommited)
for i := 0; i < quecommited.Queue.Messages; i++ { for i := 0; i < quecommited.Queue.Messages; i++ {
quecommited.WaitAndHandleOne(func(data M) { quecommited.WaitAndHandleOne(func(data M) {
logrus.Printf("ignoring commited queue data before test") logrus.Printf("ignoring commited queue data before test")
}) })
} }
db := dtmsvr.DbGet()
common.PanicIfError(db.Exec("truncate test1.a_saga").Error) common.PanicIfError(db.Exec("truncate test1.a_saga").Error)
common.PanicIfError(db.Exec("truncate test1.a_saga_step").Error) common.PanicIfError(db.Exec("truncate test1.a_saga_step").Error)
@ -45,33 +107,10 @@ func TestDtmSvr(t *testing.T) {
go examples.StartSvr() go examples.StartSvr()
time.Sleep(time.Duration(100 * 1000 * 1000)) time.Sleep(time.Duration(100 * 1000 * 1000))
// 开始第一个正常流程的测试 prepareCancel(t)
saga := genSaga("gid-1", false, false) preparePending(t)
saga.Prepare() noramlSaga(t)
queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg) rollbackSaga2(t)
sm := dtmsvr.SagaModel{}
db.Model(&sm).Where("gid=?", saga.Gid).First(&sm)
assert.Equal(t, "prepared", sm.Status)
saga.Commit()
quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg)
db.Model(&dtmsvr.SagaModel{}).Where("gid=?", saga.Gid).First(&sm)
assert.Equal(t, "finished", sm.Status)
steps := []dtmsvr.SagaStepModel{}
db.Model(&dtmsvr.SagaStepModel{}).Where("gid=?", saga.Gid).Find(&steps)
assert.Equal(t, true, steps[0].Status == "pending" && steps[2].Status == "pending" && steps[1].Status == "finished" && steps[3].Status == "finished")
saga = genSaga("gid-2", false, true)
saga.Commit()
quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg)
saga.Prepare()
queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg)
sm = dtmsvr.SagaModel{}
db.Model(&dtmsvr.SagaModel{}).Where("gid=?", saga.Gid).First(&sm)
assert.Equal(t, "rollbacked", sm.Status)
steps = []dtmsvr.SagaStepModel{}
db.Model(&dtmsvr.SagaStepModel{}).Where("gid=?", saga.Gid).Find(&steps)
assert.Equal(t, true, steps[0].Status == "rollbacked" && steps[2].Status == "rollbacked" && steps[1].Status == "finished" && steps[3].Status == "rollbacked")
// assert.Equal(t, 1, 0) // assert.Equal(t, 1, 0)
// 开始测试 // 开始测试
@ -81,7 +120,7 @@ func TestDtmSvr(t *testing.T) {
} }
func genSaga(gid string, inFailed bool, outFailed bool) *dtm.Saga { func genSaga(gid string, inFailed bool, outFailed bool) *dtm.Saga {
saga := dtm.SagaNew(examples.TcServer, gid, examples.BusiApi+"/TransQuery") saga := dtm.SagaNew(examples.TcServer, gid, examples.Busi+"/TransQuery")
req := examples.TransReq{ req := examples.TransReq{
Amount: 30, Amount: 30,
TransInFailed: inFailed, TransInFailed: inFailed,

View File

@ -1,6 +1,7 @@
package dtmsvr package dtmsvr
import ( import (
"encoding/json"
"fmt" "fmt"
"strings" "strings"
"time" "time"
@ -20,7 +21,7 @@ type SagaModel struct {
ModelBase ModelBase
Gid string Gid string
Steps string Steps string
TransQuery string TransQuery string `json:"trans_query"`
Status string Status string
FinishTime time.Time FinishTime time.Time
RollbackTime time.Time RollbackTime time.Time
@ -59,17 +60,11 @@ func HandlePreparedMsg(data M) {
}).Create(&m) }).Create(&m)
} }
func HandleCommitedMsg(data M) { func handleCommitedSagaModel(m *SagaModel) {
db := DbGet() db := DbGet()
logrus.Printf("creating saga model in commited")
steps := data["steps"].([]interface{})
data["steps"] = common.MustMarshalString(data["steps"])
m := SagaModel{}
err := common.Map2Obj(data, &m)
common.PanicIfError(err)
m.Status = "processing" m.Status = "processing"
stepInserted := false stepInserted := false
err = db.Transaction(func(db *gorm.DB) error { err := db.Transaction(func(db *gorm.DB) error {
db.Clauses(clause.OnConflict{ db.Clauses(clause.OnConflict{
DoNothing: true, DoNothing: true,
}).Create(&m) }).Create(&m)
@ -77,8 +72,10 @@ func HandleCommitedMsg(data M) {
db.Model(&m).Where("status=?", "prepared").Update("status", "processing") db.Model(&m).Where("status=?", "prepared").Update("status", "processing")
} }
nsteps := []SagaStepModel{} nsteps := []SagaStepModel{}
for _, step1 := range steps { steps := []M{}
step := step1.(map[string]interface{}) err := json.Unmarshal([]byte(m.Steps), &steps)
common.PanicIfError(err)
for _, step := range steps {
nsteps = append(nsteps, SagaStepModel{ nsteps = append(nsteps, SagaStepModel{
Gid: m.Gid, Gid: m.Gid,
Step: len(nsteps) + 1, Step: len(nsteps) + 1,
@ -117,6 +114,14 @@ func HandleCommitedMsg(data M) {
logrus.Printf("---------------handle commited msmg error: %s", err.Error()) logrus.Printf("---------------handle commited msmg error: %s", err.Error())
} }
} }
func HandleCommitedMsg(data M) {
logrus.Printf("creating saga model in commited")
data["steps"] = common.MustMarshalString(data["steps"])
m := SagaModel{}
err := common.Map2Obj(data, &m)
common.PanicIfError(err)
handleCommitedSagaModel(&m)
}
func ProcessCommitedSaga(gid string) (rerr error) { func ProcessCommitedSaga(gid string) (rerr error) {
steps := []SagaStepModel{} steps := []SagaStepModel{}

View File

@ -29,7 +29,7 @@ func DbGet() *gorm.DB {
LoadConfig() LoadConfig()
if db == nil { if db == nil {
conf := viper.GetStringMapString("mysql") conf := viper.GetStringMapString("mysql")
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true", conf["user"], conf["password"], conf["host"], conf["port"], conf["database"]) dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", conf["user"], conf["password"], conf["host"], conf["port"], conf["database"])
logrus.Printf("connecting %s", strings.Replace(dsn, conf["password"], "****", 1)) logrus.Printf("connecting %s", strings.Replace(dsn, conf["password"], "****", 1))
db1, err := gorm.Open(mysql.Open(dsn), &gorm.Config{ db1, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
SkipDefaultTransaction: true, SkipDefaultTransaction: true,

View File

@ -21,7 +21,7 @@ func StartSvr() {
app.POST(BusiApi+"/TransInCompensate", TransInCompensate) app.POST(BusiApi+"/TransInCompensate", TransInCompensate)
app.POST(BusiApi+"/TransOut", TransOut) app.POST(BusiApi+"/TransOut", TransOut)
app.POST(BusiApi+"/TransOutCompensate", TransOutCompensate) app.POST(BusiApi+"/TransOutCompensate", TransOutCompensate)
app.POST(BusiApi+"/TransQuery", TransQuery) app.GET(BusiApi+"/TransQuery", TransQuery)
logrus.Printf("examples istening at %d", BusiPort) logrus.Printf("examples istening at %d", BusiPort)
app.Run(":8081") app.Run(":8081")
} }

View File

@ -2,6 +2,7 @@ package examples
import ( import (
"fmt" "fmt"
"strings"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -67,13 +68,15 @@ func TransOutCompensate(c *gin.Context) {
func TransQuery(c *gin.Context) { func TransQuery(c *gin.Context) {
gid := c.Query("gid") gid := c.Query("gid")
req := TransReq{} logrus.Printf("%s TransQuery", gid)
if err := c.BindJSON(&req); err != nil { if strings.Contains(gid, "cancel") {
return c.JSON(200, M{"result": "FAIL"})
} } else if strings.Contains(gid, "pending") {
logrus.Printf("%s TransQuery: %v", gid, req) c.JSON(200, M{"result": "PENDING"})
} else {
c.JSON(200, M{"result": "SUCCESS"}) c.JSON(200, M{"result": "SUCCESS"})
} }
}
func trans(req *TransReq) { func trans(req *TransReq) {
// gid := common.GenGid() // gid := common.GenGid()