From 7aabd05871dc77b1cc60020c48b4c12082a025b7 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Sat, 10 Jul 2021 22:54:02 +0800 Subject: [PATCH] refactor main --- app/main.go | 50 ++++++++++++--------------- dtmsvr/dtmsvr_test.go | 4 +-- examples/main_base.go | 10 +++--- examples/main_msg.go | 10 ------ examples/main_saga.go | 12 ------- examples/main_saga_barrier.go | 65 ++++++++++------------------------- examples/main_tcc.go | 12 ------- examples/main_tcc_barrier.go | 55 ++++++++--------------------- examples/main_xa.go | 10 ------ examples/quick_start.go | 2 +- 10 files changed, 61 insertions(+), 169 deletions(-) diff --git a/app/main.go b/app/main.go index 82a0eae..b626987 100644 --- a/app/main.go +++ b/app/main.go @@ -12,54 +12,48 @@ import ( type M = map[string]interface{} func wait() { - time.Sleep(10000 * time.Second) + for { + time.Sleep(10000 * time.Second) + } } func main() { - if len(os.Args) > 1 && (os.Args[1] == "quick_start" || os.Args[1] == "qs") { - dtmsvr.PopulateMysql() + if len(os.Args) == 1 || os.Args[1] == "dtmsvr" { // 只启动dtmsvr dtmsvr.MainStart() - examples.StartMain() wait() } - app := examples.BaseAppNew() - examples.BaseAppSetup(app) - if len(os.Args) == 1 || os.Args[1] == "dtmsvr" { // 只启动dtmsvr - go dtmsvr.MainStart() - } else if os.Args[1] == "xa" { // 启动xa示例 - dtmsvr.PopulateMysql() - dtmsvr.MainStart() - examples.PopulateMysql() + // 下面都是运行示例,因此首先把服务器的数据重新准备好 + dtmsvr.PopulateMysql() + dtmsvr.MainStart() + + // quick_start 比较独立,单独作为一个例子运行,方便新人上手 + if len(os.Args) > 1 && (os.Args[1] == "quick_start" || os.Args[1] == "qs") { + examples.QuickStarMain() + wait() + } + + // 下面是各类的例子 + examples.PopulateMysql() + app := examples.BaseAppStartup() + if os.Args[1] == "xa" { // 启动xa示例 examples.XaSetup(app) - examples.BaseAppStart(app) examples.XaFireRequest() } else if os.Args[1] == "saga" { // 启动saga示例 - dtmsvr.PopulateMysql() - dtmsvr.MainStart() examples.SagaSetup(app) - examples.BaseAppStart(app) examples.SagaFireRequest() } else if os.Args[1] == "all" { // 运行所有示例 - dtmsvr.PopulateMysql() - examples.PopulateMysql() - dtmsvr.MainStart() examples.SagaSetup(app) examples.TccSetup(app) examples.XaSetup(app) - examples.BaseAppStart(app) examples.SagaFireRequest() examples.TccFireRequest() examples.XaFireRequest() } else if os.Args[1] == "saga_barrier" { - dtmsvr.PopulateMysql() - dtmsvr.MainStart() - examples.PopulateMysql() - examples.SagaBarrierMainStart() + examples.SagaBarrierAddRoute(app) + examples.SagaBarrierFireRequest() } else if os.Args[1] == "tcc_barrier" { - dtmsvr.PopulateMysql() - dtmsvr.MainStart() - examples.PopulateMysql() - examples.TccBarrierMainStart() + examples.TccBarrierAddRoute(app) + examples.TccBarrierFireRequest() } else { logrus.Fatalf("unknown arg: %s", os.Args[1]) } diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index bec6dd7..46a8839 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -33,13 +33,11 @@ func TestDtmSvr(t *testing.T) { TransProcessedTestChan = make(chan string, 1) // 启动组件 go StartSvr() - app := examples.BaseAppNew() - examples.BaseAppSetup(app) + app := examples.BaseAppStartup() examples.SagaSetup(app) examples.TccSetup(app) examples.XaSetup(app) examples.MsgSetup(app) - examples.BaseAppStart(app) // 清理数据 e2p(dbGet().Exec("truncate trans_global").Error) diff --git a/examples/main_base.go b/examples/main_base.go index 018be2c..0bc4e07 100644 --- a/examples/main_base.go +++ b/examples/main_base.go @@ -16,16 +16,14 @@ const ( var Busi string = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiApi) -func BaseAppNew() *gin.Engine { +func BaseAppStartup() *gin.Engine { logrus.Printf("examples starting") app := common.GetGinApp() - return app -} - -func BaseAppStart(app *gin.Engine) { + BaseAddRoute(app) logrus.Printf("Starting busi at: %d", BusiPort) go app.Run(fmt.Sprintf(":%d", BusiPort)) time.Sleep(100 * time.Millisecond) + return app } type AutoEmptyString struct { @@ -62,7 +60,7 @@ func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi } -func BaseAppSetup(app *gin.Engine) { +func BaseAddRoute(app *gin.Engine) { app.POST(BusiApi+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) { return handleGeneralBusiness(c, MainSwitch.TransInResult.Fetch(), reqFrom(c).TransInResult, "transIn") })) diff --git a/examples/main_msg.go b/examples/main_msg.go index 6f9a080..86f1691 100644 --- a/examples/main_msg.go +++ b/examples/main_msg.go @@ -1,21 +1,11 @@ package examples import ( - "time" - "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm/dtmcli" ) -func MsgMain() { - app := BaseAppNew() - BaseAppSetup(app) - BaseAppStart(app) - MsgFireRequest() - time.Sleep(1000 * time.Second) -} - func MsgSetup(app *gin.Engine) { } diff --git a/examples/main_saga.go b/examples/main_saga.go index efc1e57..9409750 100644 --- a/examples/main_saga.go +++ b/examples/main_saga.go @@ -1,23 +1,11 @@ package examples import ( - "time" - "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm/dtmcli" ) -func SagaMain() { - app := BaseAppNew() - BaseAppSetup(app) - TccSetup(app) - go BaseAppStart(app) - time.Sleep(100 * time.Millisecond) - TccFireRequest() - time.Sleep(1000 * time.Second) -} - func SagaSetup(app *gin.Engine) { } diff --git a/examples/main_saga_barrier.go b/examples/main_saga_barrier.go index a1f6d2d..0356d0c 100644 --- a/examples/main_saga_barrier.go +++ b/examples/main_saga_barrier.go @@ -2,8 +2,6 @@ package examples import ( "database/sql" - "fmt" - "time" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" @@ -12,30 +10,12 @@ import ( "gorm.io/gorm" ) -// 事务参与者的服务地址 -const SagaBarrierBusiApi = "/api/busi_saga_barrier" - -var SagaBarrierBusi = fmt.Sprintf("http://localhost:%d%s", SagaBarrierBusiPort, SagaBarrierBusiApi) - -func SagaBarrierMainStart() { - SagaBarrierStartSvr() - SagaBarrierFireRequest() -} - -func SagaBarrierStartSvr() { - logrus.Printf("saga barrier examples starting") - app := common.GetGinApp() - SagaBarrierAddRoute(app) - go app.Run(fmt.Sprintf(":%d", SagaBarrierBusiPort)) - time.Sleep(100 * time.Millisecond) -} - func SagaBarrierFireRequest() { logrus.Printf("a busi transaction begin") req := &TransReq{Amount: 30} saga := dtmcli.NewSaga(DtmServer). - Add(SagaBarrierBusi+"/TransOut", SagaBarrierBusi+"/TransOutCompensate", req). - Add(SagaBarrierBusi+"/TransIn", SagaBarrierBusi+"/TransInCompensate", req) + Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", req). + Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req) logrus.Printf("busi trans submit") err := saga.Submit() e2p(err) @@ -44,49 +24,40 @@ func SagaBarrierFireRequest() { // api func SagaBarrierAddRoute(app *gin.Engine) { - app.POST(SagaBarrierBusiApi+"/TransIn", common.WrapHandler(sagaBarrierTransIn)) - app.POST(SagaBarrierBusiApi+"/TransInCompensate", common.WrapHandler(sagaBarrierTransInCompensate)) - app.POST(SagaBarrierBusiApi+"/TransOut", common.WrapHandler(sagaBarrierTransOut)) - app.POST(SagaBarrierBusiApi+"/TransOutCompensate", common.WrapHandler(sagaBarrierTransOutCompensate)) - logrus.Printf("examples listening at %d", SagaBarrierBusiPort) + app.POST(BusiApi+"/SagaBTransIn", common.WrapHandler(sagaBarrierTransIn)) + app.POST(BusiApi+"/SagaBTransInCompensate", common.WrapHandler(sagaBarrierTransInCompensate)) + app.POST(BusiApi+"/SagaBTransOut", common.WrapHandler(sagaBarrierTransOut)) + app.POST(BusiApi+"/SagaBTransOutCompensate", common.WrapHandler(sagaBarrierTransOutCompensate)) + logrus.Printf("examples listening at %d", BusiPort) +} + +func sagaBarrierAdjustBalance(sdb *sql.DB, uid int, amount int) (interface{}, error) { + db := common.SqlDB2DB(sdb) + dbr := db.Model(&UserAccount{}).Where("user_id = ?", 1).Update("balance", gorm.Expr("balance + ?", amount)) + return "SUCCESS", dbr.Error + } func sagaBarrierTransIn(c *gin.Context) (interface{}, error) { - req := reqFrom(c) return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { - db := common.SqlDB2DB(sdb) - dbr := db.Model(&UserAccount{}).Where("user_id = ?", 1). - Update("balance", gorm.Expr("balance + ?", req.Amount)) - return "SUCCESS", dbr.Error + return sagaBarrierAdjustBalance(sdb, 1, reqFrom(c).Amount) }) } func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) { - req := reqFrom(c) return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { - db := common.SqlDB2DB(sdb) - dbr := db.Model(&UserAccount{}).Where("user_id = ?", 1). - Update("balance", gorm.Expr("balance - ?", req.Amount)) - return "SUCCESS", dbr.Error + return sagaBarrierAdjustBalance(sdb, 1, -reqFrom(c).Amount) }) } func sagaBarrierTransOut(c *gin.Context) (interface{}, error) { - req := reqFrom(c) return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { - db := common.SqlDB2DB(sdb) - dbr := db.Model(&UserAccount{}).Where("user_id = ?", 2). - Update("balance", gorm.Expr("balance - ?", req.Amount)) - return "SUCCESS", dbr.Error + return sagaBarrierAdjustBalance(sdb, 2, -reqFrom(c).Amount) }) } func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) { - req := reqFrom(c) return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { - db := common.SqlDB2DB(sdb) - dbr := db.Model(&UserAccount{}).Where("user_id = ?", 2). - Update("balance", gorm.Expr("balance + ?", req.Amount)) - return "SUCCESS", dbr.Error + return sagaBarrierAdjustBalance(sdb, 2, reqFrom(c).Amount) }) } diff --git a/examples/main_tcc.go b/examples/main_tcc.go index 711a716..dd5423a 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -1,24 +1,12 @@ package examples import ( - "time" - "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" ) -func TccMain() { - app := BaseAppNew() - BaseAppSetup(app) - TccSetup(app) - go BaseAppStart(app) - time.Sleep(100 * time.Millisecond) - TccFireRequest() - time.Sleep(1000 * time.Second) -} - func TccSetup(app *gin.Engine) { app.POST(BusiApi+"/TransInTcc", common.WrapHandler(func(c *gin.Context) (interface{}, error) { tcc, err := dtmcli.TccFromReq(c) diff --git a/examples/main_tcc_barrier.go b/examples/main_tcc_barrier.go index 4d73bba..6d51212 100644 --- a/examples/main_tcc_barrier.go +++ b/examples/main_tcc_barrier.go @@ -3,7 +3,6 @@ package examples import ( "database/sql" "fmt" - "time" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" @@ -11,35 +10,17 @@ import ( "github.com/yedf/dtm/dtmcli" ) -// 事务参与者的服务地址 -const TccBarrierBusiApi = "/api/busi_saga_barrier" - -var TccBarrierBusi = fmt.Sprintf("http://localhost:%d%s", TccBarrierBusiPort, TccBarrierBusiApi) - -func TccBarrierMainStart() { - TccBarrierStartSvr() - TccBarrierFireRequest() -} - -func TccBarrierStartSvr() { - logrus.Printf("saga barrier examples starting") - app := common.GetGinApp() - TccBarrierAddRoute(app) - go app.Run(fmt.Sprintf(":%d", TccBarrierBusiPort)) - time.Sleep(100 * time.Millisecond) -} - func TccBarrierFireRequest() { logrus.Printf("tcc transaction begin") _, err := dtmcli.TccGlobalTransaction(DtmServer, func(tcc *dtmcli.Tcc) (rerr error) { - res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, TccBarrierBusi+"/TransOutTry", TccBarrierBusi+"/TransOutConfirm", TccBarrierBusi+"/TransOutRevert") + res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutRevert") if rerr != nil { return } if res1.StatusCode() != 200 { return fmt.Errorf("bad status code: %d", res1.StatusCode()) } - res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, TccBarrierBusi+"/TransInTry", TccBarrierBusi+"/TransInConfirm", TccBarrierBusi+"/TransInRevert") + res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInRevert") if rerr != nil { return } @@ -55,13 +36,13 @@ func TccBarrierFireRequest() { // api func TccBarrierAddRoute(app *gin.Engine) { - app.POST(TccBarrierBusiApi+"/TransInTry", common.WrapHandler(tccBarrierTransInTry)) - app.POST(TccBarrierBusiApi+"/TransInConfirm", common.WrapHandler(tccBarrierTransInConfirm)) - app.POST(TccBarrierBusiApi+"/TransInCancel", common.WrapHandler(tccBarrierTransInCancel)) - app.POST(TccBarrierBusiApi+"/TransOutTry", common.WrapHandler(tccBarrierTransOutTry)) - app.POST(TccBarrierBusiApi+"/TransOutConfirm", common.WrapHandler(tccBarrierTransOutConfirm)) - app.POST(TccBarrierBusiApi+"/TransOutCancel", common.WrapHandler(tccBarrierTransOutCancel)) - logrus.Printf("examples listening at %d", TccBarrierBusiPort) + app.POST(BusiApi+"/TccBTransInTry", common.WrapHandler(tccBarrierTransInTry)) + app.POST(BusiApi+"/TccBTransInConfirm", common.WrapHandler(tccBarrierTransInConfirm)) + app.POST(BusiApi+"/TccBTransInCancel", common.WrapHandler(tccBarrierTransInCancel)) + app.POST(BusiApi+"/TccBTransOutTry", common.WrapHandler(tccBarrierTransOutTry)) + app.POST(BusiApi+"/TccBTransOutConfirm", common.WrapHandler(tccBarrierTransOutConfirm)) + app.POST(BusiApi+"/TccBTransOutCancel", common.WrapHandler(tccBarrierTransOutCancel)) + logrus.Printf("examples listening at %d", BusiPort) } const transInUid = 1 @@ -93,43 +74,37 @@ func adjustBalance(sdb *sql.DB, uid int, amount int) (interface{}, error) { // TCC下,转入 func tccBarrierTransInTry(c *gin.Context) (interface{}, error) { - req := reqFrom(c) return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { - return adjustTrading(sdb, transInUid, req.Amount) + return adjustTrading(sdb, transInUid, reqFrom(c).Amount) }) } func tccBarrierTransInConfirm(c *gin.Context) (interface{}, error) { - req := reqFrom(c) return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { - return adjustBalance(sdb, transInUid, req.Amount) + return adjustBalance(sdb, transInUid, reqFrom(c).Amount) }) } func tccBarrierTransInCancel(c *gin.Context) (interface{}, error) { - req := reqFrom(c) return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { - return adjustTrading(sdb, transInUid, -req.Amount) + return adjustTrading(sdb, transInUid, -reqFrom(c).Amount) }) } func tccBarrierTransOutTry(c *gin.Context) (interface{}, error) { - req := reqFrom(c) return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { - return adjustTrading(sdb, transOutUid, -req.Amount) + return adjustTrading(sdb, transOutUid, -reqFrom(c).Amount) }) } func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) { - req := reqFrom(c) return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { - return adjustBalance(sdb, transOutUid, -req.Amount) + return adjustBalance(sdb, transOutUid, -reqFrom(c).Amount) }) } func tccBarrierTransOutCancel(c *gin.Context) (interface{}, error) { - req := reqFrom(c) return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) { - return adjustTrading(sdb, transOutUid, req.Amount) + return adjustTrading(sdb, transOutUid, reqFrom(c).Amount) }) } diff --git a/examples/main_xa.go b/examples/main_xa.go index 48c7003..5e2c64f 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -2,7 +2,6 @@ package examples import ( "fmt" - "time" "github.com/gin-gonic/gin" "github.com/yedf/dtm/common" @@ -32,15 +31,6 @@ func dbGet() *common.DB { return common.DbGet(Config.Mysql) } -func XaMain() { - app := BaseAppNew() - XaSetup(app) - go BaseAppStart(app) - time.Sleep(100 * time.Millisecond) - XaFireRequest() - time.Sleep(1000 * time.Second) -} - func XaFireRequest() { _, err := XaClient.XaGlobalTransaction(func(gid string) (rerr error) { defer common.P2E(&rerr) diff --git a/examples/quick_start.go b/examples/quick_start.go index 8155622..a7844af 100644 --- a/examples/quick_start.go +++ b/examples/quick_start.go @@ -19,7 +19,7 @@ const qsBusiPort = 8082 var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiApi) // 被app/main.go调用,启动服务并运行示例 -func StartMain() { +func QuickStarMain() { qsStartSvr() qsFireRequest() }