From 28b19209f5488115c3149fe112a6491272448f60 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Tue, 8 Jun 2021 11:56:59 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=8F=AF=E9=9D=A0=E6=80=A7?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 7 ++-- compose.mysql.yml | 7 ++++ dtmsvr/config.go | 4 +-- dtmsvr/dtmsvr_test.go | 54 ++++++++++++++++++++++------ dtmsvr/main.go | 2 +- dtmsvr/trans.go | 17 ++++----- dtmsvr/trans_msg.go | 68 +++++++++++++++++++++++++++++++++++ dtmsvr/trans_saga.go | 4 +++ dtmsvr/trans_tcc.go | 4 +++ dtmsvr/trans_xa.go | 4 +++ examples/main_msg.go | 84 +++++++++++++++++++++++++++++++++++++++++++ message.go | 68 +++++++++++++++++++++++++++++++++++ saga.go | 2 +- tcc.go | 2 +- 14 files changed, 302 insertions(+), 25 deletions(-) create mode 100644 dtmsvr/trans_msg.go create mode 100644 examples/main_msg.go create mode 100644 message.go diff --git a/README.md b/README.md index 95a2962..c0aa32b 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,12 @@ DTM 是一款跨语言的分布式事务管理方案,在各类微服务架构中,提供高性能和简单易用的分布式事务服务。 # 特色 ### 跨语言 -语言无关,任何语言实现了http方式的服务,都可以接入DTM,用来管理分布式事务 +语言无关,任何语言实现了http方式的服务,都可以接入DTM,用来管理分布式事务。支持go、python、php、nodejs、ruby ### 多种分布式事务协议支持 -支持XA,TCC,SAGA + * TCC: Try-Confirm-Cancel + * SAGA: + * 可靠消息 + * XA 需要底层数据库支持XA ### 高可用 基于数据库实现,易集群化,已水平扩展 # 快速开始 diff --git a/compose.mysql.yml b/compose.mysql.yml index bd99691..5249f3a 100644 --- a/compose.mysql.yml +++ b/compose.mysql.yml @@ -4,5 +4,12 @@ services: image: 'mysql:5.7' environment: MYSQL_ROOT_PASSWORD: my-secret-pw + TZ: Asia/shanghai + command: + [ + '--character-set-server=utf8mb4', + '--collation-server=utf8mb4_unicode_ci', + '--default-time-zone=+8:00', + ] ports: - '3306:3306' diff --git a/dtmsvr/config.go b/dtmsvr/config.go index 6c0fdcb..85df1c8 100644 --- a/dtmsvr/config.go +++ b/dtmsvr/config.go @@ -1,8 +1,8 @@ package dtmsvr type dtmsvrConfig struct { - PreparedExpire uint64 // 单位秒,处于prepared中的任务,过了这个时间,查询结果还是PENDING的话,则会被cancel - JobCronInterval uint64 // 单位秒 当事务等待这个时间之后,还没有变化,则进行一轮处理,包括prepared中的任务和commited的任务 + PreparedExpire int64 // 单位秒,处于prepared中的任务,过了这个时间,查询结果还是PENDING的话,则会被cancel + JobCronInterval int64 // 单位秒 当事务等待这个时间之后,还没有变化,则进行一轮处理,包括prepared中的任务和commited的任务 Mysql map[string]string } diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index a9541d0..9837eef 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -32,6 +32,7 @@ func TestDtmSvr(t *testing.T) { go examples.SagaStartSvr() go examples.XaStartSvr() go examples.TccStartSvr() + go examples.MsgStartSvr() time.Sleep(time.Duration(200 * 1000 * 1000)) // 清理数据 @@ -40,11 +41,9 @@ func TestDtmSvr(t *testing.T) { e2p(dbGet().Exec("truncate trans_log").Error) examples.ResetXaData() + msgPending(t) + msgNormal(t) sagaNormal(t) - - // 需要放到前面的用例之后,才有真实的数据 - transQuery(t) - tccNormal(t) tccRollback(t) tccRollbackPending(t) @@ -66,9 +65,6 @@ func TestCover(t *testing.T) { checkAffected(db.DB) } -// 测试使用的全局对象 -var initdb = dbGet() - func getTransStatus(gid string) string { sm := TransGlobal{} dbr := dbGet().Model(&sm).Where("gid=?", gid).First(&sm) @@ -159,6 +155,33 @@ func tccRollbackPending(t *testing.T) { CronTransOnce(-10*time.Second, "committed") assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid)) } + +func msgNormal(t *testing.T) { + msg := genMsg("gid-normal-msg") + msg.Commit() + assert.Equal(t, "committed", getTransStatus(msg.Gid)) + WaitTransProcessed(msg.Gid) + assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) + assert.Equal(t, "succeed", getTransStatus(msg.Gid)) +} + +func msgPending(t *testing.T) { + msg := genMsg("gid-normal-pending") + msg.Prepare("") + assert.Equal(t, "prepared", getTransStatus(msg.Gid)) + examples.MsgTransQueryResult = "PENDING" + CronTransOnce(-10*time.Second, "prepared") + assert.Equal(t, "prepared", getTransStatus(msg.Gid)) + examples.MsgTransQueryResult = "" + examples.MsgTransInResult = "PENDING" + CronTransOnce(-10*time.Second, "prepared") + assert.Equal(t, "committed", getTransStatus(msg.Gid)) + examples.MsgTransInResult = "" + CronTransOnce(-10*time.Second, "committed") + assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid)) + assert.Equal(t, "succeed", getTransStatus(msg.Gid)) +} + func sagaNormal(t *testing.T) { saga := genSaga("gid-noramlSaga", false, false) saga.Prepare(saga.QueryPrepared) @@ -167,6 +190,7 @@ func sagaNormal(t *testing.T) { assert.Equal(t, "committed", getTransStatus(saga.Gid)) WaitTransProcessed(saga.Gid) assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) + transQuery(t, saga.Gid) } func sagaRollback(t *testing.T) { @@ -213,6 +237,16 @@ func sagaCommittedPending(t *testing.T) { assert.Equal(t, "succeed", getTransStatus(saga.Gid)) } +func genMsg(gid string) *dtm.Msg { + logrus.Printf("beginning a msg test ---------------- %s", gid) + msg := dtm.MsgNew(examples.DtmServer, gid) + msg.QueryPrepared = examples.MsgBusi + "/TransQuery" + req := examples.GenTransReq(30, false, false) + msg.Add(examples.MsgBusi+"/TransOut", &req) + msg.Add(examples.MsgBusi+"/TransIn", &req) + return msg +} + func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga { logrus.Printf("beginning a saga test ---------------- %s", gid) saga := dtm.SagaNew(examples.DtmServer, gid) @@ -224,7 +258,7 @@ func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga { } func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc { - logrus.Printf("beginning a saga test ---------------- %s", gid) + logrus.Printf("beginning a tcc test ---------------- %s", gid) tcc := dtm.TccNew(examples.DtmServer, gid) tcc.QueryPrepared = examples.TccBusi + "/TransQuery" req := examples.GenTransReq(30, outFailed, inFailed) @@ -233,8 +267,8 @@ func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc { return tcc } -func transQuery(t *testing.T) { - resp, err := common.RestyClient.R().SetQueryParam("gid", "gid-noramlSaga").Get(examples.DtmServer + "/query") +func transQuery(t *testing.T, gid string) { + resp, err := common.RestyClient.R().SetQueryParam("gid", gid).Get(examples.DtmServer + "/query") e2p(err) m := M{} assert.Equal(t, resp.StatusCode(), 200) diff --git a/dtmsvr/main.go b/dtmsvr/main.go index feaac95..a07a172 100644 --- a/dtmsvr/main.go +++ b/dtmsvr/main.go @@ -28,6 +28,6 @@ func StartSvr() { func PopulateMysql() { common.InitApp(common.GetProjectDir(), &config) - config.Mysql["database"] = dbName + config.Mysql["database"] = "" examples.RunSqlScript(config.Mysql, common.GetCurrentDir()+"/dtmsvr.sql") } diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index cf227cc..168294b 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -88,15 +88,16 @@ func checkAffected(db1 *gorm.DB) { } } +type processorCreator func(*TransGlobal) TransProcessor + +var processorFac = map[string]processorCreator{} + +func registorProcessorCreator(transType string, creator processorCreator) { + processorFac[transType] = creator +} + func (trans *TransGlobal) getProcessor() TransProcessor { - if trans.TransType == "saga" { - return &TransSagaProcessor{TransGlobal: trans} - } else if trans.TransType == "tcc" { - return &TransTccProcessor{TransGlobal: trans} - } else if trans.TransType == "xa" { - return &TransXaProcessor{TransGlobal: trans} - } - return nil + return processorFac[trans.TransType](trans) } func (t *TransGlobal) MayQueryPrepared(db *common.MyDb) { diff --git a/dtmsvr/trans_msg.go b/dtmsvr/trans_msg.go new file mode 100644 index 0000000..68d84b2 --- /dev/null +++ b/dtmsvr/trans_msg.go @@ -0,0 +1,68 @@ +package dtmsvr + +import ( + "fmt" + "strings" + + "github.com/yedf/dtm/common" +) + +type TransMsgProcessor struct { + *TransGlobal +} + +func init() { + registorProcessorCreator("msg", func(trans *TransGlobal) TransProcessor { return &TransMsgProcessor{TransGlobal: trans} }) +} + +func (t *TransMsgProcessor) GenBranches() []TransBranch { + branches := []TransBranch{} + steps := []M{} + common.MustUnmarshalString(t.Data, &steps) + for _, step := range steps { + branches = append(branches, TransBranch{ + Gid: t.Gid, + Branch: fmt.Sprintf("%d", len(branches)+1), + Data: step["data"].(string), + Url: step["action"].(string), + BranchType: "action", + Status: "prepared", + }) + } + return branches +} + +func (t *TransMsgProcessor) ExecBranch(db *common.MyDb, branch *TransBranch) { + resp, err := common.RestyClient.R().SetBody(branch.Data).SetQueryParam("gid", branch.Gid).Post(branch.Url) + e2p(err) + body := resp.String() + t.touch(db) + if strings.Contains(body, "SUCCESS") { + branch.changeStatus(db, "succeed") + } else { + panic(fmt.Errorf("unknown response: %s, will be retried", body)) + } +} + +func (t *TransMsgProcessor) ProcessOnce(db *common.MyDb, branches []TransBranch) { + t.MayQueryPrepared(db) + if t.Status != "committed" { + return + } + current := 0 // 当前正在处理的步骤 + for ; current < len(branches); current++ { + branch := &branches[current] + if branch.BranchType != "action" || branch.Status != "prepared" { + continue + } + t.ExecBranch(db, branch) + if branch.Status != "succeed" { + break + } + } + if current == len(branches) { // msg 事务完成 + t.changeStatus(db, "succeed") + return + } + panic("msg go pass all branch") +} diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_saga.go index 0219eec..3a8f934 100644 --- a/dtmsvr/trans_saga.go +++ b/dtmsvr/trans_saga.go @@ -11,6 +11,10 @@ type TransSagaProcessor struct { *TransGlobal } +func init() { + registorProcessorCreator("saga", func(trans *TransGlobal) TransProcessor { return &TransSagaProcessor{TransGlobal: trans} }) +} + func (t *TransSagaProcessor) GenBranches() []TransBranch { branches := []TransBranch{} steps := []M{} diff --git a/dtmsvr/trans_tcc.go b/dtmsvr/trans_tcc.go index d6d6382..9a8d1bf 100644 --- a/dtmsvr/trans_tcc.go +++ b/dtmsvr/trans_tcc.go @@ -11,6 +11,10 @@ type TransTccProcessor struct { *TransGlobal } +func init() { + registorProcessorCreator("tcc", func(trans *TransGlobal) TransProcessor { return &TransTccProcessor{TransGlobal: trans} }) +} + func (t *TransTccProcessor) GenBranches() []TransBranch { branches := []TransBranch{} steps := []M{} diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_xa.go index 9b6d635..1b71a6b 100644 --- a/dtmsvr/trans_xa.go +++ b/dtmsvr/trans_xa.go @@ -11,6 +11,10 @@ type TransXaProcessor struct { *TransGlobal } +func init() { + registorProcessorCreator("xa", func(trans *TransGlobal) TransProcessor { return &TransXaProcessor{TransGlobal: trans} }) +} + func (t *TransXaProcessor) GenBranches() []TransBranch { return []TransBranch{} } diff --git a/examples/main_msg.go b/examples/main_msg.go new file mode 100644 index 0000000..05d38d9 --- /dev/null +++ b/examples/main_msg.go @@ -0,0 +1,84 @@ +package examples + +import ( + "fmt" + "time" + + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm" + "github.com/yedf/dtm/common" +) + +// 事务参与者的服务地址 +const MsgBusiPort = 8085 +const MsgBusiApi = "/api/busi_msg" + +var MsgBusi = fmt.Sprintf("http://localhost:%d%s", MsgBusiPort, MsgBusiApi) + +func MsgMain() { + go MsgStartSvr() + MsgFireRequest() + time.Sleep(1000 * time.Second) +} + +func MsgStartSvr() { + logrus.Printf("msg examples starting") + app := common.GetGinApp() + MsgAddRoute(app) + app.Run(fmt.Sprintf(":%d", MsgBusiPort)) +} + +func MsgFireRequest() { + gid := common.GenGid() + logrus.Printf("busi transaction begin: %s", gid) + req := &TransReq{ + Amount: 30, + TransInResult: "SUCCESS", + TransOutResult: "SUCCESS", + } + msg := dtm.MsgNew(DtmServer, gid). + Add(MsgBusi+"/TransOut", req). + Add(MsgBusi+"/TransIn", req) + err := msg.Prepare(MsgBusi + "/TransQuery") + e2p(err) + logrus.Printf("busi trans commit") + err = msg.Commit() + e2p(err) +} + +// api + +func MsgAddRoute(app *gin.Engine) { + app.POST(MsgBusiApi+"/TransIn", common.WrapHandler(msgTransIn)) + app.POST(MsgBusiApi+"/TransOut", common.WrapHandler(MsgTransOut)) + app.GET(MsgBusiApi+"/TransQuery", common.WrapHandler(msgTransQuery)) + logrus.Printf("examples msg listening at %d", MsgBusiPort) +} + +var MsgTransInResult = "" +var MsgTransOutResult = "" +var MsgTransQueryResult = "" + +func msgTransIn(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(MsgTransInResult, req.TransInResult, "SUCCESS") + logrus.Printf("%s TransIn: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func MsgTransOut(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + req := transReqFromContext(c) + res := common.OrString(MsgTransOutResult, req.TransOutResult, "SUCCESS") + logrus.Printf("%s TransOut: %v result: %s", gid, req, res) + return M{"result": res}, nil +} + +func msgTransQuery(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + logrus.Printf("%s TransQuery", gid) + res := common.OrString(MsgTransQueryResult, "SUCCESS") + return M{"result": res}, nil +} diff --git a/message.go b/message.go new file mode 100644 index 0000000..4820409 --- /dev/null +++ b/message.go @@ -0,0 +1,68 @@ +package dtm + +import ( + "fmt" + + "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" +) + +type Msg struct { + MsgData + Server string +} + +type MsgData struct { + Gid string `json:"gid"` + TransType string `json:"trans_type"` + Steps []MsgStep `json:"steps"` + QueryPrepared string `json:"query_prepared"` +} +type MsgStep struct { + Action string `json:"action"` + Data string `json:"data"` +} + +func MsgNew(server string, gid string) *Msg { + return &Msg{ + MsgData: MsgData{ + Gid: gid, + TransType: "msg", + }, + Server: server, + } +} +func (s *Msg) Add(action string, postData interface{}) *Msg { + logrus.Printf("msg %s Add %s %v", s.Gid, action, postData) + step := MsgStep{ + Action: action, + Data: common.MustMarshalString(postData), + } + s.Steps = append(s.Steps, step) + return s +} + +func (s *Msg) Commit() error { + logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData) + resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/commit", s.Server)) + if err != nil { + return err + } + if resp.StatusCode() != 200 { + return fmt.Errorf("commit failed: %v", resp.Body()) + } + return nil +} + +func (s *Msg) Prepare(queryPrepared string) error { + s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) + logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData) + resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/prepare", s.Server)) + if err != nil { + return err + } + if resp.StatusCode() != 200 { + return fmt.Errorf("prepare failed: %v", resp.Body()) + } + return nil +} diff --git a/saga.go b/saga.go index 5f6d8a9..2a3d80e 100644 --- a/saga.go +++ b/saga.go @@ -57,7 +57,7 @@ func (s *Saga) Commit() error { } func (s *Saga) Prepare(queryPrepared string) error { - s.QueryPrepared = queryPrepared + s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) logrus.Printf("preparing %s body: %v", s.Gid, &s.SagaData) resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/prepare", s.Server)) if err != nil { diff --git a/tcc.go b/tcc.go index 11290ed..77f4b93 100644 --- a/tcc.go +++ b/tcc.go @@ -59,7 +59,7 @@ func (s *Tcc) Commit() error { } func (s *Tcc) Prepare(queryPrepared string) error { - s.QueryPrepared = queryPrepared + s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) logrus.Printf("preparing %s body: %v", s.Gid, &s.TccData) resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/prepare", s.Server)) if err != nil {