diff --git a/.gitignore b/.gitignore index 220eede..e6d0a57 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ conf.yml* *.out */**/main -main \ No newline at end of file +main +intra-barrier.md diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index 4f3f1b5..38686fe 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -50,8 +50,6 @@ func TestDtmSvr(t *testing.T) { xaNormal(t) xaRollback(t) sagaCommittedPending(t) - sagaPreparePending(t) - sagaPrepareCancel(t) sagaRollback(t) } @@ -132,8 +130,6 @@ func xaRollback(t *testing.T) { func tccNormal(t *testing.T) { tcc := genTcc("gid-tcc-normal", false, false) - tcc.Prepare(tcc.QueryPrepared) - assert.Equal(t, "prepared", getTransStatus(tcc.Gid)) tcc.Commit() assert.Equal(t, "committed", getTransStatus(tcc.Gid)) WaitTransProcessed(tcc.Gid) @@ -184,8 +180,6 @@ func msgPending(t *testing.T) { func sagaNormal(t *testing.T) { saga := genSaga("gid-noramlSaga", false, false) - saga.Prepare(saga.QueryPrepared) - assert.Equal(t, "prepared", getTransStatus(saga.Gid)) saga.Commit() assert.Equal(t, "committed", getTransStatus(saga.Gid)) WaitTransProcessed(saga.Gid) @@ -197,36 +191,12 @@ func sagaRollback(t *testing.T) { saga := genSaga("gid-rollbackSaga2", false, true) saga.Commit() WaitTransProcessed(saga.Gid) - saga.Prepare(saga.QueryPrepared) assert.Equal(t, "failed", getTransStatus(saga.Gid)) assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid)) } -func sagaPrepareCancel(t *testing.T) { - saga := genSaga("gid1-prepareCancel", false, true) - saga.Prepare(saga.QueryPrepared) - examples.SagaTransQueryResult = "FAIL" - config.PreparedExpire = -10 - CronTransOnce(60*time.Second, "prepared") - examples.SagaTransQueryResult = "" - config.PreparedExpire = 60 - assert.Equal(t, "canceled", getTransStatus(saga.Gid)) -} - -func sagaPreparePending(t *testing.T) { - saga := genSaga("gid1-preparePending", false, false) - saga.Prepare(saga.QueryPrepared) - examples.SagaTransQueryResult = "PENDING" - CronTransOnce(60*time.Second, "prepared") - examples.SagaTransQueryResult = "" - assert.Equal(t, "prepared", getTransStatus(saga.Gid)) - CronTransOnce(60*time.Second, "prepared") - assert.Equal(t, "succeed", getTransStatus(saga.Gid)) -} - func sagaCommittedPending(t *testing.T) { saga := genSaga("gid-committedPending", false, false) - saga.Prepare(saga.QueryPrepared) examples.SagaTransInResult = "PENDING" saga.Commit() WaitTransProcessed(saga.Gid) @@ -250,7 +220,6 @@ func genMsg(gid string) *dtm.Msg { func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga { logrus.Printf("beginning a saga test ---------------- %s", gid) saga := dtm.SagaNew(examples.DtmServer, gid) - saga.QueryPrepared = examples.SagaBusi + "/TransQuery" req := examples.GenTransReq(30, outFailed, inFailed) saga.Add(examples.SagaBusi+"/TransOut", examples.SagaBusi+"/TransOutCompensate", &req) saga.Add(examples.SagaBusi+"/TransIn", examples.SagaBusi+"/TransInCompensate", &req) @@ -260,7 +229,6 @@ func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga { func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc { logrus.Printf("beginning a tcc test ---------------- %s", gid) tcc := dtm.TccNew(examples.DtmServer, gid) - tcc.QueryPrepared = examples.TccBusi + "/TransQuery" req := examples.GenTransReq(30, outFailed, inFailed) tcc.Add(examples.TccBusi+"/TransOutTry", examples.TccBusi+"/TransOutConfirm", examples.TccBusi+"/TransOutCancel", &req) tcc.Add(examples.TccBusi+"/TransInTry", examples.TccBusi+"/TransInConfirm", examples.TccBusi+"/TransInCancel", &req) diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 8da935b..47e630e 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -2,7 +2,6 @@ package dtmsvr import ( "fmt" - "strings" "time" "github.com/gin-gonic/gin" @@ -106,27 +105,6 @@ func (trans *TransGlobal) getProcessor() TransProcessor { return processorFac[trans.TransType](trans) } -func (t *TransGlobal) MayQueryPrepared(db *common.DB) { - if t.Status != "prepared" { - return - } - resp, err := common.RestyClient.R().SetQueryParam("gid", t.Gid).Get(t.QueryPrepared) - e2p(err) - body := resp.String() - if strings.Contains(body, "FAIL") { - preparedExpire := time.Now().Add(time.Duration(-config.PreparedExpire) * time.Second) - logrus.Printf("create time: %s prepared expire: %s ", t.CreateTime.Local(), preparedExpire.Local()) - status := common.If(t.CreateTime.Before(preparedExpire), "canceled", "prepared").(string) - if status != t.Status { - t.changeStatus(db, status) - } else { - t.touch(db, t.NextCronInterval*2) - } - } else if strings.Contains(body, "SUCCESS") { - t.changeStatus(db, "committed") - } -} - func (trans *TransGlobal) Process(db *common.DB) { defer handlePanic() defer func() { diff --git a/dtmsvr/trans_msg.go b/dtmsvr/trans_msg.go index 88454f2..e3c31e7 100644 --- a/dtmsvr/trans_msg.go +++ b/dtmsvr/trans_msg.go @@ -3,7 +3,9 @@ package dtmsvr import ( "fmt" "strings" + "time" + "github.com/sirupsen/logrus" "github.com/yedf/dtm/common" ) @@ -44,8 +46,29 @@ func (t *TransMsgProcessor) ExecBranch(db *common.DB, branch *TransBranch) { } } +func (t *TransGlobal) mayQueryPrepared(db *common.DB) { + if t.Status != "prepared" { + return + } + resp, err := common.RestyClient.R().SetQueryParam("gid", t.Gid).Get(t.QueryPrepared) + e2p(err) + body := resp.String() + if strings.Contains(body, "FAIL") { + preparedExpire := time.Now().Add(time.Duration(-config.PreparedExpire) * time.Second) + logrus.Printf("create time: %s prepared expire: %s ", t.CreateTime.Local(), preparedExpire.Local()) + status := common.If(t.CreateTime.Before(preparedExpire), "canceled", "prepared").(string) + if status != t.Status { + t.changeStatus(db, status) + } else { + t.touch(db, t.NextCronInterval*2) + } + } else if strings.Contains(body, "SUCCESS") { + t.changeStatus(db, "committed") + } +} + func (t *TransMsgProcessor) ProcessOnce(db *common.DB, branches []TransBranch) { - t.MayQueryPrepared(db) + t.mayQueryPrepared(db) if t.Status != "committed" { return } diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_saga.go index f4e6bf3..66052e3 100644 --- a/dtmsvr/trans_saga.go +++ b/dtmsvr/trans_saga.go @@ -51,7 +51,6 @@ func (t *TransSagaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { } func (t *TransSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) { - t.MayQueryPrepared(db) if t.Status != "committed" { return } diff --git a/dtmsvr/trans_tcc.go b/dtmsvr/trans_tcc.go index 21734fd..7c3a7f3 100644 --- a/dtmsvr/trans_tcc.go +++ b/dtmsvr/trans_tcc.go @@ -51,7 +51,6 @@ func (t *TransTccProcessor) ExecBranch(db *common.DB, branch *TransBranch) { } func (t *TransTccProcessor) ProcessOnce(db *common.DB, branches []TransBranch) { - t.MayQueryPrepared(db) if t.Status != "committed" { return } diff --git a/examples/examples.sql b/examples/examples.sql index 1db3e05..286a444 100644 --- a/examples/examples.sql +++ b/examples/examples.sql @@ -17,11 +17,11 @@ drop table if exists user_account_trading; create table if not exists user_account_trading( -- 表示交易中被冻结的金额 id int(11) PRIMARY KEY AUTO_INCREMENT, user_id int(11) UNIQUE , - frozen DECIMAL(10, 2) not null default '0', + trading_balance DECIMAL(10, 2) not null default '0', create_time datetime DEFAULT now(), update_time datetime DEFAULT now(), key(create_time), key(update_time) ); -insert into user_account_trading (user_id, balance) values (1, 0), (2, 0) on DUPLICATE KEY UPDATE balance=values (balance); \ No newline at end of file +insert into user_account_trading (user_id, trading_balance) values (1, 0), (2, 0) on DUPLICATE KEY UPDATE trading_balance=values (trading_balance); \ No newline at end of file diff --git a/examples/main_msg.go b/examples/main_msg.go index 05d38d9..9b12819 100644 --- a/examples/main_msg.go +++ b/examples/main_msg.go @@ -11,7 +11,6 @@ import ( ) // 事务参与者的服务地址 -const MsgBusiPort = 8085 const MsgBusiApi = "/api/busi_msg" var MsgBusi = fmt.Sprintf("http://localhost:%d%s", MsgBusiPort, MsgBusiApi) diff --git a/examples/main_saga.go b/examples/main_saga.go index d39283f..4db843c 100644 --- a/examples/main_saga.go +++ b/examples/main_saga.go @@ -11,7 +11,6 @@ import ( ) // 事务参与者的服务地址 -const SagaBusiPort = 8081 const SagaBusiApi = "/api/busi_saga" var SagaBusi = fmt.Sprintf("http://localhost:%d%s", SagaBusiPort, SagaBusiApi) @@ -40,10 +39,8 @@ func SagaFireRequest() { saga := dtm.SagaNew(DtmServer, gid). Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req). Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req) - err := saga.Prepare(SagaBusi + "/TransQuery") - e2p(err) logrus.Printf("busi trans commit") - err = saga.Commit() + err := saga.Commit() e2p(err) } @@ -54,7 +51,6 @@ func SagaAddRoute(app *gin.Engine) { app.POST(SagaBusiApi+"/TransInCompensate", common.WrapHandler(sagaTransInCompensate)) app.POST(SagaBusiApi+"/TransOut", common.WrapHandler(SagaTransOut)) app.POST(SagaBusiApi+"/TransOutCompensate", common.WrapHandler(sagaTransOutCompensate)) - app.GET(SagaBusiApi+"/TransQuery", common.WrapHandler(sagaTransQuery)) logrus.Printf("examples listening at %d", SagaBusiPort) } @@ -62,7 +58,6 @@ var SagaTransInResult = "" var SagaTransOutResult = "" var SagaTransInCompensateResult = "" var SagaTransOutCompensateResult = "" -var SagaTransQueryResult = "" func sagaTransIn(c *gin.Context) (interface{}, error) { gid := c.Query("gid") @@ -95,10 +90,3 @@ func sagaTransOutCompensate(c *gin.Context) (interface{}, error) { logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res) return M{"result": res}, nil } - -func sagaTransQuery(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - logrus.Printf("%s TransQuery", gid) - res := common.OrString(SagaTransQueryResult, "SUCCESS") - return M{"result": res}, nil -} diff --git a/examples/main_tcc.go b/examples/main_tcc.go index 774b9de..736f8bf 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -11,7 +11,6 @@ import ( ) // 事务参与者的服务地址 -const TccBusiPort = 8083 const TccBusiApi = "/api/busi_tcc" var TccBusi = fmt.Sprintf("http://localhost:%d%s", TccBusiPort, TccBusiApi) @@ -40,10 +39,8 @@ func TccFireRequest() { tcc := dtm.TccNew(DtmServer, gid). Add(TccBusi+"/TransOutTry", TccBusi+"/TransOutConfirm", TccBusi+"/TransOutCancel", req). Add(TccBusi+"/TransInTry", TccBusi+"/TransInConfirm", TccBusi+"/TransOutCancel", req) - err := tcc.Prepare(TccBusi + "/TransQuery") - e2p(err) logrus.Printf("busi trans commit") - err = tcc.Commit() + err := tcc.Commit() e2p(err) } @@ -56,7 +53,6 @@ func TccAddRoute(app *gin.Engine) { app.POST(TccBusiApi+"/TransOutTry", common.WrapHandler(tccTransOutTry)) app.POST(TccBusiApi+"/TransOutConfirm", common.WrapHandler(tccTransOutConfirm)) app.POST(TccBusiApi+"/TransOutCancel", common.WrapHandler(tccTransOutCancel)) - app.GET(TccBusiApi+"/TransQuery", common.WrapHandler(tccTransQuery)) logrus.Printf("examples listening at %d", TccBusiPort) } @@ -66,7 +62,6 @@ var TccTransInCancelResult = "" var TccTransOutCancelResult = "" var TccTransInConfirmResult = "" var TccTransOutConfirmResult = "" -var TccTransQueryResult = "" func tccTransInTry(c *gin.Context) (interface{}, error) { gid := c.Query("gid") @@ -115,10 +110,3 @@ func tccTransOutCancel(c *gin.Context) (interface{}, error) { logrus.Printf("%s tccTransOutCancel: %v result: %s", gid, req, res) return M{"result": res}, nil } - -func tccTransQuery(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - logrus.Printf("%s TransQuery", gid) - res := common.OrString(TccTransQueryResult, "SUCCESS") - return M{"result": res}, nil -} diff --git a/examples/main_xa.go b/examples/main_xa.go index 3903431..29c7eee 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -12,7 +12,6 @@ import ( ) // 事务参与者的服务地址 -const XaBusiPort = 8082 const XaBusiApi = "/api/busi_xa" var XaBusi = fmt.Sprintf("http://localhost:%d%s", XaBusiPort, XaBusiApi) diff --git a/examples/quick_start.go b/examples/quick_start.go index 251d4f4..eb0ced3 100644 --- a/examples/quick_start.go +++ b/examples/quick_start.go @@ -11,7 +11,6 @@ import ( ) // 事务参与者的服务地址 -const startBusiPort = 8084 const startBusiApi = "/api/busi_start" var startBusi = fmt.Sprintf("http://localhost:%d%s", startBusiPort, startBusiApi) diff --git a/examples/types.go b/examples/types.go index 82e9efa..f282f2a 100644 --- a/examples/types.go +++ b/examples/types.go @@ -12,6 +12,16 @@ type M = map[string]interface{} // 指定dtm服务地址 const DtmServer = "http://localhost:8080/api/dtmsvr" +const ( + MsgBusiPort = iota + 8081 + SagaBusiPort + SagaBarrierBusiPort + TccBusiPort + TccBarrierBusiPort + XaBusiPort + startBusiPort +) + type TransReq struct { Amount int `json:"amount"` TransInResult string `json:"transInResult"` diff --git a/saga.go b/saga.go index 2a3d80e..3fd687a 100644 --- a/saga.go +++ b/saga.go @@ -13,10 +13,9 @@ type Saga struct { } type SagaData struct { - Gid string `json:"gid"` - TransType string `json:"trans_type"` - Steps []SagaStep `json:"steps"` - QueryPrepared string `json:"query_prepared"` + Gid string `json:"gid"` + TransType string `json:"trans_type"` + Steps []SagaStep `json:"steps"` } type SagaStep struct { Action string `json:"action"` @@ -55,16 +54,3 @@ func (s *Saga) Commit() error { } return nil } - -func (s *Saga) Prepare(queryPrepared string) error { - s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) - logrus.Printf("preparing %s body: %v", s.Gid, &s.SagaData) - resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/prepare", s.Server)) - if err != nil { - return err - } - if resp.StatusCode() != 200 { - return fmt.Errorf("prepare failed: %v", resp.Body()) - } - return nil -} diff --git a/tcc.go b/tcc.go index 77f4b93..df857a0 100644 --- a/tcc.go +++ b/tcc.go @@ -13,10 +13,9 @@ type Tcc struct { } type TccData struct { - Gid string `json:"gid"` - TransType string `json:"trans_type"` - Steps []TccStep `json:"steps"` - QueryPrepared string `json:"query_prepared"` + Gid string `json:"gid"` + TransType string `json:"trans_type"` + Steps []TccStep `json:"steps"` } type TccStep struct { Try string `json:"try"` @@ -57,16 +56,3 @@ func (s *Tcc) Commit() error { } return nil } - -func (s *Tcc) Prepare(queryPrepared string) error { - s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) - logrus.Printf("preparing %s body: %v", s.Gid, &s.TccData) - resp, err := common.RestyClient.R().SetBody(&s.TccData).Post(fmt.Sprintf("%s/prepare", s.Server)) - if err != nil { - return err - } - if resp.StatusCode() != 200 { - return fmt.Errorf("prepare failed: %v", resp.Body()) - } - return nil -} diff --git a/xa.go b/xa.go index 5e051b2..8d67b97 100644 --- a/xa.go +++ b/xa.go @@ -42,9 +42,7 @@ func XaClientNew(server string, mysqlConf map[string]string, app *gin.Engine, ca e2p(err) common.MustUnmarshal(b, &req) tx, my := common.DbAlone(xa.Conf) - defer func() { - my.Close() - }() + defer my.Close() if req.Action == "commit" { tx.Must().Exec(fmt.Sprintf("xa commit '%s'", req.Branch)) } else if req.Action == "rollback" {