add query

This commit is contained in:
yedongfu 2021-06-02 17:22:11 +08:00
parent 4864f20058
commit 75ba3e30cc
10 changed files with 189 additions and 62 deletions

View File

@ -1,17 +1,34 @@
### 轻量级分布式事务管理服务 ### 轻量级分布式事务管理服务
跨语言--语言无关基于http协议
支持xa、tcc、saga
## 快速开始
场景描述:
假设您实现了一个转账功能,分为两个微服务:转入、转出
转出:服务地址为 http://example.com/api/busi_saga/transOut?gid=xxx POST 参数为 {"uid": 2, "amount":30}
转入:服务地址为 http://example.com/api/busi_saga/transIn?gid=xxx POST 参数为 {"uid": 1, "amount":30}
在saga模式下有对应的补偿微服务
转出:服务地址为 http://example.com/api/busi_saga/transOutCompensate?gid=xxx POST 参数为 {"uid": 2, "amount":30}
转入:服务地址为 http://example.com/api/busi_saga/transInCompensate?gid=xxx POST 参数为 {"uid": 1, "amount":30}
HTTP协议方式
curl -d '{"gid":"xxx","trans_type":"saga","steps":[{"action":"http://example.com/api/busi_saga/TransOut","compensate":"http://example.com/api/busi_saga/TransOutCompensate","data":"{\"amount\":30}"},{"action":"http://localhost:8081/api/busi_saga/TransIn","compensate":"http://localhost:8081/api/busi_saga/TransInCompensate","data":"{\"amount\":30}"}]}' 8.140.124.252/api/dtm/commit
此请求向dtm提交了一个saga事务dtm会按照saga模式请求transIn/transOut并且在出错情况下保证抵用相关的补偿api
go客户端方式
// 事务参与者的服务地址
const startBusiPort = 8084
const startBusiApi = "/api/busi_start"
## 配置rabbitmq和mysql var startBusi = fmt.Sprintf("http://localhost:%d%s", startBusiPort, startBusiApi)
err := dtm.SagaNew(DtmServer, gid).Add(startBusi+"/TransOut", startBusi+"/TransOutCompensate", &gin.H{
"amount": 30,
"uid": 2,
}).Add(startBusi+"/TransIn", startBusi+"/TransInCompensate", &gin.H{
"amount": 30,
"uid": 1
}).Commit()
本地启动方式
需要安装docker和docker-compose
curl localhost:8080/api/initMysql
go run examples/app/main saga
dtm依赖于rabbitmq和mysql请搭建好rabbitmq和mysql并修改dtm.yml 其他
## 启动tc
```go run dtm-svr/svr```
## 启动例子saga的tm+rm
```go run example/saga```
## 或者启动例子tcc的tm+rm
```go run example/tcc```

View File

