examples setup refactored

This commit is contained in:
yedf2 2021-08-11 13:52:40 +08:00
parent b8127df92a
commit 4218256c19
8 changed files with 45 additions and 55 deletions

View File

@ -40,38 +40,26 @@ func main() {
} }
// 下面是各类的例子 // 下面是各类的例子
app := examples.BaseAppStartup() examples.BaseAppStartup()
examples.GrpcStartup() examples.GrpcStartup()
if os.Args[1] == "xa" { // 启动xa示例 if os.Args[1] == "xa" { // 启动xa示例
examples.XaSetup(app)
examples.XaFireRequest() examples.XaFireRequest()
} else if os.Args[1] == "saga" { // 启动saga示例 } else if os.Args[1] == "saga" { // 启动saga示例
examples.SagaSetup(app)
examples.SagaFireRequest() examples.SagaFireRequest()
} else if os.Args[1] == "tcc" { // 启动tcc示例 } else if os.Args[1] == "tcc" { // 启动tcc示例
examples.TccSetup(app)
examples.TccFireRequestNested() examples.TccFireRequestNested()
} else if os.Args[1] == "msg" { // 启动msg示例 } else if os.Args[1] == "msg" { // 启动msg示例
examples.MsgSetup(app)
examples.MsgFireRequest() examples.MsgFireRequest()
} else if os.Args[1] == "msg_grpc" { // 启动msg示例 } else if os.Args[1] == "msg_grpc" { // 启动msg示例
examples.MsgGrpcSetup(app)
examples.MsgGrpcFireRequest() examples.MsgGrpcFireRequest()
} else if os.Args[1] == "all" { // 运行所有示例 } else if os.Args[1] == "all" { // 运行所有示例
examples.SagaSetup(app)
examples.SagaWaitSetup(app)
examples.TccSetup(app)
examples.XaSetup(app)
examples.MsgSetup(app)
examples.SagaFireRequest() examples.SagaFireRequest()
examples.TccFireRequestNested() examples.TccFireRequestNested()
examples.XaFireRequest() examples.XaFireRequest()
examples.MsgFireRequest() examples.MsgFireRequest()
} else if os.Args[1] == "saga_barrier" { } else if os.Args[1] == "saga_barrier" {
examples.SagaBarrierAddRoute(app)
examples.SagaBarrierFireRequest() examples.SagaBarrierFireRequest()
} else if os.Args[1] == "tcc_barrier" { } else if os.Args[1] == "tcc_barrier" {
examples.TccBarrierAddRoute(app)
examples.TccBarrierFireRequest() examples.TccBarrierFireRequest()
} else { } else {
dtmcli.LogRedf("unknown arg: %s", os.Args[1]) dtmcli.LogRedf("unknown arg: %s", os.Args[1])

View File

@ -49,6 +49,7 @@ func (s *dtmServer) RegisterXaBranch(ctx context.Context, in *pb.DtmXaBranchRequ
BranchID: in.Info.BranchID, BranchID: in.Info.BranchID,
Status: "prepared", Status: "prepared",
Data: in.BusiData, Data: in.BusiData,
URL: in.Notify,
}) })
return &emptypb.Empty{}, dtmgrpc.Result2Error(r, err) return &emptypb.Empty{}, dtmgrpc.Result2Error(r, err)
} }

View File

