From 3257e04322efa3918d892ce80635b8d9d7aecbb4 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Thu, 20 May 2021 15:00:30 +0800 Subject: [PATCH] reorg ok --- common/utils.go | 19 +-------- dtm/saga.go | 36 ++++++++++++----- dtm_test.go | 6 +-- dtmsvr/cron.go | 3 +- dtmsvr/objects.go | 66 ++++++++++++++++++++++++++++++-- dtmsvr/service.go | 5 ++- examples/config.go | 5 ++- examples/{cli.go => main.go} | 19 ++++++--- examples/{saga.go => service.go} | 22 ----------- main.go | 38 ++---------------- 10 files changed, 118 insertions(+), 101 deletions(-) rename examples/{cli.go => main.go} (56%) rename examples/{saga.go => service.go} (76%) diff --git a/common/utils.go b/common/utils.go index 483496c..f6a6dd7 100644 --- a/common/utils.go +++ b/common/utils.go @@ -8,7 +8,6 @@ import ( "github.com/bwmarrin/snowflake" "github.com/gin-gonic/gin" - "github.com/go-resty/resty/v2" "github.com/sirupsen/logrus" ) @@ -21,12 +20,12 @@ func OrString(ss ...string) string { return "" } -var gNode *snowflake.Node = nil - func GenGid() string { return gNode.Generate().Base58() } +var gNode *snowflake.Node = nil + func init() { node, err := snowflake.NewNode(1) if err != nil { @@ -73,20 +72,6 @@ func MustRemarshal(from interface{}, to interface{}) { PanicIfError(err) } -var RestyClient = resty.New() - -func init() { - RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { - logrus.Printf("requesting: %s %s %v", r.Method, r.URL, r.Body) - return nil - }) - RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { - r := resp.Request - logrus.Printf("requested: %s %s %s", r.Method, r.URL, resp.String()) - return nil - }) -} - func GetGinApp() *gin.Engine { gin.SetMode(gin.ReleaseMode) app := gin.Default() diff --git a/dtm/saga.go b/dtm/saga.go index 4d4a1e1..fa5e379 100644 --- a/dtm/saga.go +++ b/dtm/saga.go @@ -1,10 +1,11 @@ package dtm import ( + "encoding/json" "fmt" + "github.com/go-resty/resty/v2" "github.com/sirupsen/logrus" - "github.com/yedf/dtm/common" ) type Saga struct { @@ -34,22 +35,22 @@ func SagaNew(server string, gid string, transQuery string) *Saga { } func (s *Saga) Add(action string, compensate string, postData interface{}) error { logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) + d, err := json.Marshal(postData) + if err != nil { + return err + } step := SagaStep{ Action: action, Compensate: compensate, - PostData: common.MustMarshalString(postData), + PostData: string(d), } s.Steps = append(s.Steps, step) return nil } -func (s *Saga) getBody() *SagaData { - return &s.SagaData -} - func (s *Saga) Prepare() error { - logrus.Printf("preparing %s body: %v", s.Gid, s.getBody()) - resp, err := common.RestyClient.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/prepare", s.Server)) + logrus.Printf("preparing %s body: %v", s.Gid, &s.SagaData) + resp, err := RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/prepare", s.Server)) if err != nil { return err } @@ -60,8 +61,8 @@ func (s *Saga) Prepare() error { } func (s *Saga) Commit() error { - logrus.Printf("committing %s body: %v", s.Gid, s.getBody()) - resp, err := common.RestyClient.R().SetBody(s.getBody()).Post(fmt.Sprintf("%s/commit", s.Server)) + logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) + resp, err := RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/commit", s.Server)) if err != nil { return err } @@ -70,3 +71,18 @@ func (s *Saga) Commit() error { } return nil } + +// 辅助工具与代码 +var RestyClient = resty.New() + +func init() { + RestyClient.OnBeforeRequest(func(c *resty.Client, r *resty.Request) error { + logrus.Printf("requesting: %s %s %v", r.Method, r.URL, r.Body) + return nil + }) + RestyClient.OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { + r := resp.Request + logrus.Printf("requested: %s %s %s", r.Method, r.URL, resp.String()) + return nil + }) +} diff --git a/dtm_test.go b/dtm_test.go index 2668de4..259b6bc 100644 --- a/dtm_test.go +++ b/dtm_test.go @@ -13,10 +13,6 @@ import ( "github.com/yedf/dtm/examples" ) -func init() { - dtmsvr.LoadConfig() -} - func TestViper(t *testing.T) { assert.Equal(t, "test_val", viper.GetString("test")) } @@ -128,7 +124,7 @@ func TestDtmSvr(t *testing.T) { func genSaga(gid string, inFailed bool, outFailed bool) *dtm.Saga { logrus.Printf("beginning a saga test ---------------- %s", gid) - saga := dtm.SagaNew(examples.TcServer, gid, examples.Busi+"/TransQuery") + saga := dtm.SagaNew(examples.DtmServer, gid, examples.Busi+"/TransQuery") req := examples.TransReq{ Amount: 30, TransInFailed: inFailed, diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 03cc564..fb41bcc 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -6,6 +6,7 @@ import ( "time" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtm" ) func CronPreparedOnce(expire time.Duration) { @@ -21,7 +22,7 @@ func CronPreparedOnce(expire time.Duration) { writeTransLog(sm.Gid, "saga touch prepared", "", -1, "") dbr = db.Model(&sm).Update("id", sm.ID) common.PanicIfError(dbr.Error) - resp, err := common.RestyClient.R().SetQueryParam("gid", sm.Gid).Get(sm.TransQuery) + resp, err := dtm.RestyClient.R().SetQueryParam("gid", sm.Gid).Get(sm.TransQuery) common.PanicIfError(err) body := resp.String() if strings.Contains(body, "FAIL") { diff --git a/dtmsvr/objects.go b/dtmsvr/objects.go index 29d3b0d..72b7cd8 100644 --- a/dtmsvr/objects.go +++ b/dtmsvr/objects.go @@ -3,6 +3,7 @@ package dtmsvr import ( "fmt" "strings" + "time" "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -13,9 +14,67 @@ import ( type M = map[string]interface{} +type tracePlugin struct{} + +func (op *tracePlugin) Name() string { + return "tracePlugin" +} + +func (op *tracePlugin) Initialize(db *gorm.DB) (err error) { + before := func(db *gorm.DB) { + db.InstanceSet("ivy.startTime", time.Now()) + } + + after := func(db *gorm.DB) { + _ts, _ := db.InstanceGet("ivy.startTime") + sql := db.Dialector.Explain(db.Statement.SQL.String(), db.Statement.Vars...) + logrus.Printf("used: %d ms affected: %d sql is: %s", time.Since(_ts.(time.Time)).Milliseconds(), db.RowsAffected, sql) + if v, ok := db.InstanceGet("ivy.must"); ok && v.(bool) { + if db.Error != nil && db.Error != gorm.ErrRecordNotFound { + panic(db.Error) + } + } + } + + beforeName := "cb_before" + afterName := "cb_after" + + logrus.Printf("installing db plugin: %s", op.Name()) + // 开始前 + _ = db.Callback().Create().Before("gorm:before_create").Register(beforeName, before) + _ = db.Callback().Query().Before("gorm:query").Register(beforeName, before) + _ = db.Callback().Delete().Before("gorm:before_delete").Register(beforeName, before) + _ = db.Callback().Update().Before("gorm:setup_reflect_value").Register(beforeName, before) + _ = db.Callback().Row().Before("gorm:row").Register(beforeName, before) + _ = db.Callback().Raw().Before("gorm:raw").Register(beforeName, before) + + // 结束后 + _ = db.Callback().Create().After("gorm:after_create").Register(afterName, after) + _ = db.Callback().Query().After("gorm:after_query").Register(afterName, after) + _ = db.Callback().Delete().After("gorm:after_delete").Register(afterName, after) + _ = db.Callback().Update().After("gorm:after_update").Register(afterName, after) + _ = db.Callback().Row().After("gorm:row").Register(afterName, after) + _ = db.Callback().Raw().After("gorm:raw").Register(afterName, after) + return +} + var db *gorm.DB = nil -func DbGet() *gorm.DB { +type MyDb struct { + *gorm.DB +} + +func (m *MyDb) Must() *MyDb { + db := m.InstanceSet("ivy.must", true) + return &MyDb{DB: db} +} + +func (m *MyDb) NoMust() *MyDb { + db := m.InstanceSet("ivy.must", false) + return &MyDb{DB: db} +} + +func DbGet() *MyDb { LoadConfig() if db == nil { conf := viper.GetStringMapString("mysql") @@ -25,9 +84,10 @@ func DbGet() *gorm.DB { SkipDefaultTransaction: true, }) common.PanicIfError(err) - db = db1.Debug() + db1.Use(&tracePlugin{}) + db = db1 } - return db + return &MyDb{DB: db} } func writeTransLog(gid string, action string, status string, step int, detail string) { diff --git a/dtmsvr/service.go b/dtmsvr/service.go index c5ca731..425d5a4 100644 --- a/dtmsvr/service.go +++ b/dtmsvr/service.go @@ -9,6 +9,7 @@ import ( "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtm" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -144,7 +145,7 @@ func innerProcessCommitedSaga(gid string) (rerr error) { continue } if step.Type == "action" && step.Status == "pending" { - resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) + resp, err := dtm.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) if err != nil { return err } @@ -183,7 +184,7 @@ func innerProcessCommitedSaga(gid string) (rerr error) { if step.Type != "compensate" || step.Status != "pending" { continue } - resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) + resp, err := dtm.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url) if err != nil { return err } diff --git a/examples/config.go b/examples/config.go index 8caa1f9..a676691 100644 --- a/examples/config.go +++ b/examples/config.go @@ -2,7 +2,10 @@ package examples import "fmt" -const TcServer = "http://localhost:8080/api/dtmsvr" +// 指定dtm服务地址 +const DtmServer = "http://localhost:8080/api/dtmsvr" + +// 事务参与制的服务地址 const BusiPort = 8081 const BusiApi = "/api/busi" diff --git a/examples/cli.go b/examples/main.go similarity index 56% rename from examples/cli.go rename to examples/main.go index 4d4b3f7..6c4dac8 100644 --- a/examples/cli.go +++ b/examples/main.go @@ -3,15 +3,24 @@ package examples import ( "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtm" ) -func Main() { - go StartSvr() - trans(&TransReq{ +func FireRequest() { + gid := common.GenGid() + logrus.Printf("busi transaction begin: %s", gid) + req := &TransReq{ Amount: 30, TransInFailed: false, - TransOutFailed: true, - }) + TransOutFailed: false, + } + saga := dtm.SagaNew(DtmServer, gid, Busi+"/TransQuery") + + saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", req) + saga.Add(Busi+"/TransOut", Busi+"/TransOutCompensate", req) + saga.Prepare() + logrus.Printf("busi trans commit") + saga.Commit() } func StartSvr() { diff --git a/examples/saga.go b/examples/service.go similarity index 76% rename from examples/saga.go rename to examples/service.go index 8a2e715..8927213 100644 --- a/examples/saga.go +++ b/examples/service.go @@ -6,7 +6,6 @@ import ( "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" - "github.com/yedf/dtm/dtm" ) type M = map[string]interface{} @@ -83,24 +82,3 @@ func TransQuery(c *gin.Context) { res := common.OrString(TransQueryResult, "SUCCESS") c.JSON(200, M{"result": res}) } - -func trans(req *TransReq) { - // gid := common.GenGid() - gid := "4eHhkCxVsQ1" - logrus.Printf("busi transaction begin: %s", gid) - saga := dtm.SagaNew(TcServer, gid, Busi+"/TransQuery") - - saga.Add(Busi+"/TransIn", Busi+"/TransInCompensate", M{ - "amount": req.Amount, - "transInFailed": req.TransInFailed, - "transOutFailed": req.TransOutFailed, - }) - saga.Add(Busi+"/TransOut", Busi+"/TransOutCompensate", M{ - "amount": req.Amount, - "transInFailed": req.TransInFailed, - "transOutFailed": req.TransOutFailed, - }) - saga.Prepare() - logrus.Printf("busi trans commit") - saga.Commit() -} diff --git a/main.go b/main.go index 597a761..5e7161d 100644 --- a/main.go +++ b/main.go @@ -1,10 +1,8 @@ package main import ( - "encoding/json" "time" - "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmsvr" "github.com/yedf/dtm/examples" ) @@ -13,38 +11,8 @@ type M = map[string]interface{} func main() { dtmsvr.LoadConfig() - - s := common.MustMarshalString(M{ - "a": 1, - "b": "str", - }) - var obj interface{} - json.Unmarshal([]byte(s), &obj) - db := dtmsvr.DbGet() - tx := db.Begin() - common.PanicIfError(tx.Error) - dbr := tx.Commit() - common.PanicIfError(dbr.Error) - - tx = db.Begin() - common.PanicIfError(tx.Error) - dbr = tx.Commit() - common.PanicIfError(dbr.Error) - db.Exec("truncate test1.a_saga") - db.Exec("truncate test1.a_saga_step") - // logrus.SetFormatter(&logrus.JSONFormatter{}) - // dtmsvr.LoadConfig() - // rb := dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq) - // err := rb.SendAndConfirm(dtmsvr.RabbitmqConstPrepared, M{ - // "gid": common.GenGid(), - // }) - // common.PanicIfError(err) - // queue := rb.QueueNew(dtmsvr.RabbitmqConstPrepared) - // queue.WaitAndHandle(func(data map[string]interface{}) { - // logrus.Printf("processed msg: %v in queue1", data) - // }) - - dtmsvr.Main() - examples.Main() + go dtmsvr.StartSvr() + go examples.StartSvr() + examples.FireRequest() time.Sleep(1000 * 1000 * 1000 * 1000) }