添加可靠性消息支持

This commit is contained in:
yedongfu 2021-06-08 11:56:59 +08:00
parent f182bdb131
commit 28b19209f5
14 changed files with 302 additions and 25 deletions

View File

@ -2,9 +2,12 @@
DTM 是一款跨语言的分布式事务管理方案,在各类微服务架构中,提供高性能和简单易用的分布式事务服务。 DTM 是一款跨语言的分布式事务管理方案,在各类微服务架构中,提供高性能和简单易用的分布式事务服务。
# 特色 # 特色
### 跨语言 ### 跨语言
语言无关任何语言实现了http方式的服务都可以接入DTM用来管理分布式事务 语言无关任何语言实现了http方式的服务都可以接入DTM用来管理分布式事务。支持go、python、php、nodejs、ruby
### 多种分布式事务协议支持 ### 多种分布式事务协议支持
支持XATCCSAGA * TCC: Try-Confirm-Cancel
* SAGA:
* 可靠消息
* XA 需要底层数据库支持XA
### 高可用 ### 高可用
基于数据库实现,易集群化,已水平扩展 基于数据库实现,易集群化,已水平扩展
# 快速开始 # 快速开始

View File

@ -4,5 +4,12 @@ services:
image: 'mysql:5.7' image: 'mysql:5.7'
environment: environment:
MYSQL_ROOT_PASSWORD: my-secret-pw MYSQL_ROOT_PASSWORD: my-secret-pw
TZ: Asia/shanghai
command:
[
'--character-set-server=utf8mb4',
'--collation-server=utf8mb4_unicode_ci',
'--default-time-zone=+8:00',
]
ports: ports:
- '3306:3306' - '3306:3306'

View File

@ -1,8 +1,8 @@
package dtmsvr package dtmsvr
type dtmsvrConfig struct { type dtmsvrConfig struct {
PreparedExpire uint64 // 单位秒处于prepared中的任务过了这个时间查询结果还是PENDING的话则会被cancel PreparedExpire int64 // 单位秒处于prepared中的任务过了这个时间查询结果还是PENDING的话则会被cancel
JobCronInterval uint64 // 单位秒 当事务等待这个时间之后还没有变化则进行一轮处理包括prepared中的任务和commited的任务 JobCronInterval int64 // 单位秒 当事务等待这个时间之后还没有变化则进行一轮处理包括prepared中的任务和commited的任务
Mysql map[string]string Mysql map[string]string
} }

View File

