ready to add xa

This commit is contained in:
yedongfu 2021-05-24 14:42:17 +08:00
parent b401e0f8d5
commit 0c42bdeeb9
8 changed files with 179 additions and 150 deletions

View File

@ -16,7 +16,7 @@ func main() {
dtmsvr.LoadConfig() dtmsvr.LoadConfig()
if cmd == "" { // 所有服务都启动 if cmd == "" { // 所有服务都启动
go dtmsvr.StartSvr() go dtmsvr.StartSvr()
go examples.StartSvr() go examples.SagaStartSvr()
} else if cmd == "dtmsvr" { } else if cmd == "dtmsvr" {
go dtmsvr.StartSvr() go dtmsvr.StartSvr()
} }

View File

@ -1,6 +1,6 @@
CREATE DATABASE `dtm` /*!40100 DEFAULT CHARACTER SET utf8mb4 */; -- CREATE DATABASE `dtm` /*!40100 DEFAULT CHARACTER SET utf8mb4 */;
use dtm; -- use dtm;
drop table IF EXISTS saga; drop table IF EXISTS saga;
CREATE TABLE `saga` ( CREATE TABLE `saga` (
@ -51,3 +51,16 @@ CREATE TABLE `trans_log` (
KEY `gid` (`gid`), KEY `gid` (`gid`),
KEY `create_time` (`create_time`) KEY `create_time` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=48 DEFAULT CHARSET=utf8mb4; ) ENGINE=InnoDB AUTO_INCREMENT=48 DEFAULT CHARSET=utf8mb4;
drop table if EXISTS user_account;
CREATE TABLE `user_account` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) DEFAULT NULL,
`balance` decimal(10,2) NOT NULL DEFAULT '0.00',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `user_id` (`user_id`),
KEY `create_time` (`create_time`),
KEY `update_time` (`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

View File

@ -31,7 +31,7 @@ func TestDtmSvr(t *testing.T) {
// 启动组件 // 启动组件
go StartSvr() go StartSvr()
go examples.StartSvr() go examples.SagaStartSvr()
time.Sleep(time.Duration(100 * 1000 * 1000)) time.Sleep(time.Duration(100 * 1000 * 1000))
preparePending(t) preparePending(t)
@ -94,7 +94,7 @@ func prepareCancel(t *testing.T) {
saga := genSaga("gid1-prepareCancel", false, true) saga := genSaga("gid1-prepareCancel", false, true)
saga.Prepare() saga.Prepare()
examples.TransQueryResult = "FAIL" examples.TransQueryResult = "FAIL"
Config.PreparedExpire = 0 Config.PreparedExpire = -10
CronPreparedOnce(-10 * time.Second) CronPreparedOnce(-10 * time.Second)
examples.TransQueryResult = "" examples.TransQueryResult = ""
Config.PreparedExpire = 60 Config.PreparedExpire = 60
@ -116,11 +116,11 @@ func preparePending(t *testing.T) {
func commitedPending(t *testing.T) { func commitedPending(t *testing.T) {
saga := genSaga("gid-commitedPending", false, false) saga := genSaga("gid-commitedPending", false, false)
saga.Prepare() saga.Prepare()
saga.Commit()
examples.TransOutResult = "PENDING" examples.TransOutResult = "PENDING"
saga.Commit()
WaitCommitedSaga(saga.Gid) WaitCommitedSaga(saga.Gid)
assert.Equal(t, []string{"pending", "finished", "pending", "pending"}, getSagaStepStatus(saga.Gid))
examples.TransOutResult = "" examples.TransOutResult = ""
assert.Equal(t, []string{"pending", "finished", "pending", "pending"}, getSagaStepStatus(saga.Gid))
CronCommitedOnce(-10 * time.Second) CronCommitedOnce(-10 * time.Second)
WaitCommitedSaga(saga.Gid) WaitCommitedSaga(saga.Gid)
assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid)) assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid))
@ -129,13 +129,13 @@ func commitedPending(t *testing.T) {
func genSaga(gid string, inFailed bool, outFailed bool) *dtm.Saga { func genSaga(gid string, inFailed bool, outFailed bool) *dtm.Saga {
logrus.Printf("beginning a saga test ---------------- %s", gid) logrus.Printf("beginning a saga test ---------------- %s", gid)
saga := dtm.SagaNew(examples.DtmServer, gid, examples.Busi+"/TransQuery") saga := dtm.SagaNew(examples.DtmServer, gid, examples.SagaBusi+"/TransQuery")
req := examples.TransReq{ req := examples.TransReq{
Amount: 30, Amount: 30,
TransInFailed: inFailed, TransInResult: common.If(inFailed, "FAIL", "SUCCESS").(string),
TransOutFailed: outFailed, TransOutResult: common.If(outFailed, "FAIL", "SUCCESS").(string),
} }
saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInCompensate", &req) saga.Add(examples.SagaBusi+"/TransIn", examples.SagaBusi+"/TransInCompensate", &req)
saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutCompensate", &req) saga.Add(examples.SagaBusi+"/TransOut", examples.SagaBusi+"/TransOutCompensate", &req)
return saga return saga
} }

View File

@ -1,89 +0,0 @@
package examples
import (
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
)
func AddRoute(app *gin.Engine) {
app.POST(BusiApi+"/TransIn", common.WrapHandler(TransIn))
app.POST(BusiApi+"/TransInCompensate", common.WrapHandler(TransInCompensate))
app.POST(BusiApi+"/TransOut", common.WrapHandler(TransOut))
app.POST(BusiApi+"/TransOutCompensate", common.WrapHandler(TransOutCompensate))
app.GET(BusiApi+"/TransQuery", common.WrapHandler(TransQuery))
logrus.Printf("examples istening at %d", BusiPort)
}
type M = map[string]interface{}
var TransInResult = ""
var TransOutResult = ""
var TransInCompensateResult = ""
var TransOutCompensateResult = ""
var TransQueryResult = ""
type TransReq struct {
Amount int `json:"amount"`
TransInFailed bool `json:"transInFailed"`
TransOutFailed bool `json:"transOutFailed"`
}
func TransIn(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := TransReq{}
if err := c.BindJSON(&req); err != nil {
return nil, err
}
if req.TransInFailed {
logrus.Printf("%s TransIn %v failed", gid, req)
return M{"result": "FAIL"}, nil
}
res := common.OrString(TransInResult, "SUCCESS")
logrus.Printf("%s TransIn: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func TransInCompensate(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := TransReq{}
if err := c.BindJSON(&req); err != nil {
return nil, err
}
res := common.OrString(TransInCompensateResult, "SUCCESS")
logrus.Printf("%s TransInCompensate: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func TransOut(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := TransReq{}
if err := c.BindJSON(&req); err != nil {
return nil, err
}
if req.TransOutFailed {
logrus.Printf("%s TransOut %v failed", gid, req)
return M{"result": "FAIL"}, nil
}
res := common.OrString(TransOutResult, "SUCCESS")
logrus.Printf("%s TransOut: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func TransOutCompensate(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := TransReq{}
if err := c.BindJSON(&req); err != nil {
return nil, err
}
res := common.OrString(TransOutCompensateResult, "SUCCESS")
logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func TransQuery(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
logrus.Printf("%s TransQuery", gid)
res := common.OrString(TransQueryResult, "SUCCESS")
return M{"result": res}, nil
}

View File

@ -1,12 +1,4 @@
package examples package examples
import "fmt"
// 指定dtm服务地址 // 指定dtm服务地址
const DtmServer = "http://localhost:8080/api/dtmsvr" const DtmServer = "http://localhost:8080/api/dtmsvr"
// 事务参与制的服务地址
const BusiPort = 8081
const BusiApi = "/api/busi"
var Busi = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiApi)

View File

@ -1,41 +0,0 @@
package examples
import (
"time"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
)
func Main() {
go StartSvr()
FireRequest()
time.Sleep(1000 * time.Second)
}
func FireRequest() {
gid := common.GenGid()
logrus.Printf("busi transaction begin: %s", gid)
req := &TransReq{
Amount: 30,
TransInFailed: false,
TransOutFailed: false,
}
saga := dtm.SagaNew(DtmServer, gid, Busi+"/TransQuery")
saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", req)
saga.Add(Busi+"/TransOut", Busi+"/TransOutCompensate", req)
err := saga.Prepare()
common.PanicIfError(err)
logrus.Printf("busi trans commit")
err = saga.Commit()
common.PanicIfError(err)
}
func StartSvr() {
logrus.Printf("examples starting")
app := common.GetGinApp()
AddRoute(app)
app.Run(":8081")
}

120
examples/saga_main.go Normal file
View File

@ -0,0 +1,120 @@
package examples
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
)
// 事务参与者的服务地址
const SagaBusiPort = 8081
const SagaBusiApi = "/api/busi_saga"
var SagaBusi = fmt.Sprintf("http://localhost:%d%s", SagaBusiPort, SagaBusiApi)
func SagaMain() {
go SagaStartSvr()
sagaFireRequest()
time.Sleep(1000 * time.Second)
}
func SagaStartSvr() {
logrus.Printf("saga examples starting")
app := common.GetGinApp()
AddRoute(app)
app.Run(":8081")
}
func sagaFireRequest() {
gid := common.GenGid()
logrus.Printf("busi transaction begin: %s", gid)
req := &TransReq{
Amount: 30,
TransInResult: "SUCCESS",
TransOutResult: "SUCCESS",
}
saga := dtm.SagaNew(DtmServer, gid, SagaBusi+"/TransQuery")
saga.Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req)
saga.Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req)
err := saga.Prepare()
common.PanicIfError(err)
logrus.Printf("busi trans commit")
err = saga.Commit()
common.PanicIfError(err)
}
// api
func AddRoute(app *gin.Engine) {
app.POST(SagaBusiApi+"/TransIn", common.WrapHandler(TransIn))
app.POST(SagaBusiApi+"/TransInCompensate", common.WrapHandler(TransInCompensate))
app.POST(SagaBusiApi+"/TransOut", common.WrapHandler(TransOut))
app.POST(SagaBusiApi+"/TransOutCompensate", common.WrapHandler(TransOutCompensate))
app.GET(SagaBusiApi+"/TransQuery", common.WrapHandler(TransQuery))
logrus.Printf("examples listening at %d", SagaBusiPort)
}
type M = map[string]interface{}
var TransInResult = ""
var TransOutResult = ""
var TransInCompensateResult = ""
var TransOutCompensateResult = ""
var TransQueryResult = ""
type TransReq struct {
Amount int `json:"amount"`
TransInResult string `json:"transInResult"`
TransOutResult string `json:"transOutResult"`
}
func transReqFromContext(c *gin.Context) *TransReq {
req := TransReq{}
err := c.BindJSON(&req)
common.PanicIfError(err)
return &req
}
func TransIn(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TransInResult, req.TransInResult, "SUCCESS")
logrus.Printf("%s TransIn: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func TransInCompensate(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TransInCompensateResult, "SUCCESS")
logrus.Printf("%s TransInCompensate: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func TransOut(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TransOutResult, req.TransOutResult, "SUCCESS")
logrus.Printf("%s TransOut: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func TransOutCompensate(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TransOutCompensateResult, "SUCCESS")
logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func TransQuery(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
logrus.Printf("%s TransQuery", gid)
res := common.OrString(TransQueryResult, "SUCCESS")
return M{"result": res}, nil
}

34
examples/xa_main.go Normal file
View File

@ -0,0 +1,34 @@
package examples
import (
"fmt"
"time"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
)
// 事务参与者的服务地址
const XaBusiPort = 8082
const XaBusiApi = "/api/busi_xa"
var XaBusi = fmt.Sprintf("http://localhost:%d%s", XaBusiPort, XaBusiApi)
func XaMain() {
go XaStartSvr()
xaFireRequest()
time.Sleep(1000 * time.Second)
}
func XaStartSvr() {
logrus.Printf("xa examples starting")
app := common.GetGinApp()
AddRoute(app)
app.Run(":8081")
}
func xaFireRequest() {
}
// api