@ -46,13 +46,8 @@ func TestMain(m *testing.M) {
examples.PopulateDB(false) examples.PopulateDB(false)
// 启动组件 // 启动组件
go StartSvr() go StartSvr()
app = examples.BaseAppStartup()
examples.GrpcStartup() examples.GrpcStartup()
examples.XaGrpcSetup() app = examples.BaseAppStartup()
examples.TccSetup(app)
examples.XaSetup(app)
examples.TccBarrierAddRoute(app)
examples.SagaBarrierAddRoute(app)
resetXaData() resetXaData()
m.Run() m.Run()

View File

@ -3,6 +3,7 @@ package examples
import ( import (
context "context" context "context"
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmgrpc" "github.com/yedf/dtm/dtmgrpc"
emptypb "google.golang.org/protobuf/types/known/emptypb" emptypb "google.golang.org/protobuf/types/known/emptypb"
@ -11,9 +12,10 @@ import (
// XaGrpcClient XA client connection // XaGrpcClient XA client connection
var XaGrpcClient *dtmgrpc.XaGrpcClient = nil var XaGrpcClient *dtmgrpc.XaGrpcClient = nil
// XaGrpcSetup 挂载http的api创建XaClient func init() {
func XaGrpcSetup() { setupFuncs["XaGrpcSetup"] = func(app *gin.Engine) {
XaGrpcClient = dtmgrpc.NewXaGrpcClient(DtmGrpcServer, config.DB, BusiGrpc+"/examples.Busi/XaNotify") XaGrpcClient = dtmgrpc.NewXaGrpcClient(DtmGrpcServer, config.DB, BusiGrpc+"/examples.Busi/XaNotify")
}
} }
func (s *busiServer) XaNotify(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { func (s *busiServer) XaNotify(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {

View File

@ -21,13 +21,14 @@ func SagaBarrierFireRequest() string {
return saga.Gid return saga.Gid
} }
// SagaBarrierAddRoute 1 func init() {
func SagaBarrierAddRoute(app *gin.Engine) { setupFuncs["SagaBarrierSetup"] = func(app *gin.Engine) {
app.POST(BusiAPI+"/SagaBTransIn", common.WrapHandler(sagaBarrierTransIn)) app.POST(BusiAPI+"/SagaBTransIn", common.WrapHandler(sagaBarrierTransIn))
app.POST(BusiAPI+"/SagaBTransInCompensate", common.WrapHandler(sagaBarrierTransInCompensate)) app.POST(BusiAPI+"/SagaBTransInCompensate", common.WrapHandler(sagaBarrierTransInCompensate))
app.POST(BusiAPI+"/SagaBTransOut", common.WrapHandler(sagaBarrierTransOut)) app.POST(BusiAPI+"/SagaBTransOut", common.WrapHandler(sagaBarrierTransOut))
app.POST(BusiAPI+"/SagaBTransOutCompensate", common.WrapHandler(sagaBarrierTransOutCompensate)) app.POST(BusiAPI+"/SagaBTransOutCompensate", common.WrapHandler(sagaBarrierTransOutCompensate))
dtmcli.Logf("examples listening at %d", BusiPort) dtmcli.Logf("examples listening at %d", BusiPort)
}
} }
func sagaBarrierAdjustBalance(sdb *sql.Tx, uid int, amount int) (interface{}, error) { func sagaBarrierAdjustBalance(sdb *sql.Tx, uid int, amount int) (interface{}, error) {

View File

@ -7,14 +7,15 @@ import (
"github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli"
) )
// TccSetup 1 func init() {
func TccSetup(app *gin.Engine) { setupFuncs["TccSetupSetup"] = func(app *gin.Engine) {
app.POST(BusiAPI+"/TransInTccParent", common.WrapHandler(func(c *gin.Context) (interface{}, error) { app.POST(BusiAPI+"/TransInTccParent", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query()) tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query())
e2p(err) e2p(err)
dtmcli.Logf("TransInTccParent ") dtmcli.Logf("TransInTccParent ")
return tcc.CallBranch(&TransReq{Amount: reqFrom(c).Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") return tcc.CallBranch(&TransReq{Amount: reqFrom(c).Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
})) }))
}
} }
// TccFireRequestNested 1 // TccFireRequestNested 1

View File

@ -25,8 +25,8 @@ func TccBarrierFireRequest() string {
return gid return gid
} }
// TccBarrierAddRoute 1 func init() {
func TccBarrierAddRoute(app *gin.Engine) { setupFuncs["TccBarrierSetup"] = func(app *gin.Engine) {
app.POST(BusiAPI+"/TccBTransInTry", common.WrapHandler(tccBarrierTransInTry)) app.POST(BusiAPI+"/TccBTransInTry", common.WrapHandler(tccBarrierTransInTry))
app.POST(BusiAPI+"/TccBTransInConfirm", common.WrapHandler(tccBarrierTransInConfirm)) app.POST(BusiAPI+"/TccBTransInConfirm", common.WrapHandler(tccBarrierTransInConfirm))
app.POST(BusiAPI+"/TccBTransInCancel", common.WrapHandler(tccBarrierTransInCancel)) app.POST(BusiAPI+"/TccBTransInCancel", common.WrapHandler(tccBarrierTransInCancel))
@ -34,6 +34,7 @@ func TccBarrierAddRoute(app *gin.Engine) {
app.POST(BusiAPI+"/TccBTransOutConfirm", common.WrapHandler(tccBarrierTransOutConfirm)) app.POST(BusiAPI+"/TccBTransOutConfirm", common.WrapHandler(tccBarrierTransOutConfirm))
app.POST(BusiAPI+"/TccBTransOutCancel", common.WrapHandler(TccBarrierTransOutCancel)) app.POST(BusiAPI+"/TccBTransOutCancel", common.WrapHandler(TccBarrierTransOutCancel))
dtmcli.Logf("examples listening at %d", BusiPort) dtmcli.Logf("examples listening at %d", BusiPort)
}
} }
const transInUID = 1 const transInUID = 1

View File

@ -10,8 +10,8 @@ import (
// XaClient XA client connection // XaClient XA client connection
var XaClient *dtmcli.XaClient = nil var XaClient *dtmcli.XaClient = nil
// XaSetup 挂载http的api创建XaClient func init() {
func XaSetup(app *gin.Engine) { setupFuncs["XaSetup"] = func(app *gin.Engine) {
var err error var err error
XaClient, err = dtmcli.NewXaClient(DtmServer, config.DB, Busi+"/xa", func(path string, xa *dtmcli.XaClient) { XaClient, err = dtmcli.NewXaClient(DtmServer, config.DB, Busi+"/xa", func(path string, xa *dtmcli.XaClient) {
app.POST(path, common.WrapHandler(func(c *gin.Context) (interface{}, error) { app.POST(path, common.WrapHandler(func(c *gin.Context) (interface{}, error) {
@ -19,6 +19,7 @@ func XaSetup(app *gin.Engine) {
})) }))
}) })
e2p(err) e2p(err)
}
} }
// XaFireRequest 注册全局XA事务调用XA的分支 // XaFireRequest 注册全局XA事务调用XA的分支