unit test seems ok
This commit is contained in:
parent
d20a0f91ec
commit
694c07fbef
@ -1,9 +1,13 @@
|
|||||||
package common
|
package common
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"io/ioutil"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/bwmarrin/snowflake"
|
"github.com/bwmarrin/snowflake"
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/go-resty/resty/v2"
|
"github.com/go-resty/resty/v2"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
@ -66,3 +70,26 @@ func init() {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetGinApp() *gin.Engine {
|
||||||
|
gin.SetMode(gin.ReleaseMode)
|
||||||
|
app := gin.Default()
|
||||||
|
app.Use(func(c *gin.Context) {
|
||||||
|
body := ""
|
||||||
|
if c.Request.Method == "POST" {
|
||||||
|
rb, err := c.GetRawData()
|
||||||
|
if err != nil {
|
||||||
|
logrus.Printf("GetRawData error: %s", err.Error())
|
||||||
|
} else {
|
||||||
|
body = string(rb)
|
||||||
|
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(rb))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
began := time.Now()
|
||||||
|
logrus.Printf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body)
|
||||||
|
c.Next()
|
||||||
|
logrus.Printf("used %d ms %s %s query: %s body: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body)
|
||||||
|
|
||||||
|
})
|
||||||
|
return app
|
||||||
|
}
|
||||||
|
|||||||
25
dtm/saga.go
25
dtm/saga.go
@ -3,42 +3,42 @@ package dtm
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Saga struct {
|
||||||
|
SagaData
|
||||||
|
Server string
|
||||||
|
}
|
||||||
|
|
||||||
type SagaData struct {
|
type SagaData struct {
|
||||||
Gid string `json:"gid"`
|
Gid string `json:"gid"`
|
||||||
Steps []SagaStep `json:"steps"`
|
Steps []SagaStep `json:"steps"`
|
||||||
TransQuery string `json:"trans_query"`
|
TransQuery string `json:"trans_query"`
|
||||||
}
|
}
|
||||||
type Saga struct {
|
|
||||||
SagaData
|
|
||||||
Server string
|
|
||||||
}
|
|
||||||
type SagaStep struct {
|
type SagaStep struct {
|
||||||
Action string `json:"action"`
|
Action string `json:"action"`
|
||||||
Compensate string `json:"compensate"`
|
Compensate string `json:"compensate"`
|
||||||
PostData gin.H `json:"post_data"`
|
PostData string `json:"post_data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func SagaNew(server string, gid string) *Saga {
|
func SagaNew(server string, gid string, transQuery string) *Saga {
|
||||||
return &Saga{
|
return &Saga{
|
||||||
SagaData: SagaData{
|
SagaData: SagaData{
|
||||||
Gid: gid,
|
Gid: gid,
|
||||||
|
TransQuery: transQuery,
|
||||||
},
|
},
|
||||||
Server: server,
|
Server: server,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func (s *Saga) Add(action string, compensate string, postData gin.H) error {
|
func (s *Saga) Add(action string, compensate string, postData interface{}) error {
|
||||||
logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData)
|
logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData)
|
||||||
step := SagaStep{
|
step := SagaStep{
|
||||||
Action: action,
|
Action: action,
|
||||||
Compensate: compensate,
|
Compensate: compensate,
|
||||||
PostData: postData,
|
PostData: common.MustMarshalString(postData),
|
||||||
}
|
}
|
||||||
step.PostData = postData
|
|
||||||
s.Steps = append(s.Steps, step)
|
s.Steps = append(s.Steps, step)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -47,8 +47,7 @@ func (s *Saga) getBody() *SagaData {
|
|||||||
return &s.SagaData
|
return &s.SagaData
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Saga) Prepare(url string) error {
|
func (s *Saga) Prepare() error {
|
||||||
s.TransQuery = url
|
|
||||||
logrus.Printf("preparing %s body: %v", s.Gid, s.getBody())
|
logrus.Printf("preparing %s body: %v", s.Gid, s.getBody())
|
||||||
resp, err := common.RestyClient.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/prepare", s.Server))
|
resp, err := common.RestyClient.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/prepare", s.Server))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
72
dtm_test.go
72
dtm_test.go
@ -2,10 +2,15 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/go-playground/assert/v2"
|
"github.com/go-playground/assert/v2"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
|
"github.com/yedf/dtm/common"
|
||||||
|
"github.com/yedf/dtm/dtm"
|
||||||
"github.com/yedf/dtm/dtmsvr"
|
"github.com/yedf/dtm/dtmsvr"
|
||||||
|
"github.com/yedf/dtm/examples"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -16,8 +21,73 @@ func TestViper(t *testing.T) {
|
|||||||
assert.Equal(t, "test_val", viper.GetString("test"))
|
assert.Equal(t, "test_val", viper.GetString("test"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TTestDtmSvr(t *testing.T) {
|
func TestDtmSvr(t *testing.T) {
|
||||||
|
// 清理数据
|
||||||
|
rabbit := dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq)
|
||||||
|
queprepared := rabbit.QueueNew(dtmsvr.RabbitmqConstPrepared)
|
||||||
|
for i := 0; i < queprepared.Queue.Messages; i++ {
|
||||||
|
queprepared.WaitAndHandleOne(func(data M) {
|
||||||
|
logrus.Printf("ignoring prepared queue data before test")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
quecommited := rabbit.QueueNew(dtmsvr.RabbitmqConstCommited)
|
||||||
|
for i := 0; i < quecommited.Queue.Messages; i++ {
|
||||||
|
quecommited.WaitAndHandleOne(func(data M) {
|
||||||
|
logrus.Printf("ignoring commited queue data before test")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
db := dtmsvr.DbGet()
|
||||||
|
common.PanicIfError(db.Exec("truncate test1.a_saga").Error)
|
||||||
|
common.PanicIfError(db.Exec("truncate test1.a_saga_step").Error)
|
||||||
|
|
||||||
|
// 启动组件
|
||||||
|
go dtmsvr.StartSvr()
|
||||||
|
go examples.StartSvr()
|
||||||
|
time.Sleep(time.Duration(100 * 1000 * 1000))
|
||||||
|
|
||||||
|
// 开始第一个正常流程的测试
|
||||||
|
saga := genSaga("gid-1", false, false)
|
||||||
|
saga.Prepare()
|
||||||
|
queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg)
|
||||||
|
sm := dtmsvr.SagaModel{}
|
||||||
|
db.Model(&sm).Where("gid=?", saga.Gid).First(&sm)
|
||||||
|
assert.Equal(t, "prepared", sm.Status)
|
||||||
|
saga.Commit()
|
||||||
|
quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg)
|
||||||
|
db.Model(&dtmsvr.SagaModel{}).Where("gid=?", saga.Gid).First(&sm)
|
||||||
|
assert.Equal(t, "finished", sm.Status)
|
||||||
|
steps := []dtmsvr.SagaStepModel{}
|
||||||
|
db.Model(&dtmsvr.SagaStepModel{}).Where("gid=?", saga.Gid).Find(&steps)
|
||||||
|
assert.Equal(t, true, steps[0].Status == "pending" && steps[2].Status == "pending" && steps[1].Status == "finished" && steps[3].Status == "finished")
|
||||||
|
|
||||||
|
saga = genSaga("gid-2", false, true)
|
||||||
|
saga.Commit()
|
||||||
|
quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg)
|
||||||
|
saga.Prepare()
|
||||||
|
queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg)
|
||||||
|
sm = dtmsvr.SagaModel{}
|
||||||
|
db.Model(&dtmsvr.SagaModel{}).Where("gid=?", saga.Gid).First(&sm)
|
||||||
|
assert.Equal(t, "rollbacked", sm.Status)
|
||||||
|
steps = []dtmsvr.SagaStepModel{}
|
||||||
|
db.Model(&dtmsvr.SagaStepModel{}).Where("gid=?", saga.Gid).Find(&steps)
|
||||||
|
assert.Equal(t, true, steps[0].Status == "rollbacked" && steps[2].Status == "rollbacked" && steps[1].Status == "finished" && steps[3].Status == "rollbacked")
|
||||||
|
|
||||||
|
// assert.Equal(t, 1, 0)
|
||||||
|
// 开始测试
|
||||||
|
|
||||||
// 发送Prepare请求后,验证数据库
|
// 发送Prepare请求后,验证数据库
|
||||||
// ConsumeHalfMsg 验证数据库
|
// ConsumeHalfMsg 验证数据库
|
||||||
// ConsumeMsg 验证数据库
|
// ConsumeMsg 验证数据库
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func genSaga(gid string, inFailed bool, outFailed bool) *dtm.Saga {
|
||||||
|
saga := dtm.SagaNew(examples.TcServer, gid, examples.BusiApi+"/TransQuery")
|
||||||
|
req := examples.TransReq{
|
||||||
|
Amount: 30,
|
||||||
|
TransInFailed: inFailed,
|
||||||
|
TransOutFailed: outFailed,
|
||||||
|
}
|
||||||
|
saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInCompensate", &req)
|
||||||
|
saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutCompensate", &req)
|
||||||
|
return saga
|
||||||
|
}
|
||||||
|
|||||||
@ -5,7 +5,6 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
@ -47,7 +46,7 @@ func (*SagaStepModel) TableName() string {
|
|||||||
return "test1.a_saga_step"
|
return "test1.a_saga_step"
|
||||||
}
|
}
|
||||||
|
|
||||||
func handlePreparedMsg(data gin.H) {
|
func HandlePreparedMsg(data M) {
|
||||||
db := DbGet()
|
db := DbGet()
|
||||||
logrus.Printf("creating saga model in prepare")
|
logrus.Printf("creating saga model in prepare")
|
||||||
data["steps"] = common.MustMarshalString(data["steps"])
|
data["steps"] = common.MustMarshalString(data["steps"])
|
||||||
@ -60,7 +59,7 @@ func handlePreparedMsg(data gin.H) {
|
|||||||
}).Create(&m)
|
}).Create(&m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleCommitedMsg(data gin.H) {
|
func HandleCommitedMsg(data M) {
|
||||||
db := DbGet()
|
db := DbGet()
|
||||||
logrus.Printf("creating saga model in commited")
|
logrus.Printf("creating saga model in commited")
|
||||||
steps := data["steps"].([]interface{})
|
steps := data["steps"].([]interface{})
|
||||||
@ -83,7 +82,7 @@ func handleCommitedMsg(data gin.H) {
|
|||||||
nsteps = append(nsteps, SagaStepModel{
|
nsteps = append(nsteps, SagaStepModel{
|
||||||
Gid: m.Gid,
|
Gid: m.Gid,
|
||||||
Step: len(nsteps) + 1,
|
Step: len(nsteps) + 1,
|
||||||
Data: common.MustMarshalString(step["post_data"]),
|
Data: step["post_data"].(string),
|
||||||
Url: step["compensate"].(string),
|
Url: step["compensate"].(string),
|
||||||
Type: "compensate",
|
Type: "compensate",
|
||||||
Status: "pending",
|
Status: "pending",
|
||||||
@ -91,7 +90,7 @@ func handleCommitedMsg(data gin.H) {
|
|||||||
nsteps = append(nsteps, SagaStepModel{
|
nsteps = append(nsteps, SagaStepModel{
|
||||||
Gid: m.Gid,
|
Gid: m.Gid,
|
||||||
Step: len(nsteps) + 1,
|
Step: len(nsteps) + 1,
|
||||||
Data: common.MustMarshalString(step["post_data"]),
|
Data: step["post_data"].(string),
|
||||||
Url: step["action"].(string),
|
Url: step["action"].(string),
|
||||||
Type: "action",
|
Type: "action",
|
||||||
Status: "pending",
|
Status: "pending",
|
||||||
@ -169,8 +168,7 @@ func ProcessCommitedSaga(gid string) (rerr error) {
|
|||||||
checkAndCommit(dbr)
|
checkAndCommit(dbr)
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
logrus.Errorf("unknown response: %s, will be retried", body)
|
return fmt.Errorf("unknown response: %s, will be retried", body)
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -219,7 +217,7 @@ func StartConsumePreparedMsg(consumers int) {
|
|||||||
for i := 0; i < consumers; i++ {
|
for i := 0; i < consumers; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
que := r.QueueNew(RabbitmqConstPrepared)
|
que := r.QueueNew(RabbitmqConstPrepared)
|
||||||
que.WaitAndHandle(handlePreparedMsg)
|
que.WaitAndHandle(HandlePreparedMsg)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -230,7 +228,7 @@ func StartConsumeCommitedMsg(consumers int) {
|
|||||||
for i := 0; i < consumers; i++ {
|
for i := 0; i < consumers; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
que := r.QueueNew(RabbitmqConstCommited)
|
que := r.QueueNew(RabbitmqConstCommited)
|
||||||
que.WaitAndHandle(handleCommitedMsg)
|
que.WaitAndHandle(HandleCommitedMsg)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,7 +5,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/streadway/amqp"
|
"github.com/streadway/amqp"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
@ -107,6 +106,7 @@ func (r *Rabbitmq) SendAndConfirm(key RabbitmqConst, data map[string]interface{}
|
|||||||
|
|
||||||
type RabbitmqQueue struct {
|
type RabbitmqQueue struct {
|
||||||
Name string
|
Name string
|
||||||
|
Queue *amqp.Queue
|
||||||
Channel *amqp.Channel
|
Channel *amqp.Channel
|
||||||
Conn *amqp.Connection
|
Conn *amqp.Connection
|
||||||
Deliveries <-chan amqp.Delivery
|
Deliveries <-chan amqp.Delivery
|
||||||
@ -117,12 +117,12 @@ func (q *RabbitmqQueue) Close() {
|
|||||||
// q.Conn.Close()
|
// q.Conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *RabbitmqQueue) WaitAndHandle(handler func(data gin.H)) {
|
func (q *RabbitmqQueue) WaitAndHandle(handler func(data M)) {
|
||||||
for {
|
for {
|
||||||
q.WaitAndHandleOne(handler)
|
q.WaitAndHandleOne(handler)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func (q *RabbitmqQueue) WaitAndHandleOne(handler func(data gin.H)) {
|
func (q *RabbitmqQueue) WaitAndHandleOne(handler func(data M)) {
|
||||||
logrus.Printf("%s reading message", q.Name)
|
logrus.Printf("%s reading message", q.Name)
|
||||||
msg := <-q.Deliveries
|
msg := <-q.Deliveries
|
||||||
data := map[string]interface{}{}
|
data := map[string]interface{}{}
|
||||||
@ -168,6 +168,7 @@ func (r *Rabbitmq) QueueNew(queueType RabbitmqConst) *RabbitmqQueue {
|
|||||||
)
|
)
|
||||||
common.PanicIfError(err)
|
common.PanicIfError(err)
|
||||||
return &RabbitmqQueue{
|
return &RabbitmqQueue{
|
||||||
|
Queue: &queue,
|
||||||
Name: queueName,
|
Name: queueName,
|
||||||
Channel: channel,
|
Channel: channel,
|
||||||
Deliveries: deliveries,
|
Deliveries: deliveries,
|
||||||
|
|||||||
@ -3,7 +3,6 @@ package dtmsvr
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
"github.com/magiconair/properties/assert"
|
"github.com/magiconair/properties/assert"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
@ -19,12 +18,12 @@ func TestRabbitConfig(t *testing.T) {
|
|||||||
|
|
||||||
func TestRabbitmq1Msg(t *testing.T) {
|
func TestRabbitmq1Msg(t *testing.T) {
|
||||||
rb := RabbitmqNew(&ServerConfig.Rabbitmq)
|
rb := RabbitmqNew(&ServerConfig.Rabbitmq)
|
||||||
err := rb.SendAndConfirm(RabbitmqConstPrepared, gin.H{
|
err := rb.SendAndConfirm(RabbitmqConstPrepared, M{
|
||||||
"gid": common.GenGid(),
|
"gid": common.GenGid(),
|
||||||
})
|
})
|
||||||
assert.Equal(t, nil, err)
|
assert.Equal(t, nil, err)
|
||||||
queue := rb.QueueNew(RabbitmqConstPrepared)
|
queue := rb.QueueNew(RabbitmqConstPrepared)
|
||||||
queue.WaitAndHandle(func(data gin.H) {
|
queue.WaitAndHandle(func(data M) {
|
||||||
logrus.Printf("processed msg: %v in queue1", data)
|
logrus.Printf("processed msg: %v in queue1", data)
|
||||||
})
|
})
|
||||||
assert.Equal(t, 0, 1)
|
assert.Equal(t, 0, 1)
|
||||||
|
|||||||
@ -11,7 +11,7 @@ func AddRoute(engine *gin.Engine) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Prepare(c *gin.Context) {
|
func Prepare(c *gin.Context) {
|
||||||
data := gin.H{}
|
data := M{}
|
||||||
err := c.BindJSON(&data)
|
err := c.BindJSON(&data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -19,11 +19,11 @@ func Prepare(c *gin.Context) {
|
|||||||
rabbit := RabbitmqGet()
|
rabbit := RabbitmqGet()
|
||||||
err = rabbit.SendAndConfirm(RabbitmqConstPrepared, data)
|
err = rabbit.SendAndConfirm(RabbitmqConstPrepared, data)
|
||||||
common.PanicIfError(err)
|
common.PanicIfError(err)
|
||||||
c.JSON(200, gin.H{"message": "SUCCESS"})
|
c.JSON(200, M{"message": "SUCCESS"})
|
||||||
}
|
}
|
||||||
|
|
||||||
func Commit(c *gin.Context) {
|
func Commit(c *gin.Context) {
|
||||||
data := gin.H{}
|
data := M{}
|
||||||
err := c.BindJSON(&data)
|
err := c.BindJSON(&data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -31,5 +31,5 @@ func Commit(c *gin.Context) {
|
|||||||
rabbit := RabbitmqGet()
|
rabbit := RabbitmqGet()
|
||||||
err = rabbit.SendAndConfirm(RabbitmqConstCommited, data)
|
err = rabbit.SendAndConfirm(RabbitmqConstCommited, data)
|
||||||
common.PanicIfError(err)
|
common.PanicIfError(err)
|
||||||
c.JSON(200, gin.H{"message": "SUCCESS"})
|
c.JSON(200, M{"message": "SUCCESS"})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,38 +1,21 @@
|
|||||||
package dtmsvr
|
package dtmsvr
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"io/ioutil"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
"github.com/yedf/dtm/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Main() {
|
func Main() {
|
||||||
logrus.Printf("start dtmsvr")
|
StartConsumePreparedMsg(1)
|
||||||
gin.SetMode(gin.ReleaseMode)
|
|
||||||
app := gin.Default()
|
|
||||||
app.Use(func(c *gin.Context) {
|
|
||||||
body := ""
|
|
||||||
if c.Request.Method == "POST" {
|
|
||||||
rb, err := c.GetRawData()
|
|
||||||
if err != nil {
|
|
||||||
logrus.Printf("GetRawData error: %s", err.Error())
|
|
||||||
} else {
|
|
||||||
body = string(rb)
|
|
||||||
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(rb))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
began := time.Now()
|
|
||||||
logrus.Printf("begin %s %s query: %s body: %s", c.Request.Method, c.FullPath(), c.Request.URL.RawQuery, body)
|
|
||||||
c.Next()
|
|
||||||
logrus.Printf("used %d ms %s %s query: %s", time.Since(began).Milliseconds(), c.Request.Method, c.FullPath(), c.Request.URL.RawQuery)
|
|
||||||
|
|
||||||
})
|
|
||||||
AddRoute(app)
|
|
||||||
// StartConsumePreparedMsg(1)
|
|
||||||
StartConsumeCommitedMsg(1)
|
StartConsumeCommitedMsg(1)
|
||||||
logrus.Printf("dtmsvr listen at: 8080")
|
logrus.Printf("dtmsvr listen at: 8080")
|
||||||
go app.Run()
|
go StartSvr()
|
||||||
|
}
|
||||||
|
|
||||||
|
func StartSvr() {
|
||||||
|
logrus.Printf("start dtmsvr")
|
||||||
|
app := common.GetGinApp()
|
||||||
|
AddRoute(app)
|
||||||
|
logrus.Printf("dtmsvr listen at: 8080")
|
||||||
|
app.Run()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,25 +1,27 @@
|
|||||||
package examples
|
package examples
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
"github.com/yedf/dtm/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Main() {
|
func Main() {
|
||||||
logrus.Printf("examples starting")
|
go StartSvr()
|
||||||
gin.SetMode(gin.ReleaseMode)
|
|
||||||
app := gin.Default()
|
|
||||||
app.POST(BusiApi+"/TransIn", TransIn)
|
|
||||||
app.POST(BusiApi+"/TransInCompensate", TransInCompensate)
|
|
||||||
app.POST(BusiApi+"/TransOut", TransOut)
|
|
||||||
app.POST(BusiApi+"/TransOutCompensate", TransOutCompensate)
|
|
||||||
app.POST(BusiApi+"/TransQuery", TransQuery)
|
|
||||||
|
|
||||||
go app.Run(":8081")
|
|
||||||
logrus.Printf("examples istening at %d", BusiPort)
|
|
||||||
trans(&TransReq{
|
trans(&TransReq{
|
||||||
Amount: 30,
|
Amount: 30,
|
||||||
TransInFailed: false,
|
TransInFailed: false,
|
||||||
TransOutFailed: true,
|
TransOutFailed: true,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func StartSvr() {
|
||||||
|
logrus.Printf("examples starting")
|
||||||
|
app := common.GetGinApp()
|
||||||
|
app.POST(BusiApi+"/TransIn", TransIn)
|
||||||
|
app.POST(BusiApi+"/TransInCompensate", TransInCompensate)
|
||||||
|
app.POST(BusiApi+"/TransOut", TransOut)
|
||||||
|
app.POST(BusiApi+"/TransOutCompensate", TransOutCompensate)
|
||||||
|
app.POST(BusiApi+"/TransQuery", TransQuery)
|
||||||
|
logrus.Printf("examples istening at %d", BusiPort)
|
||||||
|
app.Run(":8081")
|
||||||
|
}
|
||||||
|
|||||||
@ -4,11 +4,11 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/gin-gonic/gin/binding"
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/dtm"
|
"github.com/yedf/dtm/dtm"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type M = map[string]interface{}
|
||||||
type TransReq struct {
|
type TransReq struct {
|
||||||
Amount int `json:"amount"`
|
Amount int `json:"amount"`
|
||||||
TransInFailed bool `json:"transInFailed"`
|
TransInFailed bool `json:"transInFailed"`
|
||||||
@ -27,7 +27,7 @@ func TransIn(c *gin.Context) {
|
|||||||
c.Error(fmt.Errorf("TransIn failed for gid: %s", gid))
|
c.Error(fmt.Errorf("TransIn failed for gid: %s", gid))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.JSON(200, gin.H{"result": "SUCCESS"})
|
c.JSON(200, M{"result": "SUCCESS"})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TransInCompensate(c *gin.Context) {
|
func TransInCompensate(c *gin.Context) {
|
||||||
@ -37,25 +37,22 @@ func TransInCompensate(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
logrus.Printf("%s TransInCompensate: %v", gid, req)
|
logrus.Printf("%s TransInCompensate: %v", gid, req)
|
||||||
c.JSON(200, gin.H{"result": "SUCCESS"})
|
c.JSON(200, M{"result": "SUCCESS"})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TransOut(c *gin.Context) {
|
func TransOut(c *gin.Context) {
|
||||||
gid := c.Query("gid")
|
gid := c.Query("gid")
|
||||||
req := TransReq{}
|
req := TransReq{}
|
||||||
req2 := TransReq{}
|
|
||||||
c.ShouldBindBodyWith(&req2, binding.JSON)
|
|
||||||
c.ShouldBindBodyWith(&req, binding.JSON)
|
|
||||||
if err := c.BindJSON(&req); err != nil {
|
if err := c.BindJSON(&req); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logrus.Printf("%s TransOut: %v", gid, req)
|
logrus.Printf("%s TransOut: %v", gid, req)
|
||||||
if req.TransOutFailed {
|
if req.TransOutFailed {
|
||||||
logrus.Printf("%s TransOut %v failed", gid, req)
|
logrus.Printf("%s TransOut %v failed", gid, req)
|
||||||
c.JSON(500, gin.H{"result": "FAIL"})
|
c.JSON(500, M{"result": "FAIL"})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.JSON(200, gin.H{"result": "SUCCESS"})
|
c.JSON(200, M{"result": "SUCCESS"})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TransOutCompensate(c *gin.Context) {
|
func TransOutCompensate(c *gin.Context) {
|
||||||
@ -65,7 +62,7 @@ func TransOutCompensate(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
logrus.Printf("%s TransOutCompensate: %v", gid, req)
|
logrus.Printf("%s TransOutCompensate: %v", gid, req)
|
||||||
c.JSON(200, gin.H{"result": "SUCCESS"})
|
c.JSON(200, M{"result": "SUCCESS"})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TransQuery(c *gin.Context) {
|
func TransQuery(c *gin.Context) {
|
||||||
@ -75,26 +72,26 @@ func TransQuery(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
logrus.Printf("%s TransQuery: %v", gid, req)
|
logrus.Printf("%s TransQuery: %v", gid, req)
|
||||||
c.JSON(200, gin.H{"result": "SUCCESS"})
|
c.JSON(200, M{"result": "SUCCESS"})
|
||||||
}
|
}
|
||||||
|
|
||||||
func trans(req *TransReq) {
|
func trans(req *TransReq) {
|
||||||
// gid := common.GenGid()
|
// gid := common.GenGid()
|
||||||
gid := "4eHhkCxVsQ1"
|
gid := "4eHhkCxVsQ1"
|
||||||
logrus.Printf("busi transaction begin: %s", gid)
|
logrus.Printf("busi transaction begin: %s", gid)
|
||||||
saga := dtm.SagaNew(TcServer, gid)
|
saga := dtm.SagaNew(TcServer, gid, Busi+"/TransQuery")
|
||||||
|
|
||||||
saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", gin.H{
|
saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", M{
|
||||||
"amount": req.Amount,
|
"amount": req.Amount,
|
||||||
"transInFailed": req.TransInFailed,
|
"transInFailed": req.TransInFailed,
|
||||||
"transOutFailed": req.TransOutFailed,
|
"transOutFailed": req.TransOutFailed,
|
||||||
})
|
})
|
||||||
saga.Add(Busi+"/TransOut", Busi+"/TransOutCompensate", gin.H{
|
saga.Add(Busi+"/TransOut", Busi+"/TransOutCompensate", M{
|
||||||
"amount": req.Amount,
|
"amount": req.Amount,
|
||||||
"transInFailed": req.TransInFailed,
|
"transInFailed": req.TransInFailed,
|
||||||
"transOutFailed": req.TransOutFailed,
|
"transOutFailed": req.TransOutFailed,
|
||||||
})
|
})
|
||||||
saga.Prepare(Busi + "/TransQuery")
|
saga.Prepare()
|
||||||
logrus.Printf("busi trans commit")
|
logrus.Printf("busi trans commit")
|
||||||
saga.Commit()
|
saga.Commit()
|
||||||
}
|
}
|
||||||
|
|||||||
12
main.go
12
main.go
@ -1,6 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
@ -8,8 +9,17 @@ import (
|
|||||||
"github.com/yedf/dtm/examples"
|
"github.com/yedf/dtm/examples"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type M = map[string]interface{}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
dtmsvr.LoadConfig()
|
dtmsvr.LoadConfig()
|
||||||
|
|
||||||
|
s := common.MustMarshalString(M{
|
||||||
|
"a": 1,
|
||||||
|
"b": "str",
|
||||||
|
})
|
||||||
|
var obj interface{}
|
||||||
|
json.Unmarshal([]byte(s), &obj)
|
||||||
db := dtmsvr.DbGet()
|
db := dtmsvr.DbGet()
|
||||||
tx := db.Begin()
|
tx := db.Begin()
|
||||||
common.PanicIfError(tx.Error)
|
common.PanicIfError(tx.Error)
|
||||||
@ -25,7 +35,7 @@ func main() {
|
|||||||
// logrus.SetFormatter(&logrus.JSONFormatter{})
|
// logrus.SetFormatter(&logrus.JSONFormatter{})
|
||||||
// dtmsvr.LoadConfig()
|
// dtmsvr.LoadConfig()
|
||||||
// rb := dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq)
|
// rb := dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq)
|
||||||
// err := rb.SendAndConfirm(dtmsvr.RabbitmqConstPrepared, gin.H{
|
// err := rb.SendAndConfirm(dtmsvr.RabbitmqConstPrepared, M{
|
||||||
// "gid": common.GenGid(),
|
// "gid": common.GenGid(),
|
||||||
// })
|
// })
|
||||||
// common.PanicIfError(err)
|
// common.PanicIfError(err)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user