@ -152,7 +152,7 @@ func init() {
// RestyClient.SetRetryCount(2) // RestyClient.SetRetryCount(2)
// RestyClient.SetRetryWaitTime(1 * time.Second) // RestyClient.SetRetryWaitTime(1 * time.Second)
RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error {
logrus.Printf("requesting: %s %s %v", r.Method, r.URL, r.Body) logrus.Printf("requesting: %s %s %v %v", r.Method, r.URL, r.Body, r.QueryParam)
return nil return nil
}) })
RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error {

View File

@ -1,10 +1,12 @@
package dtmsvr package dtmsvr
import ( import (
"errors"
"fmt" "fmt"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/yedf/dtm/common" "github.com/yedf/dtm/common"
"gorm.io/gorm"
"gorm.io/gorm/clause" "gorm.io/gorm/clause"
) )
@ -13,6 +15,7 @@ func AddRoute(engine *gin.Engine) {
engine.POST("/api/dtmsvr/commit", common.WrapHandler(Commit)) engine.POST("/api/dtmsvr/commit", common.WrapHandler(Commit))
engine.POST("/api/dtmsvr/branch", common.WrapHandler(Branch)) engine.POST("/api/dtmsvr/branch", common.WrapHandler(Branch))
engine.POST("/api/dtmsvr/rollback", common.WrapHandler(Rollback)) engine.POST("/api/dtmsvr/rollback", common.WrapHandler(Rollback))
engine.GET("/api/dtmsvr/query", common.WrapHandler(Query))
} }
func Prepare(c *gin.Context) (interface{}, error) { func Prepare(c *gin.Context) (interface{}, error) {
@ -57,3 +60,19 @@ func Branch(c *gin.Context) (interface{}, error) {
e2p(err) e2p(err)
return M{"message": "SUCCESS"}, nil return M{"message": "SUCCESS"}, nil
} }
func Query(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
if gid == "" {
return nil, errors.New("no gid specified")
}
trans := TransGlobal{}
db := dbGet()
dbr := db.Must().Where("gid", gid).First(&trans)
if dbr.Error == gorm.ErrRecordNotFound {
return M{"transaction": nil, "branches": [0]int{}}, nil
}
branches := []TransBranch{}
db.Must().Where("gid", gid).Find(&branches)
return M{"transaction": trans, "branches": branches}, nil
}

View File

@ -38,6 +38,11 @@ func TestDtmSvr(t *testing.T) {
e2p(dbGet().Exec("truncate trans_log").Error) e2p(dbGet().Exec("truncate trans_log").Error)
examples.ResetXaData() examples.ResetXaData()
sagaNormal(t)
// 需要放到前面的用例之后,才有真实的数据
transQuery(t)
tccNormal(t) tccNormal(t)
tccRollback(t) tccRollback(t)
tccRollbackPending(t) tccRollbackPending(t)
@ -46,8 +51,8 @@ func TestDtmSvr(t *testing.T) {
sagaCommittedPending(t) sagaCommittedPending(t)
sagaPreparePending(t) sagaPreparePending(t)
sagaPrepareCancel(t) sagaPrepareCancel(t)
sagaNormal(t)
sagaRollback(t) sagaRollback(t)
} }
func TestCover(t *testing.T) { func TestCover(t *testing.T) {
@ -225,3 +230,24 @@ func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc {
tcc.Add(examples.TccBusi+"/TransInTry", examples.TccBusi+"/TransInConfirm", examples.TccBusi+"/TransInCancel", &req) tcc.Add(examples.TccBusi+"/TransInTry", examples.TccBusi+"/TransInConfirm", examples.TccBusi+"/TransInCancel", &req)
return tcc return tcc
} }
func transQuery(t *testing.T) {
resp, err := common.RestyClient.R().SetQueryParam("gid", "gid-noramlSaga").Get(examples.DtmServer + "/query")
e2p(err)
m := M{}
assert.Equal(t, resp.StatusCode(), 200)
common.MustUnmarshalString(resp.String(), &m)
assert.NotEqual(t, nil, m["transaction"])
assert.Equal(t, 4, len(m["branches"].([]interface{})))
resp, err = common.RestyClient.R().SetQueryParam("gid", "").Get(examples.DtmServer + "/query")
e2p(err)
assert.Equal(t, resp.StatusCode(), 500)
resp, err = common.RestyClient.R().SetQueryParam("gid", "1").Get(examples.DtmServer + "/query")
e2p(err)
assert.Equal(t, resp.StatusCode(), 200)
common.MustUnmarshalString(resp.String(), &m)
assert.Equal(t, nil, m["transaction"])
assert.Equal(t, 0, len(m["branches"].([]interface{})))
}

View File

@ -37,10 +37,9 @@ func sagaFireRequest() {
TransInResult: "SUCCESS", TransInResult: "SUCCESS",
TransOutResult: "SUCCESS", TransOutResult: "SUCCESS",
} }
saga := dtm.SagaNew(DtmServer, gid) saga := dtm.SagaNew(DtmServer, gid).
Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req).
saga.Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req) Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req)
saga.Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req)
err := saga.Prepare(SagaBusi + "/TransQuery") err := saga.Prepare(SagaBusi + "/TransQuery")
e2p(err) e2p(err)
logrus.Printf("busi trans commit") logrus.Printf("busi trans commit")

View File

@ -37,10 +37,9 @@ func tccFireRequest() {
TransInResult: "SUCCESS", TransInResult: "SUCCESS",
TransOutResult: "SUCCESS", TransOutResult: "SUCCESS",
} }
tcc := dtm.TccNew(DtmServer, gid) tcc := dtm.TccNew(DtmServer, gid).
Add(TccBusi+"/TransOutTry", TccBusi+"/TransOutConfirm", TccBusi+"/TransOutCancel", req).
tcc.Add(TccBusi+"/TransOutTry", TccBusi+"/TransOutConfirm", TccBusi+"/TransOutCancel", req) Add(TccBusi+"/TransInTry", TccBusi+"/TransInConfirm", TccBusi+"/TransOutCancel", req)
tcc.Add(TccBusi+"/TransInTry", TccBusi+"/TransInConfirm", TccBusi+"/TransOutCancel", req)
err := tcc.Prepare(TccBusi + "/TransQuery") err := tcc.Prepare(TccBusi + "/TransQuery")
e2p(err) e2p(err)
logrus.Printf("busi trans commit") logrus.Printf("busi trans commit")

76
examples/quick_start.go Normal file
View File

