some e2p renamed
This commit is contained in:
parent
bad6977de8
commit
693d6d56ea
@ -124,8 +124,8 @@ func (s *busiServer) TransInTccNested(ctx context.Context, in *dtmgrpc.BusiReque
|
|||||||
req := TransReq{}
|
req := TransReq{}
|
||||||
dtmcli.MustUnmarshal(in.BusiData, &req)
|
dtmcli.MustUnmarshal(in.BusiData, &req)
|
||||||
tcc, err := dtmgrpc.TccFromRequest(in)
|
tcc, err := dtmgrpc.TccFromRequest(in)
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
_, err = tcc.CallBranch(dtmcli.MustMarshal(req), BusiGrpc+"/examples.Busi/TransIn", BusiGrpc+"/examples.Busi/TransInConfirm", BusiGrpc+"/examples.Busi/TransInRevert")
|
_, err = tcc.CallBranch(dtmcli.MustMarshal(req), BusiGrpc+"/examples.Busi/TransIn", BusiGrpc+"/examples.Busi/TransInConfirm", BusiGrpc+"/examples.Busi/TransInRevert")
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransInResult.Fetch(), req.TransInResult, dtmcli.GetFuncName())
|
return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransInResult.Fetch(), req.TransInResult, dtmcli.GetFuncName())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,8 +9,6 @@ import (
|
|||||||
"github.com/yedf/dtm/dtmcli"
|
"github.com/yedf/dtm/dtmcli"
|
||||||
)
|
)
|
||||||
|
|
||||||
var e2p = dtmcli.E2P
|
|
||||||
|
|
||||||
// M alias
|
// M alias
|
||||||
type M = map[string]interface{}
|
type M = map[string]interface{}
|
||||||
|
|
||||||
@ -45,7 +43,7 @@ func reqFrom(c *gin.Context) *TransReq {
|
|||||||
if !ok {
|
if !ok {
|
||||||
req := TransReq{}
|
req := TransReq{}
|
||||||
err := c.BindJSON(&req)
|
err := c.BindJSON(&req)
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
c.Set("trans_req", &req)
|
c.Set("trans_req", &req)
|
||||||
v = &req
|
v = &req
|
||||||
}
|
}
|
||||||
@ -68,13 +66,13 @@ func dbGet() *common.DB {
|
|||||||
|
|
||||||
func sdbGet() *sql.DB {
|
func sdbGet() *sql.DB {
|
||||||
db, err := dtmcli.SdbGet(config.DB)
|
db, err := dtmcli.SdbGet(config.DB)
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return db
|
return db
|
||||||
}
|
}
|
||||||
|
|
||||||
// MustGetTrans construct transaction info from request
|
// MustGetTrans construct transaction info from request
|
||||||
func MustGetTrans(c *gin.Context) *dtmcli.BranchBarrier {
|
func MustGetTrans(c *gin.Context) *dtmcli.BranchBarrier {
|
||||||
ti, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
|
ti, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return ti
|
return ti
|
||||||
}
|
}
|
||||||
|
|||||||
@ -14,10 +14,10 @@ var config = common.DtmConfig
|
|||||||
// RunSQLScript 1
|
// RunSQLScript 1
|
||||||
func RunSQLScript(conf map[string]string, script string, skipDrop bool) {
|
func RunSQLScript(conf map[string]string, script string, skipDrop bool) {
|
||||||
con, err := dtmcli.SdbAlone(conf)
|
con, err := dtmcli.SdbAlone(conf)
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
defer func() { con.Close() }()
|
defer func() { con.Close() }()
|
||||||
content, err := ioutil.ReadFile(script)
|
content, err := ioutil.ReadFile(script)
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
sqls := strings.Split(string(content), ";")
|
sqls := strings.Split(string(content), ";")
|
||||||
for _, sql := range sqls {
|
for _, sql := range sqls {
|
||||||
s := strings.TrimSpace(sql)
|
s := strings.TrimSpace(sql)
|
||||||
@ -25,7 +25,7 @@ func RunSQLScript(conf map[string]string, script string, skipDrop bool) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
_, err = dtmcli.SdbExec(con, s)
|
_, err = dtmcli.SdbExec(con, s)
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -12,6 +12,6 @@ func MsgGrpcFireRequest() string {
|
|||||||
Add(BusiGrpc+"/examples.Busi/TransOut", req).
|
Add(BusiGrpc+"/examples.Busi/TransOut", req).
|
||||||
Add(BusiGrpc+"/examples.Busi/TransIn", req)
|
Add(BusiGrpc+"/examples.Busi/TransIn", req)
|
||||||
err := msg.Submit()
|
err := msg.Submit()
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return msg.Gid
|
return msg.Gid
|
||||||
}
|
}
|
||||||
|
|||||||
@ -35,6 +35,6 @@ func XaGrpcFireRequest() string {
|
|||||||
_, err = xa.CallBranch(busiData, BusiGrpc+"/examples.Busi/TransInXa")
|
_, err = xa.CallBranch(busiData, BusiGrpc+"/examples.Busi/TransInXa")
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return gid
|
return gid
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,9 +12,9 @@ func MsgFireRequest() string {
|
|||||||
Add(Busi+"/TransOut", req).
|
Add(Busi+"/TransOut", req).
|
||||||
Add(Busi+"/TransIn", req)
|
Add(Busi+"/TransIn", req)
|
||||||
err := msg.Prepare(Busi + "/TransQuery")
|
err := msg.Prepare(Busi + "/TransQuery")
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
dtmcli.Logf("busi trans submit")
|
dtmcli.Logf("busi trans submit")
|
||||||
err = msg.Submit()
|
err = msg.Submit()
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return msg.Gid
|
return msg.Gid
|
||||||
}
|
}
|
||||||
|
|||||||
@ -37,7 +37,7 @@ func QsFireRequest() string {
|
|||||||
Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req)
|
Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req)
|
||||||
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
|
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
|
||||||
err := saga.Submit()
|
err := saga.Submit()
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return saga.Gid
|
return saga.Gid
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -18,6 +18,6 @@ func SagaFireRequest() string {
|
|||||||
dtmcli.Logf("saga busi trans submit")
|
dtmcli.Logf("saga busi trans submit")
|
||||||
err := saga.Submit()
|
err := saga.Submit()
|
||||||
dtmcli.Logf("result gid is: %s", saga.Gid)
|
dtmcli.Logf("result gid is: %s", saga.Gid)
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return saga.Gid
|
return saga.Gid
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,7 +17,7 @@ func SagaBarrierFireRequest() string {
|
|||||||
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req)
|
Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", req)
|
||||||
dtmcli.Logf("busi trans submit")
|
dtmcli.Logf("busi trans submit")
|
||||||
err := saga.Submit()
|
err := saga.Submit()
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return saga.Gid
|
return saga.Gid
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -18,6 +18,6 @@ func SagaWaitFireRequest() string {
|
|||||||
saga.WaitResult = true // 设置为等待结果模式,后面的submit调用,会等待服务器处理这个事务。如果Submit正常返回,那么整个全局事务已成功完成
|
saga.WaitResult = true // 设置为等待结果模式,后面的submit调用,会等待服务器处理这个事务。如果Submit正常返回,那么整个全局事务已成功完成
|
||||||
err := saga.Submit()
|
err := saga.Submit()
|
||||||
dtmcli.Logf("result gid is: %s", saga.Gid)
|
dtmcli.Logf("result gid is: %s", saga.Gid)
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return saga.Gid
|
return saga.Gid
|
||||||
}
|
}
|
||||||
|
|||||||
@ -11,7 +11,7 @@ func init() {
|
|||||||
setupFuncs["TccSetupSetup"] = func(app *gin.Engine) {
|
setupFuncs["TccSetupSetup"] = func(app *gin.Engine) {
|
||||||
app.POST(BusiAPI+"/TransInTccParent", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
|
app.POST(BusiAPI+"/TransInTccParent", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
|
||||||
tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query())
|
tcc, err := dtmcli.TccFromQuery(c.Request.URL.Query())
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
dtmcli.Logf("TransInTccParent ")
|
dtmcli.Logf("TransInTccParent ")
|
||||||
return tcc.CallBranch(&TransReq{Amount: reqFrom(c).Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
return tcc.CallBranch(&TransReq{Amount: reqFrom(c).Amount}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
||||||
}))
|
}))
|
||||||
@ -28,7 +28,7 @@ func TccFireRequestNested() string {
|
|||||||
}
|
}
|
||||||
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTccParent", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInTccParent", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
||||||
})
|
})
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return gid
|
return gid
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -43,6 +43,6 @@ func TccFireRequest() string {
|
|||||||
}
|
}
|
||||||
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert")
|
||||||
})
|
})
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return gid
|
return gid
|
||||||
}
|
}
|
||||||
|
|||||||
@ -21,7 +21,7 @@ func TccBarrierFireRequest() string {
|
|||||||
}
|
}
|
||||||
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
|
return tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
|
||||||
})
|
})
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return gid
|
return gid
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,21 +66,21 @@ func tccBarrierTransInTry(c *gin.Context) (interface{}, error) {
|
|||||||
return req.TransInResult, nil
|
return req.TransInResult, nil
|
||||||
}
|
}
|
||||||
barrier := MustGetTrans(c)
|
barrier := MustGetTrans(c)
|
||||||
return barrier.Call(dbGet().ToSQLDB(), func(sdb *sql.Tx) (interface{}, error) {
|
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
|
||||||
return adjustTrading(sdb, transInUID, req.Amount)
|
return adjustTrading(sdb, transInUID, req.Amount)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func tccBarrierTransInConfirm(c *gin.Context) (interface{}, error) {
|
func tccBarrierTransInConfirm(c *gin.Context) (interface{}, error) {
|
||||||
barrier := MustGetTrans(c)
|
barrier := MustGetTrans(c)
|
||||||
return barrier.Call(dbGet().ToSQLDB(), func(sdb *sql.Tx) (interface{}, error) {
|
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
|
||||||
return adjustBalance(sdb, transInUID, reqFrom(c).Amount)
|
return adjustBalance(sdb, transInUID, reqFrom(c).Amount)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func tccBarrierTransInCancel(c *gin.Context) (interface{}, error) {
|
func tccBarrierTransInCancel(c *gin.Context) (interface{}, error) {
|
||||||
barrier := MustGetTrans(c)
|
barrier := MustGetTrans(c)
|
||||||
return barrier.Call(dbGet().ToSQLDB(), func(sdb *sql.Tx) (interface{}, error) {
|
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
|
||||||
return adjustTrading(sdb, transInUID, -reqFrom(c).Amount)
|
return adjustTrading(sdb, transInUID, -reqFrom(c).Amount)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -91,14 +91,14 @@ func tccBarrierTransOutTry(c *gin.Context) (interface{}, error) {
|
|||||||
return req.TransInResult, nil
|
return req.TransInResult, nil
|
||||||
}
|
}
|
||||||
barrier := MustGetTrans(c)
|
barrier := MustGetTrans(c)
|
||||||
return barrier.Call(dbGet().ToSQLDB(), func(sdb *sql.Tx) (interface{}, error) {
|
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
|
||||||
return adjustTrading(sdb, transOutUID, -req.Amount)
|
return adjustTrading(sdb, transOutUID, -req.Amount)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) {
|
func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) {
|
||||||
barrier := MustGetTrans(c)
|
barrier := MustGetTrans(c)
|
||||||
return barrier.Call(dbGet().ToSQLDB(), func(sdb *sql.Tx) (interface{}, error) {
|
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
|
||||||
return adjustBalance(sdb, transOutUID, -reqFrom(c).Amount)
|
return adjustBalance(sdb, transOutUID, -reqFrom(c).Amount)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -106,7 +106,7 @@ func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) {
|
|||||||
// TccBarrierTransOutCancel will be use in test
|
// TccBarrierTransOutCancel will be use in test
|
||||||
func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) {
|
func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) {
|
||||||
barrier := MustGetTrans(c)
|
barrier := MustGetTrans(c)
|
||||||
return barrier.Call(dbGet().ToSQLDB(), func(sdb *sql.Tx) (interface{}, error) {
|
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
|
||||||
return adjustTrading(sdb, transOutUID, reqFrom(c).Amount)
|
return adjustTrading(sdb, transOutUID, reqFrom(c).Amount)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,7 +18,7 @@ func init() {
|
|||||||
return xa.HandleCallback(c.Query("gid"), c.Query("branch_id"), c.Query("branch_type"))
|
return xa.HandleCallback(c.Query("gid"), c.Query("branch_id"), c.Query("branch_type"))
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -32,6 +32,6 @@ func XaFireRequest() string {
|
|||||||
}
|
}
|
||||||
return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa")
|
return xa.CallBranch(&TransReq{Amount: 30}, Busi+"/TransInXa")
|
||||||
})
|
})
|
||||||
e2p(err)
|
dtmcli.FatalIfError(err)
|
||||||
return gid
|
return gid
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,4 +19,5 @@ func TestExamples(t *testing.T) {
|
|||||||
assertSucceed(t, examples.TccFireRequestNested())
|
assertSucceed(t, examples.TccFireRequestNested())
|
||||||
assertSucceed(t, examples.XaFireRequest())
|
assertSucceed(t, examples.XaFireRequest())
|
||||||
assertSucceed(t, examples.MsgGrpcFireRequest())
|
assertSucceed(t, examples.MsgGrpcFireRequest())
|
||||||
|
assertSucceed(t, examples.GrpcSagaFireRequest())
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user