saga barrier seems ok

This commit is contained in:
yedongfu 2021-07-08 22:21:34 +08:00
parent aef42d59aa
commit b0102048a9
10 changed files with 50 additions and 29 deletions

View File

@ -11,36 +11,38 @@ import (
type M = map[string]interface{} type M = map[string]interface{}
func wait() {
time.Sleep(10000 * time.Second)
}
func main() { func main() {
if len(os.Args) > 1 && (os.Args[1] == "quick_start" || os.Args[1] == "qs") { if len(os.Args) > 1 && (os.Args[1] == "quick_start" || os.Args[1] == "qs") {
dtmsvr.PopulateMysql() dtmsvr.PopulateMysql()
dtmsvr.Main() dtmsvr.MainStart()
examples.StartMain() examples.StartMain()
for { wait()
time.Sleep(1000 * time.Second)
}
} }
app := examples.BaseAppNew() app := examples.BaseAppNew()
examples.BaseAppSetup(app) examples.BaseAppSetup(app)
if len(os.Args) == 1 || os.Args[1] == "saga" { // 默认情况下展示saga例子 if len(os.Args) == 1 || os.Args[1] == "saga" { // 默认情况下展示saga例子
dtmsvr.PopulateMysql() dtmsvr.PopulateMysql()
dtmsvr.Main() dtmsvr.MainStart()
examples.SagaSetup(app) examples.SagaSetup(app)
examples.BaseAppStart(app) examples.BaseAppStart(app)
examples.SagaFireRequest() examples.SagaFireRequest()
} else if os.Args[1] == "xa" { // 启动xa示例 } else if os.Args[1] == "xa" { // 启动xa示例
dtmsvr.PopulateMysql() dtmsvr.PopulateMysql()
dtmsvr.Main() dtmsvr.MainStart()
examples.PopulateMysql() examples.PopulateMysql()
examples.XaSetup(app) examples.XaSetup(app)
examples.BaseAppStart(app) examples.BaseAppStart(app)
examples.XaFireRequest() examples.XaFireRequest()
} else if os.Args[1] == "dtmsvr" { // 只启动dtmsvr } else if os.Args[1] == "dtmsvr" { // 只启动dtmsvr
go dtmsvr.StartSvr() go dtmsvr.MainStart()
} else if os.Args[1] == "all" { // 运行所有示例 } else if os.Args[1] == "all" { // 运行所有示例
dtmsvr.PopulateMysql() dtmsvr.PopulateMysql()
examples.PopulateMysql() examples.PopulateMysql()
dtmsvr.Main() dtmsvr.MainStart()
examples.SagaSetup(app) examples.SagaSetup(app)
examples.TccSetup(app) examples.TccSetup(app)
examples.XaSetup(app) examples.XaSetup(app)
@ -48,10 +50,13 @@ func main() {
examples.SagaFireRequest() examples.SagaFireRequest()
examples.TccFireRequest() examples.TccFireRequest()
examples.XaFireRequest() examples.XaFireRequest()
} else if os.Args[1] == "saga_barrier" {
dtmsvr.PopulateMysql()
dtmsvr.MainStart()
examples.PopulateMysql()
examples.SagaBarrierMainStart()
} else { } else {
logrus.Fatalf("unknown arg: %s", os.Args[1]) logrus.Fatalf("unknown arg: %s", os.Args[1])
} }
for { wait()
time.Sleep(1000 * time.Second)
}
} }

View File

@ -114,6 +114,7 @@ func SqlDB2DB(sdb *sql.DB) *DB {
Conn: sdb, Conn: sdb,
}), &gorm.Config{}) }), &gorm.Config{})
E2P(err) E2P(err)
db.Use(&tracePlugin{})
return &DB{DB: db} return &DB{DB: db}
} }

View File