@ -32,6 +32,7 @@ func TestDtmSvr(t *testing.T) {
go examples.SagaStartSvr() go examples.SagaStartSvr()
go examples.XaStartSvr() go examples.XaStartSvr()
go examples.TccStartSvr() go examples.TccStartSvr()
go examples.MsgStartSvr()
time.Sleep(time.Duration(200 * 1000 * 1000)) time.Sleep(time.Duration(200 * 1000 * 1000))
// 清理数据 // 清理数据
@ -40,11 +41,9 @@ func TestDtmSvr(t *testing.T) {
e2p(dbGet().Exec("truncate trans_log").Error) e2p(dbGet().Exec("truncate trans_log").Error)
examples.ResetXaData() examples.ResetXaData()
msgPending(t)
msgNormal(t)
sagaNormal(t) sagaNormal(t)
// 需要放到前面的用例之后,才有真实的数据
transQuery(t)
tccNormal(t) tccNormal(t)
tccRollback(t) tccRollback(t)
tccRollbackPending(t) tccRollbackPending(t)
@ -66,9 +65,6 @@ func TestCover(t *testing.T) {
checkAffected(db.DB) checkAffected(db.DB)
} }
// 测试使用的全局对象
var initdb = dbGet()
func getTransStatus(gid string) string { func getTransStatus(gid string) string {
sm := TransGlobal{} sm := TransGlobal{}
dbr := dbGet().Model(&sm).Where("gid=?", gid).First(&sm) dbr := dbGet().Model(&sm).Where("gid=?", gid).First(&sm)
@ -159,6 +155,33 @@ func tccRollbackPending(t *testing.T) {
CronTransOnce(-10*time.Second, "committed") CronTransOnce(-10*time.Second, "committed")
assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid)) assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid))
} }
func msgNormal(t *testing.T) {
msg := genMsg("gid-normal-msg")
msg.Commit()
assert.Equal(t, "committed", getTransStatus(msg.Gid))
WaitTransProcessed(msg.Gid)
assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid))
assert.Equal(t, "succeed", getTransStatus(msg.Gid))
}
func msgPending(t *testing.T) {
msg := genMsg("gid-normal-pending")
msg.Prepare("")
assert.Equal(t, "prepared", getTransStatus(msg.Gid))
examples.MsgTransQueryResult = "PENDING"
CronTransOnce(-10*time.Second, "prepared")
assert.Equal(t, "prepared", getTransStatus(msg.Gid))
examples.MsgTransQueryResult = ""
examples.MsgTransInResult = "PENDING"
CronTransOnce(-10*time.Second, "prepared")
assert.Equal(t, "committed", getTransStatus(msg.Gid))
examples.MsgTransInResult = ""
CronTransOnce(-10*time.Second, "committed")
assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid))
assert.Equal(t, "succeed", getTransStatus(msg.Gid))
}
func sagaNormal(t *testing.T) { func sagaNormal(t *testing.T) {
saga := genSaga("gid-noramlSaga", false, false) saga := genSaga("gid-noramlSaga", false, false)
saga.Prepare(saga.QueryPrepared) saga.Prepare(saga.QueryPrepared)
@ -167,6 +190,7 @@ func sagaNormal(t *testing.T) {
assert.Equal(t, "committed", getTransStatus(saga.Gid)) assert.Equal(t, "committed", getTransStatus(saga.Gid))
WaitTransProcessed(saga.Gid) WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
transQuery(t, saga.Gid)
} }
func sagaRollback(t *testing.T) { func sagaRollback(t *testing.T) {
@ -213,6 +237,16 @@ func sagaCommittedPending(t *testing.T) {
assert.Equal(t, "succeed", getTransStatus(saga.Gid)) assert.Equal(t, "succeed", getTransStatus(saga.Gid))
} }
func genMsg(gid string) *dtm.Msg {
logrus.Printf("beginning a msg test ---------------- %s", gid)
msg := dtm.MsgNew(examples.DtmServer, gid)
msg.QueryPrepared = examples.MsgBusi + "/TransQuery"
req := examples.GenTransReq(30, false, false)
msg.Add(examples.MsgBusi+"/TransOut", &req)
msg.Add(examples.MsgBusi+"/TransIn", &req)
return msg
}
func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga { func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga {
logrus.Printf("beginning a saga test ---------------- %s", gid) logrus.Printf("beginning a saga test ---------------- %s", gid)
saga := dtm.SagaNew(examples.DtmServer, gid) saga := dtm.SagaNew(examples.DtmServer, gid)
@ -224,7 +258,7 @@ func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga {
} }
func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc { func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc {
logrus.Printf("beginning a saga test ---------------- %s", gid) logrus.Printf("beginning a tcc test ---------------- %s", gid)
tcc := dtm.TccNew(examples.DtmServer, gid) tcc := dtm.TccNew(examples.DtmServer, gid)
tcc.QueryPrepared = examples.TccBusi + "/TransQuery" tcc.QueryPrepared = examples.TccBusi + "/TransQuery"
req := examples.GenTransReq(30, outFailed, inFailed) req := examples.GenTransReq(30, outFailed, inFailed)
@ -233,8 +267,8 @@ func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc {
return tcc return tcc
} }
func transQuery(t *testing.T) { func transQuery(t *testing.T, gid string) {
resp, err := common.RestyClient.R().SetQueryParam("gid", "gid-noramlSaga").Get(examples.DtmServer + "/query") resp, err := common.RestyClient.R().SetQueryParam("gid", gid).Get(examples.DtmServer + "/query")
e2p(err) e2p(err)
m := M{} m := M{}
assert.Equal(t, resp.StatusCode(), 200) assert.Equal(t, resp.StatusCode(), 200)

View File

@ -28,6 +28,6 @@ func StartSvr() {
func PopulateMysql() { func PopulateMysql() {
common.InitApp(common.GetProjectDir(), &config) common.InitApp(common.GetProjectDir(), &config)
config.Mysql["database"] = dbName config.Mysql["database"] = ""
examples.RunSqlScript(config.Mysql, common.GetCurrentDir()+"/dtmsvr.sql") examples.RunSqlScript(config.Mysql, common.GetCurrentDir()+"/dtmsvr.sql")
} }

View File

@ -88,15 +88,16 @@ func checkAffected(db1 *gorm.DB) {
} }
} }
type processorCreator func(*TransGlobal) TransProcessor
var processorFac = map[string]processorCreator{}
func registorProcessorCreator(transType string, creator processorCreator) {
processorFac[transType] = creator
}
func (trans *TransGlobal) getProcessor() TransProcessor { func (trans *TransGlobal) getProcessor() TransProcessor {
if trans.TransType == "saga" { return processorFac[trans.TransType](trans)
return &TransSagaProcessor{TransGlobal: trans}
} else if trans.TransType == "tcc" {
return &TransTccProcessor{TransGlobal: trans}
} else if trans.TransType == "xa" {
return &TransXaProcessor{TransGlobal: trans}
}
return nil
} }
func (t *TransGlobal) MayQueryPrepared(db *common.MyDb) { func (t *TransGlobal) MayQueryPrepared(db *common.MyDb) {

68
dtmsvr/trans_msg.go Normal file
View File

@ -0,0 +1,68 @@
package dtmsvr
import (
"fmt"
"strings"
"github.com/yedf/dtm/common"
)
type TransMsgProcessor struct {
*TransGlobal
}
func init() {
registorProcessorCreator("msg", func(trans *TransGlobal) TransProcessor { return &TransMsgProcessor{TransGlobal: trans} })
}
func (t *TransMsgProcessor) GenBranches() []TransBranch {
branches := []TransBranch{}
steps := []M{}
common.MustUnmarshalString(t.Data, &steps)
for _, step := range steps {
branches = append(branches, TransBranch{
Gid: t.Gid,
Branch: fmt.Sprintf("%d", len(branches)+1),
Data: step["data"].(string),
Url: step["action"].(string),
BranchType: "action",
Status: "prepared",
})
}
return branches
}
func (t *TransMsgProcessor) ExecBranch(db *common.MyDb, branch *TransBranch) {
resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParam("gid", branch.Gid).Post(branch.Url)
e2p(err)
body := resp.String()
t.touch(db)
if strings.Contains(body, "SUCCESS") {
branch.changeStatus(db, "succeed")
} else {
panic(fmt.Errorf("unknown response: %s, will be retried", body))
}
}
func (t *TransMsgProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) {
t.MayQueryPrepared(db)
if t.Status != "committed" {
return
}
current := 0 // 当前正在处理的步骤
for ; current < len(branches); current++ {
branch := &branches[current]
if branch.BranchType != "action" || branch.Status != "prepared" {
continue
}
t.ExecBranch(db, branch)
if branch.Status != "succeed" {
break
}
}
if current == len(branches) { // msg 事务完成
t.changeStatus(db, "succeed")
return
}
panic("msg go pass all branch")
}

View File

@ -11,6 +11,10 @@ type TransSagaProcessor struct {
*TransGlobal *TransGlobal
} }
func init() {
registorProcessorCreator("saga", func(trans *TransGlobal) TransProcessor { return &TransSagaProcessor{TransGlobal: trans} })
}
func (t *TransSagaProcessor) GenBranches() []TransBranch { func (t *TransSagaProcessor) GenBranches() []TransBranch {
branches := []TransBranch{} branches := []TransBranch{}
steps := []M{} steps := []M{}

View File

@ -11,6 +11,10 @@ type TransTccProcessor struct {
*TransGlobal *TransGlobal
} }
func init() {
registorProcessorCreator("tcc", func(trans *TransGlobal) TransProcessor { return &TransTccProcessor{TransGlobal: trans} })
}
func (t *TransTccProcessor) GenBranches() []TransBranch { func (t *TransTccProcessor) GenBranches() []TransBranch {
branches := []TransBranch{} branches := []TransBranch{}
steps := []M{} steps := []M{}

View File

@ -11,6 +11,10 @@ type TransXaProcessor struct {
*TransGlobal *TransGlobal
} }
func init() {
registorProcessorCreator("xa", func(trans *TransGlobal) TransProcessor { return &TransXaProcessor{TransGlobal: trans} })
}
func (t *TransXaProcessor) GenBranches() []TransBranch { func (t *TransXaProcessor) GenBranches() []TransBranch {
return []TransBranch{} return []TransBranch{}
} }

84
examples/main_msg.go Normal file
View File

@ -0,0 +1,84 @@
package examples
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
)
// 事务参与者的服务地址
const MsgBusiPort = 8085
const MsgBusiApi = "/api/busi_msg"
var MsgBusi = fmt.Sprintf("http://localhost:%d%s", MsgBusiPort, MsgBusiApi)
func MsgMain() {
go MsgStartSvr()
MsgFireRequest()
time.Sleep(1000 * time.Second)
}
func MsgStartSvr() {
logrus.Printf("msg examples starting")
app := common.GetGinApp()
MsgAddRoute(app)
app.Run(fmt.Sprintf(":%d", MsgBusiPort))
}
func MsgFireRequest() {
gid := common.GenGid()
logrus.Printf("busi transaction begin: %s", gid)
req := &TransReq{
Amount: 30,
TransInResult: "SUCCESS",
TransOutResult: "SUCCESS",
}
msg := dtm.MsgNew(DtmServer, gid).
Add(MsgBusi+"/TransOut", req).
Add(MsgBusi+"/TransIn", req)
err := msg.Prepare(MsgBusi + "/TransQuery")
e2p(err)
logrus.Printf("busi trans commit")
err = msg.Commit()
e2p(err)
}
// api
func MsgAddRoute(app *gin.Engine) {
app.POST(MsgBusiApi+"/TransIn", common.WrapHandler(msgTransIn))
app.POST(MsgBusiApi+"/TransOut", common.WrapHandler(MsgTransOut))
app.GET(MsgBusiApi+"/TransQuery", common.WrapHandler(msgTransQuery))
logrus.Printf("examples msg listening at %d", MsgBusiPort)
}
var MsgTransInResult = ""
var MsgTransOutResult = ""
var MsgTransQueryResult = ""
func msgTransIn(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(MsgTransInResult, req.TransInResult, "SUCCESS")
logrus.Printf("%s TransIn: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func MsgTransOut(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(MsgTransOutResult, req.TransOutResult, "SUCCESS")
logrus.Printf("%s TransOut: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func msgTransQuery(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
logrus.Printf("%s TransQuery", gid)
res := common.OrString(MsgTransQueryResult, "SUCCESS")
return M{"result": res}, nil
}

68
message.go Normal file
View File

@ -0,0 +1,68 @@
package dtm
import (
"fmt"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
)
type Msg struct {
MsgData
Server string
}
type MsgData struct {
Gid string `json:"gid"`
TransType string `json:"trans_type"`
Steps []MsgStep `json:"steps"`
QueryPrepared string `json:"query_prepared"`
}
type MsgStep struct {
Action string `json:"action"`
Data string `json:"data"`
}
func MsgNew(server string, gid string) *Msg {
return &Msg{
MsgData: MsgData{
Gid: gid,
TransType: "msg",
},
Server: server,
}
}
func (s *Msg) Add(action string, postData interface{}) *Msg {
logrus.Printf("msg %s Add %s %v", s.Gid, action, postData)
step := MsgStep{
Action: action,
Data: common.MustMarshalString(postData),
}
s.Steps = append(s.Steps, step)
return s
}
func (s *Msg) Commit() error {
logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData)
resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/commit", s.Server))
if err != nil {
return err
}
if resp.StatusCode() != 200 {
return fmt.Errorf("commit failed: %v", resp.Body())
}
return nil
}
func (s *Msg) Prepare(queryPrepared string) error {
s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared)
logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData)
resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/prepare", s.Server))
if err != nil {
return err
}
if resp.StatusCode() != 200 {
return fmt.Errorf("prepare failed: %v", resp.Body())
}
return nil
}

View File

@ -57,7 +57,7 @@ func (s *Saga) Commit() error {
} }
func (s *Saga) Prepare(queryPrepared string) error { func (s *Saga) Prepare(queryPrepared string) error {
s.QueryPrepared = queryPrepared s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared)
logrus.Printf("preparing %s body: %v", s.Gid, &s.SagaData) logrus.Printf("preparing %s body: %v", s.Gid, &s.SagaData)
resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/prepare", s.Server)) resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/prepare", s.Server))
if err != nil { if err != nil {

2
tcc.go
View File

@ -59,7 +59,7 @@ func (s *Tcc) Commit() error {
} }
func (s *Tcc) Prepare(queryPrepared string) error { func (s *Tcc) Prepare(queryPrepared string) error {
s.QueryPrepared = queryPrepared s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared)
logrus.Printf("preparing %s body: %v", s.Gid, &s.TccData) logrus.Printf("preparing %s body: %v", s.Gid, &s.TccData)
resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/prepare", s.Server)) resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/prepare", s.Server))
if err != nil { if err != nil {