From 3a9aaa316697c92b5ccf79bd49669c398031c637 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 3 Aug 2021 18:10:43 +0800 Subject: [PATCH] wait_result may ok --- dtmsvr/api.go | 6 ++---- dtmsvr/cron.go | 12 ++++++++---- dtmsvr/trans.go | 22 ++++++++++++++++++++-- 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/dtmsvr/api.go b/dtmsvr/api.go index e614354..7c9af86 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -41,8 +41,7 @@ func submit(c *gin.Context) (interface{}, error) { } t.Status = "submitted" t.saveNew(db) - go t.Process(db) - return dtmcli.ResultSuccess, nil + return t.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil } func abort(c *gin.Context) (interface{}, error) { @@ -52,8 +51,7 @@ func abort(c *gin.Context) (interface{}, error) { if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != "prepared" && dbt.Status != "aborting" { return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil } - go dbt.Process(db) - return dtmcli.ResultSuccess, nil + return dbt.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil } func registerXaBranch(c *gin.Context) (interface{}, error) { diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 64a427d..a851918 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -1,6 +1,7 @@ package dtmsvr import ( + "fmt" "math" "math/rand" "runtime/debug" @@ -12,7 +13,7 @@ import ( // CronTransOnce cron expired trans. use expireIn as expire time func CronTransOnce(expireIn time.Duration) bool { - defer handlePanic() + defer handlePanic(nil) trans := lockOneTrans(expireIn) if trans == nil { return false @@ -20,7 +21,7 @@ func CronTransOnce(expireIn time.Duration) bool { if TransProcessedTestChan != nil { defer WaitTransProcessed(trans.Gid) } - trans.Process(dbGet()) + trans.Process(dbGet(), true) return true } @@ -52,9 +53,12 @@ func lockOneTrans(expireIn time.Duration) *TransGlobal { return &trans } -func handlePanic() { +func handlePanic(perr *error) { if err := recover(); err != nil { - common.RedLogf("----panic %s handlered\n%s", err.(error).Error(), string(debug.Stack())) + common.RedLogf("----panic %v handlered\n%s", err, string(debug.Stack())) + if perr != nil { + *perr = fmt.Errorf("dtm panic: %v", err) + } } } diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 5d2beee..d2166a5 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -7,6 +7,7 @@ import ( "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -110,8 +111,24 @@ func (t *TransGlobal) getProcessor() transProcessor { } // Process process global transaction once -func (t *TransGlobal) Process(db *common.DB) { - defer handlePanic() +func (t *TransGlobal) Process(db *common.DB, waitResult bool) common.M { + if !waitResult { + go t.processInner(db) + return dtmcli.ResultSuccess + } + submitting := t.Status == "submitted" + err := t.processInner(db) + if err != nil { + return common.M{"dtm_result": "FAILURE", "message": err.Error()} + } + if submitting && t.Status != "succeed" { + return common.M{"dtm_result": "FAILURE", "message": "trans failed by user"} + } + return dtmcli.ResultSuccess +} + +func (t *TransGlobal) processInner(db *common.DB) (rerr error) { + defer handlePanic(&rerr) defer func() { if TransProcessedTestChan != nil { logrus.Printf("processed: %s", t.Gid) @@ -126,6 +143,7 @@ func (t *TransGlobal) Process(db *common.DB) { branches := []TransBranch{} db.Must().Where("gid=?", t.Gid).Order("id asc").Find(&branches) t.getProcessor().ProcessOnce(db, branches) + return } func (t *TransGlobal) getBranchParams(branch *TransBranch) common.MS {