@ -23,12 +23,16 @@ func (t *TransInfo) String() string {
} }
func TransInfoFromReq(c *gin.Context) *TransInfo { func TransInfoFromReq(c *gin.Context) *TransInfo {
return &TransInfo{ ti := &TransInfo{
TransType: c.Query("trans_type"), TransType: c.Query("trans_type"),
Gid: c.Query("gid"), Gid: c.Query("gid"),
BranchID: c.Query("branch_id"), BranchID: c.Query("branch_id"),
BranchType: c.Query("branch_type"), BranchType: c.Query("branch_type"),
} }
if ti.TransType == "" || ti.Gid == "" || ti.BranchID == "" || ti.BranchType == "" {
panic(fmt.Errorf("invlid trans info: %v", ti))
}
return ti
} }
type BarrierModel struct { type BarrierModel struct {

View File

@ -2,6 +2,7 @@ package dtmsvr
import ( import (
"fmt" "fmt"
"time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
@ -10,8 +11,8 @@ import (
var dtmsvrPort = 8080 var dtmsvrPort = 8080
func Main() { func MainStart() {
go StartSvr() StartSvr()
go CronCommitted() go CronCommitted()
go CronPrepared() go CronPrepared()
} }
@ -23,7 +24,8 @@ func StartSvr() {
app := common.GetGinApp() app := common.GetGinApp()
AddRoute(app) AddRoute(app)
logrus.Printf("dtmsvr listen at: %d", dtmsvrPort) logrus.Printf("dtmsvr listen at: %d", dtmsvrPort)
app.Run(fmt.Sprintf(":%d", dtmsvrPort)) go app.Run(fmt.Sprintf(":%d", dtmsvrPort))
time.Sleep(100 * time.Millisecond)
} }
func PopulateMysql() { func PopulateMysql() {

View File

@ -117,6 +117,15 @@ func (trans *TransGlobal) Process(db *common.DB) {
trans.getProcessor().ProcessOnce(db, branches) trans.getProcessor().ProcessOnce(db, branches)
} }
func (trans *TransGlobal) getBranchParams(branch *TransBranch) common.MS {
return common.MS{
"gid": trans.Gid,
"trans_type": trans.TransType,
"branch_id": branch.BranchID,
"branch_type": branch.BranchType,
}
}
func (t *TransGlobal) setNextCron(expireIn int64) []string { func (t *TransGlobal) setNextCron(expireIn int64) []string {
t.NextCronInterval = expireIn t.NextCronInterval = expireIn
next := time.Now().Add(time.Duration(config.TransCronInterval) * time.Second) next := time.Now().Add(time.Duration(config.TransCronInterval) * time.Second)

View File

@ -33,7 +33,7 @@ func (t *TransMsgProcessor) GenBranches() []TransBranch {
} }
func (t *TransMsgProcessor) ExecBranch(db *common.DB, branch *TransBranch) { func (t *TransMsgProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParam("gid", branch.Gid).Post(branch.Url) resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParams(t.getBranchParams(branch)).Post(branch.Url)
e2p(err) e2p(err)
body := resp.String() body := resp.String()
if strings.Contains(body, "SUCCESS") { if strings.Contains(body, "SUCCESS") {

View File

@ -36,7 +36,7 @@ func (t *TransSagaProcessor) GenBranches() []TransBranch {
} }
func (t *TransSagaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { func (t *TransSagaProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParam("gid", branch.Gid).Post(branch.Url) resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParams(t.getBranchParams(branch)).Post(branch.Url)
e2p(err) e2p(err)
body := resp.String() body := resp.String()
if strings.Contains(body, "SUCCESS") { if strings.Contains(body, "SUCCESS") {

View File

@ -20,7 +20,7 @@ func (t *TransTccProcessor) GenBranches() []TransBranch {
} }
func (t *TransTccProcessor) ExecBranch(db *common.DB, branch *TransBranch) { func (t *TransTccProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParam("gid", branch.Gid).Post(branch.Url) resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParams(t.getBranchParams(branch)).Post(branch.Url)
e2p(err) e2p(err)
body := resp.String() body := resp.String()
if strings.Contains(body, "SUCCESS") { if strings.Contains(body, "SUCCESS") {

View File

@ -17,17 +17,17 @@ const SagaBarrierBusiApi = "/api/busi_saga_barrier"
var SagaBarrierBusi = fmt.Sprintf("http://localhost:%d%s", SagaBarrierBusiPort, SagaBarrierBusiApi) var SagaBarrierBusi = fmt.Sprintf("http://localhost:%d%s", SagaBarrierBusiPort, SagaBarrierBusiApi)
func SagaBarrierMain() { func SagaBarrierMainStart() {
go SagaBarrierStartSvr() SagaBarrierStartSvr()
SagaBarrierFireRequest() SagaBarrierFireRequest()
time.Sleep(1000 * time.Second)
} }
func SagaBarrierStartSvr() { func SagaBarrierStartSvr() {
logrus.Printf("saga barrier examples starting") logrus.Printf("saga barrier examples starting")
app := common.GetGinApp() app := common.GetGinApp()
SagaBarrierAddRoute(app) SagaBarrierAddRoute(app)
app.Run(fmt.Sprintf(":%d", SagaBarrierBusiPort)) go app.Run(fmt.Sprintf(":%d", SagaBarrierBusiPort))
time.Sleep(100 * time.Millisecond)
} }
func SagaBarrierFireRequest() { func SagaBarrierFireRequest() {
@ -55,7 +55,7 @@ func sagaBarrierTransIn(c *gin.Context) (interface{}, error) {
req := reqFrom(c) req := reqFrom(c)
return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) {
db := common.SqlDB2DB(sdb) db := common.SqlDB2DB(sdb)
dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")). dbr := db.Model(&UserAccount{}).Where("user_id = ?", 1).
Update("balance", gorm.Expr("balance + ?", req.Amount)) Update("balance", gorm.Expr("balance + ?", req.Amount))
return "SUCCESS", dbr.Error return "SUCCESS", dbr.Error
}) })
@ -65,7 +65,7 @@ func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) {
req := reqFrom(c) req := reqFrom(c)
return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) {
db := common.SqlDB2DB(sdb) db := common.SqlDB2DB(sdb)
dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")). dbr := db.Model(&UserAccount{}).Where("user_id = ?", 1).
Update("balance", gorm.Expr("balance - ?", req.Amount)) Update("balance", gorm.Expr("balance - ?", req.Amount))
return "SUCCESS", dbr.Error return "SUCCESS", dbr.Error
}) })
@ -75,7 +75,7 @@ func sagaBarrierTransOut(c *gin.Context) (interface{}, error) {
req := reqFrom(c) req := reqFrom(c)
return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) {
db := common.SqlDB2DB(sdb) db := common.SqlDB2DB(sdb)
dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")). dbr := db.Model(&UserAccount{}).Where("user_id = ?", 2).
Update("balance", gorm.Expr("balance - ?", req.Amount)) Update("balance", gorm.Expr("balance - ?", req.Amount))
return "SUCCESS", dbr.Error return "SUCCESS", dbr.Error
}) })
@ -85,7 +85,7 @@ func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) {
req := reqFrom(c) req := reqFrom(c)
return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) {
db := common.SqlDB2DB(sdb) db := common.SqlDB2DB(sdb)
dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")). dbr := db.Model(&UserAccount{}).Where("user_id = ?", 2).
Update("balance", gorm.Expr("balance + ?", req.Amount)) Update("balance", gorm.Expr("balance + ?", req.Amount))
return "SUCCESS", dbr.Error return "SUCCESS", dbr.Error
}) })

View File

@ -20,16 +20,16 @@ var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiApi)
// 被app/main.go调用启动服务并运行示例 // 被app/main.go调用启动服务并运行示例
func StartMain() { func StartMain() {
go qsStartSvr() qsStartSvr()
qsFireRequest() qsFireRequest()
time.Sleep(1000 * time.Second)
} }
func qsStartSvr() { func qsStartSvr() {
app := common.GetGinApp() app := common.GetGinApp()
qsAddRoute(app) qsAddRoute(app)
logrus.Printf("quick qs examples listening at %d", qsBusiPort) logrus.Printf("quick qs examples listening at %d", qsBusiPort)
app.Run(fmt.Sprintf(":%d", qsBusiPort)) go app.Run(fmt.Sprintf(":%d", qsBusiPort))
time.Sleep(100 * time.Millisecond)
} }
func qsFireRequest() { func qsFireRequest() {