From ff6b6e79b4a18d34f9e5c2a5b990b6dd7dfef824 Mon Sep 17 00:00:00 2001 From: yedongfu Date: Thu, 27 May 2021 20:54:00 +0800 Subject: [PATCH] e2p --- common/types.go | 6 +++--- common/utils.go | 20 ++++++++++---------- dtmsvr/api.go | 4 ++-- dtmsvr/cron.go | 2 +- dtmsvr/dtmsvr_test.go | 12 ++++++------ dtmsvr/service.go | 2 +- dtmsvr/types.go | 3 +++ examples/saga_main.go | 6 +++--- examples/types.go | 2 ++ examples/xa_main.go | 8 ++++---- xa.go | 21 ++++++++++++--------- 11 files changed, 47 insertions(+), 39 deletions(-) diff --git a/common/types.go b/common/types.go index 9603235..2238d4b 100644 --- a/common/types.go +++ b/common/types.go @@ -93,7 +93,7 @@ func DbGet(conf map[string]string) *MyDb { db1, err := gorm.Open(mysql.Open(dsn), &gorm.Config{ SkipDefaultTransaction: true, }) - PanicIfError(err) + E2P(err) db1.Use(&tracePlugin{}) dbs[dsn] = &MyDb{DB: db1} } @@ -114,11 +114,11 @@ func DbAlone(conf map[string]string) (*MyDb, *MyConn) { dsn := GetDsn(conf) logrus.Printf("opening alone mysql: %s", ReplaceDsnPassword(dsn)) mdb, err := sql.Open("mysql", dsn) - PanicIfError(err) + E2P(err) gormDB, err := gorm.Open(mysql.New(mysql.Config{ Conn: mdb, }), &gorm.Config{}) - PanicIfError(err) + E2P(err) gormDB.Use(&tracePlugin{}) return &MyDb{DB: gormDB}, &MyConn{Conn: mdb, Dsn: dsn} } diff --git a/common/utils.go b/common/utils.go index 03f035d..f8e24ab 100644 --- a/common/utils.go +++ b/common/utils.go @@ -29,7 +29,7 @@ func OrString(ss ...string) string { return "" } -func Panic2Error(perr *error) { +func P2E(perr *error) { if x := recover(); x != nil { if e, ok := x.(error); ok { *perr = e @@ -53,7 +53,7 @@ func init() { gNode = node } -func PanicIfError(err error) { +func E2P(err error) { if err != nil { panic(err) } @@ -68,7 +68,7 @@ func If(condition bool, trueObj interface{}, falseObj interface{}) interface{} { func MustMarshal(v interface{}) []byte { b, err := json.Marshal(v) - PanicIfError(err) + E2P(err) return b } @@ -78,7 +78,7 @@ func MustMarshalString(v interface{}) string { func MustUnmarshal(b []byte, obj interface{}) { err := json.Unmarshal(b, obj) - PanicIfError(err) + E2P(err) } func MustUnmarshalString(s string, obj interface{}) { MustUnmarshal([]byte(s), obj) @@ -86,9 +86,9 @@ func MustUnmarshalString(s string, obj interface{}) { func MustRemarshal(from interface{}, to interface{}) { b, err := json.Marshal(from) - PanicIfError(err) + E2P(err) err = json.Unmarshal(b, to) - PanicIfError(err) + E2P(err) } func GetGinApp() *gin.Engine { @@ -130,7 +130,7 @@ func WrapHandler(fn func(*gin.Context) (interface{}, error)) gin.HandlerFunc { c.Status(200) c.Writer.Header().Add("Content-Type", "application/json") _, err = c.Writer.Write(b) - PanicIfError(err) + E2P(err) } } } @@ -154,7 +154,7 @@ func init() { } func CheckRestySuccess(resp *resty.Response, err error) { - PanicIfError(err) + E2P(err) if !strings.Contains(resp.String(), "SUCCESS") { panic(fmt.Errorf("resty response not success: %s", resp.String())) } @@ -196,7 +196,7 @@ func InitApp(config interface{}) { configLoaded[fileName] = true viper.SetConfigFile(fileName) err := viper.ReadInConfig() - PanicIfError(err) + E2P(err) err = viper.Unmarshal(config) - PanicIfError(err) + E2P(err) } diff --git a/dtmsvr/api.go b/dtmsvr/api.go index e125592..df9a0d0 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -44,7 +44,7 @@ func Rollback(c *gin.Context) (interface{}, error) { func Branch(c *gin.Context) (interface{}, error) { branch := TransBranchModel{} err := c.BindJSON(&branch) - common.PanicIfError(err) + e2p(err) db := dbGet() db.Must().Clauses(clause.OnConflict{ DoNothing: true, @@ -55,7 +55,7 @@ func Branch(c *gin.Context) (interface{}, error) { func getTransFromContext(c *gin.Context) *TransGlobalModel { data := M{} b, err := c.GetRawData() - common.PanicIfError(err) + e2p(err) common.MustUnmarshal(b, &data) logrus.Printf("creating trans model in prepare") if data["trans_type"].(string) == "saga" { diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 495c995..5a032a2 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -21,7 +21,7 @@ func CronPreparedOnce(expire time.Duration) { writeTransLog(sm.Gid, "saga touch prepared", "", "", "") db.Must().Model(&sm).Update("id", sm.ID) resp, err := common.RestyClient.R().SetQueryParam("gid", sm.Gid).Get(sm.QueryPrepared) - common.PanicIfError(err) + e2p(err) body := resp.String() if strings.Contains(body, "FAIL") { preparedExpire := time.Now().Add(time.Duration(-config.PreparedExpire) * time.Second) diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index d708195..7a3ba5f 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -32,9 +32,9 @@ func TestDtmSvr(t *testing.T) { time.Sleep(time.Duration(100 * 1000 * 1000)) // 清理数据 - common.PanicIfError(dbGet().Exec("truncate trans_global").Error) - common.PanicIfError(dbGet().Exec("truncate trans_branch").Error) - common.PanicIfError(dbGet().Exec("truncate trans_log").Error) + e2p(dbGet().Exec("truncate trans_global").Error) + e2p(dbGet().Exec("truncate trans_branch").Error) + e2p(dbGet().Exec("truncate trans_log").Error) examples.ResetXaData() xaRollback(t) @@ -61,14 +61,14 @@ var initdb = dbGet() func getSagaModel(gid string) *TransGlobalModel { sm := TransGlobalModel{} dbr := dbGet().Model(&sm).Where("gid=?", gid).First(&sm) - common.PanicIfError(dbr.Error) + e2p(dbr.Error) return &sm } func getBranchesStatus(gid string) []string { steps := []TransBranchModel{} dbr := dbGet().Model(&TransBranchModel{}).Where("gid=?", gid).Find(&steps) - common.PanicIfError(dbr.Error) + e2p(dbr.Error) status := []string{} for _, step := range steps { status = append(status, step.Status) @@ -93,7 +93,7 @@ func xaNormal(t *testing.T) { common.CheckRestySuccess(resp, err) return nil }) - common.PanicIfError(err) + e2p(err) WaitTransProcessed(gid) assert.Equal(t, []string{"finished", "finished"}, getBranchesStatus(gid)) } diff --git a/dtmsvr/service.go b/dtmsvr/service.go index 7ba4ddb..a2e4332 100644 --- a/dtmsvr/service.go +++ b/dtmsvr/service.go @@ -29,7 +29,7 @@ func saveCommitted(m *TransGlobalModel) { } return nil }) - common.PanicIfError(err) + e2p(err) } var TransProcessedTestChan chan string = nil // 用于测试时,通知处理结束 diff --git a/dtmsvr/types.go b/dtmsvr/types.go index e87fe3a..8d50d4c 100644 --- a/dtmsvr/types.go +++ b/dtmsvr/types.go @@ -10,6 +10,9 @@ import ( type M = map[string]interface{} +var p2e = common.P2E +var e2p = common.E2P + type TransGlobalModel struct { common.ModelBase Gid string `json:"gid"` diff --git a/examples/saga_main.go b/examples/saga_main.go index b4991cc..fb1e03d 100644 --- a/examples/saga_main.go +++ b/examples/saga_main.go @@ -42,10 +42,10 @@ func sagaFireRequest() { saga.Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req) saga.Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req) err := saga.Prepare() - common.PanicIfError(err) + e2p(err) logrus.Printf("busi trans commit") err = saga.Commit() - common.PanicIfError(err) + e2p(err) } // api @@ -70,7 +70,7 @@ var TransQueryResult = "" func transReqFromContext(c *gin.Context) *TransReq { req := TransReq{} err := c.BindJSON(&req) - common.PanicIfError(err) + e2p(err) return &req } diff --git a/examples/types.go b/examples/types.go index 49d6c0b..3e07f9c 100644 --- a/examples/types.go +++ b/examples/types.go @@ -2,6 +2,8 @@ package examples import "github.com/yedf/dtm/common" +var e2p = common.E2P + type UserAccount struct { common.ModelBase UserId int diff --git a/examples/xa_main.go b/examples/xa_main.go index 03df76c..ebb4ec9 100644 --- a/examples/xa_main.go +++ b/examples/xa_main.go @@ -38,7 +38,7 @@ func XaStartSvr() { func XaFireRequest() { gid := common.GenGid() err := XaClient.XaGlobalTransaction(gid, func() (rerr error) { - defer common.Panic2Error(&rerr) + defer common.P2E(&rerr) req := GenTransReq(30, false, false) resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ "gid": gid, @@ -52,7 +52,7 @@ func XaFireRequest() { common.CheckRestySuccess(resp, err) return nil }) - common.PanicIfError(err) + e2p(err) } // api @@ -71,7 +71,7 @@ func XaTransIn(c *gin.Context) (interface{}, error) { Update("balance", gorm.Expr("balance - ?", req.Amount)) return dbr.Error }) - common.PanicIfError(err) + e2p(err) return M{"result": "SUCCESS"}, nil } @@ -85,7 +85,7 @@ func XaTransOut(c *gin.Context) (interface{}, error) { Update("balance", gorm.Expr("balance + ?", req.Amount)) return dbr.Error }) - common.PanicIfError(err) + e2p(err) return M{"result": "SUCCESS"}, nil } diff --git a/xa.go b/xa.go index 1acda53..db19f7d 100644 --- a/xa.go +++ b/xa.go @@ -10,6 +10,9 @@ import ( ) type M = map[string]interface{} + +var e2p = common.E2P + type XaGlobalFunc func() error type XaLocalFunc func(db *common.MyDb) error @@ -27,7 +30,7 @@ func XaClientNew(server string, mysqlConf map[string]string, app *gin.Engine, ca CallbackUrl: callbackUrl, } u, err := url.Parse(callbackUrl) - common.PanicIfError(err) + e2p(err) app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) { type CallbackReq struct { Gid string `json:"gid"` @@ -36,7 +39,7 @@ func XaClientNew(server string, mysqlConf map[string]string, app *gin.Engine, ca } req := CallbackReq{} b, err := c.GetRawData() - common.PanicIfError(err) + e2p(err) common.MustUnmarshal(b, &req) tx, my := common.DbAlone(xa.Conf) defer func() { @@ -54,19 +57,19 @@ func XaClientNew(server string, mysqlConf map[string]string, app *gin.Engine, ca return xa } func (xa *XaClient) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error) { - defer common.Panic2Error(&rerr) + defer common.P2E(&rerr) branch := common.GenGid() tx, my := common.DbAlone(xa.Conf) defer func() { my.Close() }() tx.Must().Exec(fmt.Sprintf("XA start '%s'", branch)) err := transFunc(tx) - common.PanicIfError(err) + e2p(err) resp, err := common.RestyClient.R(). SetBody(&M{"gid": gid, "branch": branch, "trans_type": "xa", "status": "prepared", "url": xa.CallbackUrl}). Post(xa.Server + "/branch") - common.PanicIfError(err) + e2p(err) if !strings.Contains(resp.String(), "SUCCESS") { - common.PanicIfError(fmt.Errorf("unknown server response: %s", resp.String())) + e2p(fmt.Errorf("unknown server response: %s", resp.String())) } tx.Must().Exec(fmt.Sprintf("XA end '%s'", branch)) tx.Must().Exec(fmt.Sprintf("XA prepare '%s'", branch)) @@ -86,14 +89,14 @@ func (xa *XaClient) XaGlobalTransaction(gid string, transFunc XaGlobalFunc) (rer } }() resp, err := common.RestyClient.R().SetBody(data).Post(xa.Server + "/prepare") - common.PanicIfError(err) + e2p(err) if !strings.Contains(resp.String(), "SUCCESS") { panic(fmt.Errorf("unexpected result: %s", resp.String())) } err = transFunc() - common.PanicIfError(err) + e2p(err) resp, err = common.RestyClient.R().SetBody(data).Post(xa.Server + "/commit") - common.PanicIfError(err) + e2p(err) if !strings.Contains(resp.String(), "SUCCESS") { panic(fmt.Errorf("unexpected result: %s", resp.String())) }