diff --git a/common/types.go b/common/types.go index 85fd491..73523ec 100644 --- a/common/types.go +++ b/common/types.go @@ -3,7 +3,6 @@ package common import ( "database/sql" "fmt" - "os" "regexp" "strings" "time" @@ -92,7 +91,7 @@ func GetDsn(conf map[string]string) string { if IsDockerCompose() { conf["host"] = strings.Replace(conf["host"], "localhost", "host.docker.internal", 1) } - logrus.Printf("is docker: %t IS_DOCKER_COMPOSE: %s and conf host: %s", IsDockerCompose(), os.Getenv("IS_DOCKER_COMPOSE"), conf["host"]) + // logrus.Printf("is docker: %t IS_DOCKER_COMPOSE: %s and conf host: %s", IsDockerCompose(), os.Getenv("IS_DOCKER_COMPOSE"), conf["host"]) return fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", conf["user"], conf["password"], conf["host"], conf["port"], conf["database"]) } diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index f0a1b1a..50483e1 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -9,6 +9,7 @@ import ( ) type Tcc struct { + IDGenerator Dtm string Gid string } @@ -39,8 +40,9 @@ func TccGlobalTransaction(dtm string, tccFunc TccGlobalFunc) (gid string, rerr e func TccFromReq(c *gin.Context) (*Tcc, error) { tcc := &Tcc{ - Dtm: c.Query("dtm"), - Gid: c.Query("gid"), + Dtm: c.Query("dtm"), + Gid: c.Query("gid"), + IDGenerator: IDGenerator{parentID: c.Query("branch_id")}, } if tcc.Dtm == "" || tcc.Gid == "" { return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s", tcc.Dtm, tcc.Gid) @@ -49,7 +51,7 @@ func TccFromReq(c *gin.Context) (*Tcc, error) { } func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, cancelUrl string) (*resty.Response, error) { - branchID := GenGid(t.Dtm) + branchID := t.NewBranchID() resp, err := common.RestyClient.R(). SetBody(&M{ "gid": t.Gid, diff --git a/dtmcli/utils.go b/dtmcli/utils.go deleted file mode 100644 index a1470d7..0000000 --- a/dtmcli/utils.go +++ /dev/null @@ -1,10 +0,0 @@ -package dtmcli - -import "github.com/yedf/dtm/common" - -func GenGid(server string) string { - res := common.MS{} - _, err := common.RestyClient.R().SetResult(&res).Get(server + "/newGid") - e2p(err) - return res["gid"] -} diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 0483a45..54f84ff 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/gin-gonic/gin" + "github.com/go-resty/resty/v2" "github.com/yedf/dtm/common" ) @@ -13,18 +14,43 @@ type M = map[string]interface{} var e2p = common.E2P -type XaGlobalFunc func(gid string) error +type XaGlobalFunc func(xa *Xa) error -type XaLocalFunc func(db *common.DB) error +type XaLocalFunc func(db *common.DB, xa *Xa) error -type Xa struct { +type XaClient struct { Server string Conf map[string]string CallbackUrl string } -func NewXa(server string, mysqlConf map[string]string, app *gin.Engine, callbackUrl string) *Xa { - xa := &Xa{ +type Xa struct { + IDGenerator + Gid string +} + +func (x *Xa) GetParams(branchID string) common.MS { + return common.MS{ + "gid": x.Gid, + "trans_type": "xa", + "branch_id": branchID, + "branch_type": "action", + } +} + +func XaFromReq(c *gin.Context) *Xa { + return &Xa{ + Gid: c.Query("gid"), + IDGenerator: IDGenerator{parentID: c.Query("branch_id")}, + } +} + +func (x *Xa) NewXaBranchID() string { + return x.Gid + "-" + x.NewBranchID() +} + +func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, callbackUrl string) *XaClient { + xa := &XaClient{ Server: server, Conf: mysqlConf, CallbackUrl: callbackUrl, @@ -43,10 +69,11 @@ func NewXa(server string, mysqlConf map[string]string, app *gin.Engine, callback common.MustUnmarshal(b, &req) tx, my := common.DbAlone(xa.Conf) defer my.Close() + branchID := req.Gid + "-" + req.BranchID if req.Action == "commit" { - tx.Must().Exec(fmt.Sprintf("xa commit '%s'", req.BranchID)) + tx.Must().Exec(fmt.Sprintf("xa commit '%s'", branchID)) } else if req.Action == "rollback" { - tx.Must().Exec(fmt.Sprintf("xa rollback '%s'", req.BranchID)) + tx.Must().Exec(fmt.Sprintf("xa rollback '%s'", branchID)) } else { panic(fmt.Errorf("unknown action: %s", req.Action)) } @@ -55,28 +82,31 @@ func NewXa(server string, mysqlConf map[string]string, app *gin.Engine, callback return xa } -func (xa *Xa) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error) { +func (xc *XaClient) XaLocalTransaction(c *gin.Context, transFunc XaLocalFunc) (rerr error) { defer common.P2E(&rerr) - branchID := GenGid(xa.Server) - tx, my := common.DbAlone(xa.Conf) + xa := XaFromReq(c) + branchId := xa.NewBranchID() + xaBranch := xa.Gid + "-" + branchId + tx, my := common.DbAlone(xc.Conf) defer func() { my.Close() }() - tx.Must().Exec(fmt.Sprintf("XA start '%s'", branchID)) - err := transFunc(tx) + tx.Must().Exec(fmt.Sprintf("XA start '%s'", xaBranch)) + err := transFunc(tx, xa) e2p(err) resp, err := common.RestyClient.R(). - SetBody(&M{"gid": gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xa.CallbackUrl}). - Post(xa.Server + "/registerXaBranch") + SetBody(&M{"gid": xa.Gid, "branch_id": branchId, "trans_type": "xa", "status": "prepared", "url": xc.CallbackUrl}). + Post(xc.Server + "/registerXaBranch") e2p(err) if !strings.Contains(resp.String(), "SUCCESS") { e2p(fmt.Errorf("unknown server response: %s", resp.String())) } - tx.Must().Exec(fmt.Sprintf("XA end '%s'", branchID)) - tx.Must().Exec(fmt.Sprintf("XA prepare '%s'", branchID)) + tx.Must().Exec(fmt.Sprintf("XA end '%s'", xaBranch)) + tx.Must().Exec(fmt.Sprintf("XA prepare '%s'", xaBranch)) return nil } -func (xa *Xa) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr error) { - gid = GenGid(xa.Server) +func (xc *XaClient) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr error) { + xa := Xa{IDGenerator: IDGenerator{}, Gid: GenGid(xc.Server)} + gid = xa.Gid data := &M{ "gid": gid, "trans_type": "xa", @@ -84,21 +114,34 @@ func (xa *Xa) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr erro defer func() { x := recover() if x != nil { - _, _ = common.RestyClient.R().SetBody(data).Post(xa.Server + "/abort") + _, _ = common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort") rerr = x.(error) } }() - resp, rerr := common.RestyClient.R().SetBody(data).Post(xa.Server + "/prepare") + resp, rerr := common.RestyClient.R().SetBody(data).Post(xc.Server + "/prepare") e2p(rerr) if !strings.Contains(resp.String(), "SUCCESS") { panic(fmt.Errorf("unexpected result: %s", resp.String())) } - rerr = transFunc(gid) + rerr = transFunc(&xa) e2p(rerr) - resp, rerr = common.RestyClient.R().SetBody(data).Post(xa.Server + "/submit") + resp, rerr = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit") e2p(rerr) if !strings.Contains(resp.String(), "SUCCESS") { panic(fmt.Errorf("unexpected result: %s", resp.String())) } return } + +func (xa *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { + branchID := xa.NewBranchID() + return common.RestyClient.R(). + SetBody(body). + SetQueryParams(common.MS{ + "gid": xa.Gid, + "branch_id": branchID, + "trans_type": "xa", + "branch_type": "action", + }). + Post(url) +} diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index 46a8839..f3c47b2 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -85,18 +85,12 @@ func getBranchesStatus(gid string) []string { } func xaNormal(t *testing.T) { - xa := examples.XaClient - gid, err := xa.XaGlobalTransaction(func(gid string) error { + xc := examples.XaClient + gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error { req := examples.GenTransReq(30, false, false) - resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ - "gid": gid, - "user_id": "1", - }).Post(examples.Busi + "/TransOutXa") + resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") common.CheckRestySuccess(resp, err) - resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ - "gid": gid, - "user_id": "2", - }).Post(examples.Busi + "/TransInXa") + resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa") common.CheckRestySuccess(resp, err) return nil }) @@ -106,18 +100,12 @@ func xaNormal(t *testing.T) { } func xaRollback(t *testing.T) { - xa := examples.XaClient - gid, err := xa.XaGlobalTransaction(func(gid string) error { + xc := examples.XaClient + gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error { req := examples.GenTransReq(30, false, true) - resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ - "gid": gid, - "user_id": "1", - }).Post(examples.Busi + "/TransOutXa") + resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") common.CheckRestySuccess(resp, err) - resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ - "gid": gid, - "user_id": "2", - }).Post(examples.Busi + "/TransInXa") + resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa") common.CheckRestySuccess(resp, err) return nil }) diff --git a/dtmsvr/utils.go b/dtmsvr/utils.go index b52bdbb..d4a3003 100644 --- a/dtmsvr/utils.go +++ b/dtmsvr/utils.go @@ -55,7 +55,7 @@ func init() { } func GenGid() string { - return getOneHexIp() + "-" + gNode.Generate().Base58() + return getOneHexIp() + "_" + gNode.Generate().Base58() } func getOneHexIp() string { diff --git a/examples/main_xa.go b/examples/main_xa.go index 5e2c64f..d7fce7f 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -9,7 +9,7 @@ import ( "gorm.io/gorm" ) -var XaClient *dtmcli.Xa = nil +var XaClient *dtmcli.XaClient = nil type UserAccount struct { common.ModelBase @@ -32,18 +32,12 @@ func dbGet() *common.DB { } func XaFireRequest() { - _, err := XaClient.XaGlobalTransaction(func(gid string) (rerr error) { + _, err := XaClient.XaGlobalTransaction(func(xa *dtmcli.Xa) (rerr error) { defer common.P2E(&rerr) req := GenTransReq(30, false, false) - resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ - "gid": gid, - "user_id": "1", - }).Post(Busi + "/TransOutXa") + resp, err := xa.CallBranch(req, Busi+"/TransOutXa") common.CheckRestySuccess(resp, err) - resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{ - "gid": gid, - "user_id": "2", - }).Post(Busi + "/TransInXa") + resp, err = xa.CallBranch(req, Busi+"/TransInXa") common.CheckRestySuccess(resp, err) return nil }) @@ -55,16 +49,16 @@ func XaSetup(app *gin.Engine) { app.POST(BusiApi+"/TransInXa", common.WrapHandler(xaTransIn)) app.POST(BusiApi+"/TransOutXa", common.WrapHandler(xaTransOut)) Config.Mysql["database"] = "dtm_busi" - XaClient = dtmcli.NewXa(DtmServer, Config.Mysql, app, Busi+"/xa") + XaClient = dtmcli.NewXaClient(DtmServer, Config.Mysql, app, Busi+"/xa") } func xaTransIn(c *gin.Context) (interface{}, error) { - err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.DB) (rerr error) { + err := XaClient.XaLocalTransaction(c, func(db *common.DB, xa *dtmcli.Xa) (rerr error) { req := reqFrom(c) if req.TransInResult != "SUCCESS" { return fmt.Errorf("tranIn failed") } - dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")). + dbr := db.Model(&UserAccount{}).Where("user_id = ?", 2). Update("balance", gorm.Expr("balance + ?", req.Amount)) return dbr.Error }) @@ -73,12 +67,12 @@ func xaTransIn(c *gin.Context) (interface{}, error) { } func xaTransOut(c *gin.Context) (interface{}, error) { - err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.DB) (rerr error) { + err := XaClient.XaLocalTransaction(c, func(db *common.DB, xa *dtmcli.Xa) (rerr error) { req := reqFrom(c) if req.TransOutResult != "SUCCESS" { return fmt.Errorf("tranOut failed") } - dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")). + dbr := db.Model(&UserAccount{}).Where("user_id = ?", 1). Update("balance", gorm.Expr("balance - ?", req.Amount)) return dbr.Error })