use idgenerator to gen unique branch id
This commit is contained in:
parent
912abb2b52
commit
c14f396927
@ -3,7 +3,6 @@ package common
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
@ -92,7 +91,7 @@ func GetDsn(conf map[string]string) string {
|
||||
if IsDockerCompose() {
|
||||
conf["host"] = strings.Replace(conf["host"], "localhost", "host.docker.internal", 1)
|
||||
}
|
||||
logrus.Printf("is docker: %t IS_DOCKER_COMPOSE: %s and conf host: %s", IsDockerCompose(), os.Getenv("IS_DOCKER_COMPOSE"), conf["host"])
|
||||
// logrus.Printf("is docker: %t IS_DOCKER_COMPOSE: %s and conf host: %s", IsDockerCompose(), os.Getenv("IS_DOCKER_COMPOSE"), conf["host"])
|
||||
return fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", conf["user"], conf["password"], conf["host"], conf["port"], conf["database"])
|
||||
}
|
||||
|
||||
|
||||
@ -9,6 +9,7 @@ import (
|
||||
)
|
||||
|
||||
type Tcc struct {
|
||||
IDGenerator
|
||||
Dtm string
|
||||
Gid string
|
||||
}
|
||||
@ -41,6 +42,7 @@ func TccFromReq(c *gin.Context) (*Tcc, error) {
|
||||
tcc := &Tcc{
|
||||
Dtm: c.Query("dtm"),
|
||||
Gid: c.Query("gid"),
|
||||
IDGenerator: IDGenerator{parentID: c.Query("branch_id")},
|
||||
}
|
||||
if tcc.Dtm == "" || tcc.Gid == "" {
|
||||
return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s", tcc.Dtm, tcc.Gid)
|
||||
@ -49,7 +51,7 @@ func TccFromReq(c *gin.Context) (*Tcc, error) {
|
||||
}
|
||||
|
||||
func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, cancelUrl string) (*resty.Response, error) {
|
||||
branchID := GenGid(t.Dtm)
|
||||
branchID := t.NewBranchID()
|
||||
resp, err := common.RestyClient.R().
|
||||
SetBody(&M{
|
||||
"gid": t.Gid,
|
||||
|
||||
@ -1,10 +0,0 @@
|
||||
package dtmcli
|
||||
|
||||
import "github.com/yedf/dtm/common"
|
||||
|
||||
func GenGid(server string) string {
|
||||
res := common.MS{}
|
||||
_, err := common.RestyClient.R().SetResult(&res).Get(server + "/newGid")
|
||||
e2p(err)
|
||||
return res["gid"]
|
||||
}
|
||||
87
dtmcli/xa.go
87
dtmcli/xa.go
@ -6,6 +6,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/go-resty/resty/v2"
|
||||
"github.com/yedf/dtm/common"
|
||||
)
|
||||
|
||||
@ -13,18 +14,43 @@ type M = map[string]interface{}
|
||||
|
||||
var e2p = common.E2P
|
||||
|
||||
type XaGlobalFunc func(gid string) error
|
||||
type XaGlobalFunc func(xa *Xa) error
|
||||
|
||||
type XaLocalFunc func(db *common.DB) error
|
||||
type XaLocalFunc func(db *common.DB, xa *Xa) error
|
||||
|
||||
type Xa struct {
|
||||
type XaClient struct {
|
||||
Server string
|
||||
Conf map[string]string
|
||||
CallbackUrl string
|
||||
}
|
||||
|
||||
func NewXa(server string, mysqlConf map[string]string, app *gin.Engine, callbackUrl string) *Xa {
|
||||
xa := &Xa{
|
||||
type Xa struct {
|
||||
IDGenerator
|
||||
Gid string
|
||||
}
|
||||
|
||||
func (x *Xa) GetParams(branchID string) common.MS {
|
||||
return common.MS{
|
||||
"gid": x.Gid,
|
||||
"trans_type": "xa",
|
||||
"branch_id": branchID,
|
||||
"branch_type": "action",
|
||||
}
|
||||
}
|
||||
|
||||
func XaFromReq(c *gin.Context) *Xa {
|
||||
return &Xa{
|
||||
Gid: c.Query("gid"),
|
||||
IDGenerator: IDGenerator{parentID: c.Query("branch_id")},
|
||||
}
|
||||
}
|
||||
|
||||
func (x *Xa) NewXaBranchID() string {
|
||||
return x.Gid + "-" + x.NewBranchID()
|
||||
}
|
||||
|
||||
func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, callbackUrl string) *XaClient {
|
||||
xa := &XaClient{
|
||||
Server: server,
|
||||
Conf: mysqlConf,
|
||||
CallbackUrl: callbackUrl,
|
||||
@ -43,10 +69,11 @@ func NewXa(server string, mysqlConf map[string]string, app *gin.Engine, callback
|
||||
common.MustUnmarshal(b, &req)
|
||||
tx, my := common.DbAlone(xa.Conf)
|
||||
defer my.Close()
|
||||
branchID := req.Gid + "-" + req.BranchID
|
||||
if req.Action == "commit" {
|
||||
tx.Must().Exec(fmt.Sprintf("xa commit '%s'", req.BranchID))
|
||||
tx.Must().Exec(fmt.Sprintf("xa commit '%s'", branchID))
|
||||
} else if req.Action == "rollback" {
|
||||
tx.Must().Exec(fmt.Sprintf("xa rollback '%s'", req.BranchID))
|
||||
tx.Must().Exec(fmt.Sprintf("xa rollback '%s'", branchID))
|
||||
} else {
|
||||
panic(fmt.Errorf("unknown action: %s", req.Action))
|
||||
}
|
||||
@ -55,28 +82,31 @@ func NewXa(server string, mysqlConf map[string]string, app *gin.Engine, callback
|
||||
return xa
|
||||
}
|
||||
|
||||
func (xa *Xa) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error) {
|
||||
func (xc *XaClient) XaLocalTransaction(c *gin.Context, transFunc XaLocalFunc) (rerr error) {
|
||||
defer common.P2E(&rerr)
|
||||
branchID := GenGid(xa.Server)
|
||||
tx, my := common.DbAlone(xa.Conf)
|
||||
xa := XaFromReq(c)
|
||||
branchId := xa.NewBranchID()
|
||||
xaBranch := xa.Gid + "-" + branchId
|
||||
tx, my := common.DbAlone(xc.Conf)
|
||||
defer func() { my.Close() }()
|
||||
tx.Must().Exec(fmt.Sprintf("XA start '%s'", branchID))
|
||||
err := transFunc(tx)
|
||||
tx.Must().Exec(fmt.Sprintf("XA start '%s'", xaBranch))
|
||||
err := transFunc(tx, xa)
|
||||
e2p(err)
|
||||
resp, err := common.RestyClient.R().
|
||||
SetBody(&M{"gid": gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xa.CallbackUrl}).
|
||||
Post(xa.Server + "/registerXaBranch")
|
||||
SetBody(&M{"gid": xa.Gid, "branch_id": branchId, "trans_type": "xa", "status": "prepared", "url": xc.CallbackUrl}).
|
||||
Post(xc.Server + "/registerXaBranch")
|
||||
e2p(err)
|
||||
if !strings.Contains(resp.String(), "SUCCESS") {
|
||||
e2p(fmt.Errorf("unknown server response: %s", resp.String()))
|
||||
}
|
||||
tx.Must().Exec(fmt.Sprintf("XA end '%s'", branchID))
|
||||
tx.Must().Exec(fmt.Sprintf("XA prepare '%s'", branchID))
|
||||
tx.Must().Exec(fmt.Sprintf("XA end '%s'", xaBranch))
|
||||
tx.Must().Exec(fmt.Sprintf("XA prepare '%s'", xaBranch))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (xa *Xa) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr error) {
|
||||
gid = GenGid(xa.Server)
|
||||
func (xc *XaClient) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr error) {
|
||||
xa := Xa{IDGenerator: IDGenerator{}, Gid: GenGid(xc.Server)}
|
||||
gid = xa.Gid
|
||||
data := &M{
|
||||
"gid": gid,
|
||||
"trans_type": "xa",
|
||||
@ -84,21 +114,34 @@ func (xa *Xa) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr erro
|
||||
defer func() {
|
||||
x := recover()
|
||||
if x != nil {
|
||||
_, _ = common.RestyClient.R().SetBody(data).Post(xa.Server + "/abort")
|
||||
_, _ = common.RestyClient.R().SetBody(data).Post(xc.Server + "/abort")
|
||||
rerr = x.(error)
|
||||
}
|
||||
}()
|
||||
resp, rerr := common.RestyClient.R().SetBody(data).Post(xa.Server + "/prepare")
|
||||
resp, rerr := common.RestyClient.R().SetBody(data).Post(xc.Server + "/prepare")
|
||||
e2p(rerr)
|
||||
if !strings.Contains(resp.String(), "SUCCESS") {
|
||||
panic(fmt.Errorf("unexpected result: %s", resp.String()))
|
||||
}
|
||||
rerr = transFunc(gid)
|
||||
rerr = transFunc(&xa)
|
||||
e2p(rerr)
|
||||
resp, rerr = common.RestyClient.R().SetBody(data).Post(xa.Server + "/submit")
|
||||
resp, rerr = common.RestyClient.R().SetBody(data).Post(xc.Server + "/submit")
|
||||
e2p(rerr)
|
||||
if !strings.Contains(resp.String(), "SUCCESS") {
|
||||
panic(fmt.Errorf("unexpected result: %s", resp.String()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (xa *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
|
||||
branchID := xa.NewBranchID()
|
||||
return common.RestyClient.R().
|
||||
SetBody(body).
|
||||
SetQueryParams(common.MS{
|
||||
"gid": xa.Gid,
|
||||
"branch_id": branchID,
|
||||
"trans_type": "xa",
|
||||
"branch_type": "action",
|
||||
}).
|
||||
Post(url)
|
||||
}
|
||||
|
||||
@ -85,18 +85,12 @@ func getBranchesStatus(gid string) []string {
|
||||
}
|
||||
|
||||
func xaNormal(t *testing.T) {
|
||||
xa := examples.XaClient
|
||||
gid, err := xa.XaGlobalTransaction(func(gid string) error {
|
||||
xc := examples.XaClient
|
||||
gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error {
|
||||
req := examples.GenTransReq(30, false, false)
|
||||
resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
|
||||
"gid": gid,
|
||||
"user_id": "1",
|
||||
}).Post(examples.Busi + "/TransOutXa")
|
||||
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
|
||||
common.CheckRestySuccess(resp, err)
|
||||
resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
|
||||
"gid": gid,
|
||||
"user_id": "2",
|
||||
}).Post(examples.Busi + "/TransInXa")
|
||||
resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa")
|
||||
common.CheckRestySuccess(resp, err)
|
||||
return nil
|
||||
})
|
||||
@ -106,18 +100,12 @@ func xaNormal(t *testing.T) {
|
||||
}
|
||||
|
||||
func xaRollback(t *testing.T) {
|
||||
xa := examples.XaClient
|
||||
gid, err := xa.XaGlobalTransaction(func(gid string) error {
|
||||
xc := examples.XaClient
|
||||
gid, err := xc.XaGlobalTransaction(func(xa *dtmcli.Xa) error {
|
||||
req := examples.GenTransReq(30, false, true)
|
||||
resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
|
||||
"gid": gid,
|
||||
"user_id": "1",
|
||||
}).Post(examples.Busi + "/TransOutXa")
|
||||
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
|
||||
common.CheckRestySuccess(resp, err)
|
||||
resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
|
||||
"gid": gid,
|
||||
"user_id": "2",
|
||||
}).Post(examples.Busi + "/TransInXa")
|
||||
resp, err = xa.CallBranch(req, examples.Busi+"/TransInXa")
|
||||
common.CheckRestySuccess(resp, err)
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -55,7 +55,7 @@ func init() {
|
||||
}
|
||||
|
||||
func GenGid() string {
|
||||
return getOneHexIp() + "-" + gNode.Generate().Base58()
|
||||
return getOneHexIp() + "_" + gNode.Generate().Base58()
|
||||
}
|
||||
|
||||
func getOneHexIp() string {
|
||||
|
||||
@ -9,7 +9,7 @@ import (
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
var XaClient *dtmcli.Xa = nil
|
||||
var XaClient *dtmcli.XaClient = nil
|
||||
|
||||
type UserAccount struct {
|
||||
common.ModelBase
|
||||
@ -32,18 +32,12 @@ func dbGet() *common.DB {
|
||||
}
|
||||
|
||||
func XaFireRequest() {
|
||||
_, err := XaClient.XaGlobalTransaction(func(gid string) (rerr error) {
|
||||
_, err := XaClient.XaGlobalTransaction(func(xa *dtmcli.Xa) (rerr error) {
|
||||
defer common.P2E(&rerr)
|
||||
req := GenTransReq(30, false, false)
|
||||
resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
|
||||
"gid": gid,
|
||||
"user_id": "1",
|
||||
}).Post(Busi + "/TransOutXa")
|
||||
resp, err := xa.CallBranch(req, Busi+"/TransOutXa")
|
||||
common.CheckRestySuccess(resp, err)
|
||||
resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
|
||||
"gid": gid,
|
||||
"user_id": "2",
|
||||
}).Post(Busi + "/TransInXa")
|
||||
resp, err = xa.CallBranch(req, Busi+"/TransInXa")
|
||||
common.CheckRestySuccess(resp, err)
|
||||
return nil
|
||||
})
|
||||
@ -55,16 +49,16 @@ func XaSetup(app *gin.Engine) {
|
||||
app.POST(BusiApi+"/TransInXa", common.WrapHandler(xaTransIn))
|
||||
app.POST(BusiApi+"/TransOutXa", common.WrapHandler(xaTransOut))
|
||||
Config.Mysql["database"] = "dtm_busi"
|
||||
XaClient = dtmcli.NewXa(DtmServer, Config.Mysql, app, Busi+"/xa")
|
||||
XaClient = dtmcli.NewXaClient(DtmServer, Config.Mysql, app, Busi+"/xa")
|
||||
}
|
||||
|
||||
func xaTransIn(c *gin.Context) (interface{}, error) {
|
||||
err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.DB) (rerr error) {
|
||||
err := XaClient.XaLocalTransaction(c, func(db *common.DB, xa *dtmcli.Xa) (rerr error) {
|
||||
req := reqFrom(c)
|
||||
if req.TransInResult != "SUCCESS" {
|
||||
return fmt.Errorf("tranIn failed")
|
||||
}
|
||||
dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")).
|
||||
dbr := db.Model(&UserAccount{}).Where("user_id = ?", 2).
|
||||
Update("balance", gorm.Expr("balance + ?", req.Amount))
|
||||
return dbr.Error
|
||||
})
|
||||
@ -73,12 +67,12 @@ func xaTransIn(c *gin.Context) (interface{}, error) {
|
||||
}
|
||||
|
||||
func xaTransOut(c *gin.Context) (interface{}, error) {
|
||||
err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.DB) (rerr error) {
|
||||
err := XaClient.XaLocalTransaction(c, func(db *common.DB, xa *dtmcli.Xa) (rerr error) {
|
||||
req := reqFrom(c)
|
||||
if req.TransOutResult != "SUCCESS" {
|
||||
return fmt.Errorf("tranOut failed")
|
||||
}
|
||||
dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")).
|
||||
dbr := db.Model(&UserAccount{}).Where("user_id = ?", 1).
|
||||
Update("balance", gorm.Expr("balance - ?", req.Amount))
|
||||
return dbr.Error
|
||||
})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user