reorg ok
This commit is contained in:
parent
104ff472dc
commit
3257e04322
@ -8,7 +8,6 @@ import (
|
|||||||
|
|
||||||
"github.com/bwmarrin/snowflake"
|
"github.com/bwmarrin/snowflake"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/go-resty/resty/v2"
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -21,12 +20,12 @@ func OrString(ss ...string) string {
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
var gNode *snowflake.Node = nil
|
|
||||||
|
|
||||||
func GenGid() string {
|
func GenGid() string {
|
||||||
return gNode.Generate().Base58()
|
return gNode.Generate().Base58()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var gNode *snowflake.Node = nil
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
node, err := snowflake.NewNode(1)
|
node, err := snowflake.NewNode(1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -73,20 +72,6 @@ func MustRemarshal(from interface{}, to interface{}) {
|
|||||||
PanicIfError(err)
|
PanicIfError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var RestyClient = resty.New()
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error {
|
|
||||||
logrus.Printf("requesting: %s %s %v", r.Method, r.URL, r.Body)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error {
|
|
||||||
r := resp.Request
|
|
||||||
logrus.Printf("requested: %s %s %s", r.Method, r.URL, resp.String())
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetGinApp() *gin.Engine {
|
func GetGinApp() *gin.Engine {
|
||||||
gin.SetMode(gin.ReleaseMode)
|
gin.SetMode(gin.ReleaseMode)
|
||||||
app := gin.Default()
|
app := gin.Default()
|
||||||
|
|||||||
36
dtm/saga.go
36
dtm/saga.go
@ -1,10 +1,11 @@
|
|||||||
package dtm
|
package dtm
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/go-resty/resty/v2"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/common"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Saga struct {
|
type Saga struct {
|
||||||
@ -34,22 +35,22 @@ func SagaNew(server string, gid string, transQuery string) *Saga {
|
|||||||
}
|
}
|
||||||
func (s *Saga) Add(action string, compensate string, postData interface{}) error {
|
func (s *Saga) Add(action string, compensate string, postData interface{}) error {
|
||||||
logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData)
|
logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData)
|
||||||
|
d, err := json.Marshal(postData)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
step := SagaStep{
|
step := SagaStep{
|
||||||
Action: action,
|
Action: action,
|
||||||
Compensate: compensate,
|
Compensate: compensate,
|
||||||
PostData: common.MustMarshalString(postData),
|
PostData: string(d),
|
||||||
}
|
}
|
||||||
s.Steps = append(s.Steps, step)
|
s.Steps = append(s.Steps, step)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Saga) getBody() *SagaData {
|
|
||||||
return &s.SagaData
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Saga) Prepare() error {
|
func (s *Saga) Prepare() error {
|
||||||
logrus.Printf("preparing %s body: %v", s.Gid, s.getBody())
|
logrus.Printf("preparing %s body: %v", s.Gid, &s.SagaData)
|
||||||
resp, err := common.RestyClient.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/prepare", s.Server))
|
resp, err := RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/prepare", s.Server))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -60,8 +61,8 @@ func (s *Saga) Prepare() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Saga) Commit() error {
|
func (s *Saga) Commit() error {
|
||||||
logrus.Printf("committing %s body: %v", s.Gid, s.getBody())
|
logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData)
|
||||||
resp, err := common.RestyClient.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/commit", s.Server))
|
resp, err := RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/commit", s.Server))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -70,3 +71,18 @@ func (s *Saga) Commit() error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 辅助工具与代码
|
||||||
|
var RestyClient = resty.New()
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error {
|
||||||
|
logrus.Printf("requesting: %s %s %v", r.Method, r.URL, r.Body)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error {
|
||||||
|
r := resp.Request
|
||||||
|
logrus.Printf("requested: %s %s %s", r.Method, r.URL, resp.String())
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@ -13,10 +13,6 @@ import (
|
|||||||
"github.com/yedf/dtm/examples"
|
"github.com/yedf/dtm/examples"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
|
||||||
dtmsvr.LoadConfig()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestViper(t *testing.T) {
|
func TestViper(t *testing.T) {
|
||||||
assert.Equal(t, "test_val", viper.GetString("test"))
|
assert.Equal(t, "test_val", viper.GetString("test"))
|
||||||
}
|
}
|
||||||
@ -128,7 +124,7 @@ func TestDtmSvr(t *testing.T) {
|
|||||||
|
|
||||||
func genSaga(gid string, inFailed bool, outFailed bool) *dtm.Saga {
|
func genSaga(gid string, inFailed bool, outFailed bool) *dtm.Saga {
|
||||||
logrus.Printf("beginning a saga test ---------------- %s", gid)
|
logrus.Printf("beginning a saga test ---------------- %s", gid)
|
||||||
saga := dtm.SagaNew(examples.TcServer, gid, examples.Busi+"/TransQuery")
|
saga := dtm.SagaNew(examples.DtmServer, gid, examples.Busi+"/TransQuery")
|
||||||
req := examples.TransReq{
|
req := examples.TransReq{
|
||||||
Amount: 30,
|
Amount: 30,
|
||||||
TransInFailed: inFailed,
|
TransInFailed: inFailed,
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
|
"github.com/yedf/dtm/dtm"
|
||||||
)
|
)
|
||||||
|
|
||||||
func CronPreparedOnce(expire time.Duration) {
|
func CronPreparedOnce(expire time.Duration) {
|
||||||
@ -21,7 +22,7 @@ func CronPreparedOnce(expire time.Duration) {
|
|||||||
writeTransLog(sm.Gid, "saga touch prepared", "", -1, "")
|
writeTransLog(sm.Gid, "saga touch prepared", "", -1, "")
|
||||||
dbr = db.Model(&sm).Update("id", sm.ID)
|
dbr = db.Model(&sm).Update("id", sm.ID)
|
||||||
common.PanicIfError(dbr.Error)
|
common.PanicIfError(dbr.Error)
|
||||||
resp, err := common.RestyClient.R().SetQueryParam("gid", sm.Gid).Get(sm.TransQuery)
|
resp, err := dtm.RestyClient.R().SetQueryParam("gid", sm.Gid).Get(sm.TransQuery)
|
||||||
common.PanicIfError(err)
|
common.PanicIfError(err)
|
||||||
body := resp.String()
|
body := resp.String()
|
||||||
if strings.Contains(body, "FAIL") {
|
if strings.Contains(body, "FAIL") {
|
||||||
|
|||||||
@ -3,6 +3,7 @@ package dtmsvr
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
@ -13,9 +14,67 @@ import (
|
|||||||
|
|
||||||
type M = map[string]interface{}
|
type M = map[string]interface{}
|
||||||
|
|
||||||
|
type tracePlugin struct{}
|
||||||
|
|
||||||
|
func (op *tracePlugin) Name() string {
|
||||||
|
return "tracePlugin"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (op *tracePlugin) Initialize(db *gorm.DB) (err error) {
|
||||||
|
before := func(db *gorm.DB) {
|
||||||
|
db.InstanceSet("ivy.startTime", time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
|
after := func(db *gorm.DB) {
|
||||||
|
_ts, _ := db.InstanceGet("ivy.startTime")
|
||||||
|
sql := db.Dialector.Explain(db.Statement.SQL.String(), db.Statement.Vars...)
|
||||||
|
logrus.Printf("used: %d ms affected: %d sql is: %s", time.Since(_ts.(time.Time)).Milliseconds(), db.RowsAffected, sql)
|
||||||
|
if v, ok := db.InstanceGet("ivy.must"); ok && v.(bool) {
|
||||||
|
if db.Error != nil && db.Error != gorm.ErrRecordNotFound {
|
||||||
|
panic(db.Error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
beforeName := "cb_before"
|
||||||
|
afterName := "cb_after"
|
||||||
|
|
||||||
|
logrus.Printf("installing db plugin: %s", op.Name())
|
||||||
|
// 开始前
|
||||||
|
_ = db.Callback().Create().Before("gorm:before_create").Register(beforeName, before)
|
||||||
|
_ = db.Callback().Query().Before("gorm:query").Register(beforeName, before)
|
||||||
|
_ = db.Callback().Delete().Before("gorm:before_delete").Register(beforeName, before)
|
||||||
|
_ = db.Callback().Update().Before("gorm:setup_reflect_value").Register(beforeName, before)
|
||||||
|
_ = db.Callback().Row().Before("gorm:row").Register(beforeName, before)
|
||||||
|
_ = db.Callback().Raw().Before("gorm:raw").Register(beforeName, before)
|
||||||
|
|
||||||
|
// 结束后
|
||||||
|
_ = db.Callback().Create().After("gorm:after_create").Register(afterName, after)
|
||||||
|
_ = db.Callback().Query().After("gorm:after_query").Register(afterName, after)
|
||||||
|
_ = db.Callback().Delete().After("gorm:after_delete").Register(afterName, after)
|
||||||
|
_ = db.Callback().Update().After("gorm:after_update").Register(afterName, after)
|
||||||
|
_ = db.Callback().Row().After("gorm:row").Register(afterName, after)
|
||||||
|
_ = db.Callback().Raw().After("gorm:raw").Register(afterName, after)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var db *gorm.DB = nil
|
var db *gorm.DB = nil
|
||||||
|
|
||||||
func DbGet() *gorm.DB {
|
type MyDb struct {
|
||||||
|
*gorm.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MyDb) Must() *MyDb {
|
||||||
|
db := m.InstanceSet("ivy.must", true)
|
||||||
|
return &MyDb{DB: db}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MyDb) NoMust() *MyDb {
|
||||||
|
db := m.InstanceSet("ivy.must", false)
|
||||||
|
return &MyDb{DB: db}
|
||||||
|
}
|
||||||
|
|
||||||
|
func DbGet() *MyDb {
|
||||||
LoadConfig()
|
LoadConfig()
|
||||||
if db == nil {
|
if db == nil {
|
||||||
conf := viper.GetStringMapString("mysql")
|
conf := viper.GetStringMapString("mysql")
|
||||||
@ -25,9 +84,10 @@ func DbGet() *gorm.DB {
|
|||||||
SkipDefaultTransaction: true,
|
SkipDefaultTransaction: true,
|
||||||
})
|
})
|
||||||
common.PanicIfError(err)
|
common.PanicIfError(err)
|
||||||
db = db1.Debug()
|
db1.Use(&tracePlugin{})
|
||||||
|
db = db1
|
||||||
}
|
}
|
||||||
return db
|
return &MyDb{DB: db}
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeTransLog(gid string, action string, status string, step int, detail string) {
|
func writeTransLog(gid string, action string, status string, step int, detail string) {
|
||||||
|
|||||||
@ -9,6 +9,7 @@ import (
|
|||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
|
"github.com/yedf/dtm/dtm"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
"gorm.io/gorm/clause"
|
"gorm.io/gorm/clause"
|
||||||
)
|
)
|
||||||
@ -144,7 +145,7 @@ func innerProcessCommitedSaga(gid string) (rerr error) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if step.Type == "action" && step.Status == "pending" {
|
if step.Type == "action" && step.Status == "pending" {
|
||||||
resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
|
resp, err := dtm.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -183,7 +184,7 @@ func innerProcessCommitedSaga(gid string) (rerr error) {
|
|||||||
if step.Type != "compensate" || step.Status != "pending" {
|
if step.Type != "compensate" || step.Status != "pending" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
|
resp, err := dtm.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,7 +2,10 @@ package examples
|
|||||||
|
|
||||||
import "fmt"
|
import "fmt"
|
||||||
|
|
||||||
const TcServer = "http://localhost:8080/api/dtmsvr"
|
// 指定dtm服务地址
|
||||||
|
const DtmServer = "http://localhost:8080/api/dtmsvr"
|
||||||
|
|
||||||
|
// 事务参与制的服务地址
|
||||||
const BusiPort = 8081
|
const BusiPort = 8081
|
||||||
const BusiApi = "/api/busi"
|
const BusiApi = "/api/busi"
|
||||||
|
|
||||||
|
|||||||
@ -3,15 +3,24 @@ package examples
|
|||||||
import (
|
import (
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
|
"github.com/yedf/dtm/dtm"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Main() {
|
func FireRequest() {
|
||||||
go StartSvr()
|
gid := common.GenGid()
|
||||||
trans(&TransReq{
|
logrus.Printf("busi transaction begin: %s", gid)
|
||||||
|
req := &TransReq{
|
||||||
Amount: 30,
|
Amount: 30,
|
||||||
TransInFailed: false,
|
TransInFailed: false,
|
||||||
TransOutFailed: true,
|
TransOutFailed: false,
|
||||||
})
|
}
|
||||||
|
saga := dtm.SagaNew(DtmServer, gid, Busi+"/TransQuery")
|
||||||
|
|
||||||
|
saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", req)
|
||||||
|
saga.Add(Busi+"/TransOut", Busi+"/TransOutCompensate", req)
|
||||||
|
saga.Prepare()
|
||||||
|
logrus.Printf("busi trans commit")
|
||||||
|
saga.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
func StartSvr() {
|
func StartSvr() {
|
||||||
@ -6,7 +6,6 @@ import (
|
|||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
"github.com/yedf/dtm/dtm"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type M = map[string]interface{}
|
type M = map[string]interface{}
|
||||||
@ -83,24 +82,3 @@ func TransQuery(c *gin.Context) {
|
|||||||
res := common.OrString(TransQueryResult, "SUCCESS")
|
res := common.OrString(TransQueryResult, "SUCCESS")
|
||||||
c.JSON(200, M{"result": res})
|
c.JSON(200, M{"result": res})
|
||||||
}
|
}
|
||||||
|
|
||||||
func trans(req *TransReq) {
|
|
||||||
// gid := common.GenGid()
|
|
||||||
gid := "4eHhkCxVsQ1"
|
|
||||||
logrus.Printf("busi transaction begin: %s", gid)
|
|
||||||
saga := dtm.SagaNew(TcServer, gid, Busi+"/TransQuery")
|
|
||||||
|
|
||||||
saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", M{
|
|
||||||
"amount": req.Amount,
|
|
||||||
"transInFailed": req.TransInFailed,
|
|
||||||
"transOutFailed": req.TransOutFailed,
|
|
||||||
})
|
|
||||||
saga.Add(Busi+"/TransOut", Busi+"/TransOutCompensate", M{
|
|
||||||
"amount": req.Amount,
|
|
||||||
"transInFailed": req.TransInFailed,
|
|
||||||
"transOutFailed": req.TransOutFailed,
|
|
||||||
})
|
|
||||||
saga.Prepare()
|
|
||||||
logrus.Printf("busi trans commit")
|
|
||||||
saga.Commit()
|
|
||||||
}
|
|
||||||
38
main.go
38
main.go
@ -1,10 +1,8 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/yedf/dtm/common"
|
|
||||||
"github.com/yedf/dtm/dtmsvr"
|
"github.com/yedf/dtm/dtmsvr"
|
||||||
"github.com/yedf/dtm/examples"
|
"github.com/yedf/dtm/examples"
|
||||||
)
|
)
|
||||||
@ -13,38 +11,8 @@ type M = map[string]interface{}
|
|||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
dtmsvr.LoadConfig()
|
dtmsvr.LoadConfig()
|
||||||
|
go dtmsvr.StartSvr()
|
||||||
s := common.MustMarshalString(M{
|
go examples.StartSvr()
|
||||||
"a": 1,
|
examples.FireRequest()
|
||||||
"b": "str",
|
|
||||||
})
|
|
||||||
var obj interface{}
|
|
||||||
json.Unmarshal([]byte(s), &obj)
|
|
||||||
db := dtmsvr.DbGet()
|
|
||||||
tx := db.Begin()
|
|
||||||
common.PanicIfError(tx.Error)
|
|
||||||
dbr := tx.Commit()
|
|
||||||
common.PanicIfError(dbr.Error)
|
|
||||||
|
|
||||||
tx = db.Begin()
|
|
||||||
common.PanicIfError(tx.Error)
|
|
||||||
dbr = tx.Commit()
|
|
||||||
common.PanicIfError(dbr.Error)
|
|
||||||
db.Exec("truncate test1.a_saga")
|
|
||||||
db.Exec("truncate test1.a_saga_step")
|
|
||||||
// logrus.SetFormatter(&logrus.JSONFormatter{})
|
|
||||||
// dtmsvr.LoadConfig()
|
|
||||||
// rb := dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq)
|
|
||||||
// err := rb.SendAndConfirm(dtmsvr.RabbitmqConstPrepared, M{
|
|
||||||
// "gid": common.GenGid(),
|
|
||||||
// })
|
|
||||||
// common.PanicIfError(err)
|
|
||||||
// queue := rb.QueueNew(dtmsvr.RabbitmqConstPrepared)
|
|
||||||
// queue.WaitAndHandle(func(data map[string]interface{}) {
|
|
||||||
// logrus.Printf("processed msg: %v in queue1", data)
|
|
||||||
// })
|
|
||||||
|
|
||||||
dtmsvr.Main()
|
|
||||||
examples.Main()
|
|
||||||
time.Sleep(1000 * 1000 * 1000 * 1000)
|
time.Sleep(1000 * 1000 * 1000 * 1000)
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user