@ -0,0 +1,76 @@
package examples
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
)
// 事务参与者的服务地址
const startBusiPort = 8084
const startBusiApi = "/api/busi_start"
var startBusi = fmt.Sprintf("http://localhost:%d%s", startBusiPort, startBusiApi)
func startMain() {
go startStartSvr()
startFireRequest()
time.Sleep(1000 * time.Second)
}
func startStartSvr() {
logrus.Printf("saga examples starting")
app := common.GetGinApp()
startAddRoute(app)
app.Run(fmt.Sprintf(":%d", SagaBusiPort))
}
func startFireRequest() {
gid := common.GenGid()
logrus.Printf("busi transaction begin: %s", gid)
req := &TransReq{
Amount: 30,
TransInResult: "SUCCESS",
TransOutResult: "SUCCESS",
}
saga := dtm.SagaNew(DtmServer, gid).
Add(startBusi+"/TransOut", startBusi+"/TransOutCompensate", req).
Add(startBusi+"/TransIn", startBusi+"/TransInCompensate", req)
logrus.Printf("busi trans commit")
err := saga.Commit()
e2p(err)
}
func startAddRoute(app *gin.Engine) {
app.POST(SagaBusiApi+"/TransIn", common.WrapHandler(startTransIn))
app.POST(SagaBusiApi+"/TransInCompensate", common.WrapHandler(startTransInCompensate))
app.POST(SagaBusiApi+"/TransOut", common.WrapHandler(startTransOut))
app.POST(SagaBusiApi+"/TransOutCompensate", common.WrapHandler(startTransOutCompensate))
logrus.Printf("examples listening at %d", startBusiPort)
}
func startTransIn(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
logrus.Printf("%s TransIn: %v result: %s", gid, req, req.TransInResult)
return M{"result": req.TransInResult}, nil
}
func startTransInCompensate(c *gin.Context) (interface{}, error) {
return M{"result": "SUCCESS"}, nil
}
func startTransOut(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
logrus.Printf("%s TransOut: %v result: %s", gid, req, req.TransOutResult)
return M{"result": req.TransOutResult}, nil
}
func startTransOutCompensate(c *gin.Context) (interface{}, error) {
return M{"result": "SUCCESS"}, nil
}

33
saga.go
View File

@ -1,7 +1,6 @@
package dtm package dtm
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -34,18 +33,26 @@ func SagaNew(server string, gid string) *Saga {
Server: server, Server: server,
} }
} }
func (s *Saga) Add(action string, compensate string, postData interface{}) error { func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {
logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData)
d, err := json.Marshal(postData)
if err != nil {
return err
}
step := SagaStep{ step := SagaStep{
Action: action, Action: action,
Compensate: compensate, Compensate: compensate,
Data: string(d), Data: common.MustMarshalString(postData),
} }
s.Steps = append(s.Steps, step) s.Steps = append(s.Steps, step)
return s
}
func (s *Saga) Commit() error {
logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData)
resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/commit", s.Server))
if err != nil {
return err
}
if resp.StatusCode() != 200 {
return fmt.Errorf("commit failed: %v", resp.Body())
}
return nil return nil
} }
@ -61,15 +68,3 @@ func (s *Saga) Prepare(queryPrepared string) error {
} }
return nil return nil
} }
func (s *Saga) Commit() error {
logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData)
resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/commit", s.Server))
if err != nil {
return err
}
if resp.StatusCode() != 200 {
return fmt.Errorf("commit failed: %v", resp.Body())
}
return nil
}

33
tcc.go
View File

@ -1,7 +1,6 @@
package dtm package dtm
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -35,19 +34,27 @@ func TccNew(server string, gid string) *Tcc {
Server: server, Server: server,
} }
} }
func (s *Tcc) Add(try string, confirm string, cancel string, data interface{}) error { func (s *Tcc) Add(try string, confirm string, cancel string, data interface{}) *Tcc {
logrus.Printf("tcc %s Add %s %s %s %v", s.Gid, try, confirm, cancel, data) logrus.Printf("tcc %s Add %s %s %s %v", s.Gid, try, confirm, cancel, data)
d, err := json.Marshal(data)
if err != nil {
return err
}
step := TccStep{ step := TccStep{
Try: try, Try: try,
Confirm: confirm, Confirm: confirm,
Cancel: cancel, Cancel: cancel,
Data: string(d), Data: common.MustMarshalString(data),
} }
s.Steps = append(s.Steps, step) s.Steps = append(s.Steps, step)
return s
}
func (s *Tcc) Commit() error {
logrus.Printf("committing %s body: %v", s.Gid, &s.TccData)
resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/commit", s.Server))
if err != nil {
return err
}
if resp.StatusCode() != 200 {
return fmt.Errorf("commit failed: %v", resp.Body())
}
return nil return nil
} }
@ -63,15 +70,3 @@ func (s *Tcc) Prepare(queryPrepared string) error {
} }
return nil return nil
} }
func (s *Tcc) Commit() error {
logrus.Printf("committing %s body: %v", s.Gid, &s.TccData)
resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/commit", s.Server))
if err != nil {
return err
}
if resp.StatusCode() != 200 {
return fmt.Errorf("commit failed: %v", resp.Body())
}
return nil
}

1
xa.go
View File

@ -56,6 +56,7 @@ func XaClientNew(server string, mysqlConf map[string]string, app *gin.Engine, ca
})) }))
return xa return xa
} }
func (xa *XaClient) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error) { func (xa *XaClient) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error) {
defer common.P2E(&rerr) defer common.P2E(&rerr)
branch := common.GenGid() branch := common.GenGid()