fix golint
This commit is contained in:
parent
90d84b77a2
commit
0f78db2b84
@ -9,8 +9,10 @@ import (
|
|||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// BusiFunc type for busi func
|
||||||
type BusiFunc func(db *sql.DB) (interface{}, error)
|
type BusiFunc func(db *sql.DB) (interface{}, error)
|
||||||
|
|
||||||
|
// TransInfo every branch info
|
||||||
type TransInfo struct {
|
type TransInfo struct {
|
||||||
TransType string
|
TransType string
|
||||||
Gid string
|
Gid string
|
||||||
@ -22,6 +24,7 @@ func (t *TransInfo) String() string {
|
|||||||
return fmt.Sprintf("transInfo: %s %s %s %s", t.TransType, t.Gid, t.BranchID, t.BranchType)
|
return fmt.Sprintf("transInfo: %s %s %s %s", t.TransType, t.Gid, t.BranchID, t.BranchType)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TransInfoFromReq construct transaction info from request
|
||||||
func TransInfoFromReq(c *gin.Context) *TransInfo {
|
func TransInfoFromReq(c *gin.Context) *TransInfo {
|
||||||
ti := &TransInfo{
|
ti := &TransInfo{
|
||||||
TransType: c.Query("trans_type"),
|
TransType: c.Query("trans_type"),
|
||||||
@ -35,11 +38,13 @@ func TransInfoFromReq(c *gin.Context) *TransInfo {
|
|||||||
return ti
|
return ti
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BarrierModel barrier model for gorm
|
||||||
type BarrierModel struct {
|
type BarrierModel struct {
|
||||||
common.ModelBase
|
common.ModelBase
|
||||||
TransInfo
|
TransInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TableName gorm table name
|
||||||
func (BarrierModel) TableName() string { return "dtm_barrier.barrier" }
|
func (BarrierModel) TableName() string { return "dtm_barrier.barrier" }
|
||||||
|
|
||||||
func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, branchType string) (int64, error) {
|
func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, branchType string) (int64, error) {
|
||||||
@ -53,6 +58,7 @@ func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, br
|
|||||||
return res.RowsAffected()
|
return res.RowsAffected()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ThroughBarrierCall barrier interface. busiCall will be called only when the request is necessary
|
||||||
func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc) (res interface{}, rerr error) {
|
func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc) (res interface{}, rerr error) {
|
||||||
tx, rerr := db.BeginTx(context.Background(), &sql.TxOptions{})
|
tx, rerr := db.BeginTx(context.Background(), &sql.TxOptions{})
|
||||||
if rerr != nil {
|
if rerr != nil {
|
||||||
|
|||||||
@ -8,22 +8,27 @@ import (
|
|||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Msg reliable msg type
|
||||||
type Msg struct {
|
type Msg struct {
|
||||||
MsgData
|
MsgData
|
||||||
Server string
|
Server string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MsgData msg data
|
||||||
type MsgData struct {
|
type MsgData struct {
|
||||||
Gid string `json:"gid"`
|
Gid string `json:"gid"`
|
||||||
TransType string `json:"trans_type"`
|
TransType string `json:"trans_type"`
|
||||||
Steps []MsgStep `json:"steps"`
|
Steps []MsgStep `json:"steps"`
|
||||||
QueryPrepared string `json:"query_prepared"`
|
QueryPrepared string `json:"query_prepared"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MsgStep struct of one step msg
|
||||||
type MsgStep struct {
|
type MsgStep struct {
|
||||||
Action string `json:"action"`
|
Action string `json:"action"`
|
||||||
Data string `json:"data"`
|
Data string `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewMsg create new msg
|
||||||
func NewMsg(server string) *Msg {
|
func NewMsg(server string) *Msg {
|
||||||
return &Msg{
|
return &Msg{
|
||||||
MsgData: MsgData{
|
MsgData: MsgData{
|
||||||
@ -33,6 +38,8 @@ func NewMsg(server string) *Msg {
|
|||||||
Server: server,
|
Server: server,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add add a new step
|
||||||
func (s *Msg) Add(action string, postData interface{}) *Msg {
|
func (s *Msg) Add(action string, postData interface{}) *Msg {
|
||||||
logrus.Printf("msg %s Add %s %v", s.Gid, action, postData)
|
logrus.Printf("msg %s Add %s %v", s.Gid, action, postData)
|
||||||
step := MsgStep{
|
step := MsgStep{
|
||||||
@ -43,6 +50,7 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Submit submit the msg
|
||||||
func (s *Msg) Submit() error {
|
func (s *Msg) Submit() error {
|
||||||
logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData)
|
logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData)
|
||||||
resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server))
|
resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server))
|
||||||
@ -56,6 +64,7 @@ func (s *Msg) Submit() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Prepare prepare the msg
|
||||||
func (s *Msg) Prepare(queryPrepared string) error {
|
func (s *Msg) Prepare(queryPrepared string) error {
|
||||||
s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared)
|
s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared)
|
||||||
logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData)
|
logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData)
|
||||||
|
|||||||
@ -8,22 +8,27 @@ import (
|
|||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Saga struct of saga
|
||||||
type Saga struct {
|
type Saga struct {
|
||||||
SagaData
|
SagaData
|
||||||
Server string
|
Server string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SagaData sage data
|
||||||
type SagaData struct {
|
type SagaData struct {
|
||||||
Gid string `json:"gid"`
|
Gid string `json:"gid"`
|
||||||
TransType string `json:"trans_type"`
|
TransType string `json:"trans_type"`
|
||||||
Steps []SagaStep `json:"steps"`
|
Steps []SagaStep `json:"steps"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SagaStep one step of saga
|
||||||
type SagaStep struct {
|
type SagaStep struct {
|
||||||
Action string `json:"action"`
|
Action string `json:"action"`
|
||||||
Compensate string `json:"compensate"`
|
Compensate string `json:"compensate"`
|
||||||
Data string `json:"data"`
|
Data string `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewSaga create a saga
|
||||||
func NewSaga(server string) *Saga {
|
func NewSaga(server string) *Saga {
|
||||||
return &Saga{
|
return &Saga{
|
||||||
SagaData: SagaData{
|
SagaData: SagaData{
|
||||||
@ -32,6 +37,8 @@ func NewSaga(server string) *Saga {
|
|||||||
Server: server,
|
Server: server,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add add a saga step
|
||||||
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {
|
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {
|
||||||
logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData)
|
logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData)
|
||||||
step := SagaStep{
|
step := SagaStep{
|
||||||
@ -43,6 +50,7 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Submit submit the saga trans
|
||||||
func (s *Saga) Submit() error {
|
func (s *Saga) Submit() error {
|
||||||
logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData)
|
logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData)
|
||||||
resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server))
|
resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server))
|
||||||
|
|||||||
@ -8,14 +8,17 @@ import (
|
|||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Tcc struct of tcc
|
||||||
type Tcc struct {
|
type Tcc struct {
|
||||||
IDGenerator
|
IDGenerator
|
||||||
Dtm string
|
Dtm string
|
||||||
Gid string
|
Gid string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TccGlobalFunc type of global tcc call
|
||||||
type TccGlobalFunc func(tcc *Tcc) error
|
type TccGlobalFunc func(tcc *Tcc) error
|
||||||
|
|
||||||
|
// TccGlobalTransaction begin a tcc global transaction
|
||||||
func TccGlobalTransaction(dtm string, tccFunc TccGlobalFunc) (gid string, rerr error) {
|
func TccGlobalTransaction(dtm string, tccFunc TccGlobalFunc) (gid string, rerr error) {
|
||||||
gid = GenGid(dtm)
|
gid = GenGid(dtm)
|
||||||
data := &M{
|
data := &M{
|
||||||
@ -38,6 +41,7 @@ func TccGlobalTransaction(dtm string, tccFunc TccGlobalFunc) (gid string, rerr e
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TccFromReq tcc from request info
|
||||||
func TccFromReq(c *gin.Context) (*Tcc, error) {
|
func TccFromReq(c *gin.Context) (*Tcc, error) {
|
||||||
tcc := &Tcc{
|
tcc := &Tcc{
|
||||||
Dtm: c.Query("dtm"),
|
Dtm: c.Query("dtm"),
|
||||||
@ -50,7 +54,8 @@ func TccFromReq(c *gin.Context) (*Tcc, error) {
|
|||||||
return tcc, nil
|
return tcc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, cancelUrl string) (*resty.Response, error) {
|
// CallBranch call a tcc branch
|
||||||
|
func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) {
|
||||||
branchID := t.NewBranchID()
|
branchID := t.NewBranchID()
|
||||||
resp, err := common.RestyClient.R().
|
resp, err := common.RestyClient.R().
|
||||||
SetBody(&M{
|
SetBody(&M{
|
||||||
@ -59,9 +64,9 @@ func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, can
|
|||||||
"trans_type": "tcc",
|
"trans_type": "tcc",
|
||||||
"status": "prepared",
|
"status": "prepared",
|
||||||
"data": string(common.MustMarshal(body)),
|
"data": string(common.MustMarshal(body)),
|
||||||
"try": tryUrl,
|
"try": tryURL,
|
||||||
"confirm": confirmUrl,
|
"confirm": confirmURL,
|
||||||
"cancel": cancelUrl,
|
"cancel": cancelURL,
|
||||||
}).
|
}).
|
||||||
Post(t.Dtm + "/registerTccBranch")
|
Post(t.Dtm + "/registerTccBranch")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -75,5 +80,5 @@ func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, can
|
|||||||
"trans_type": "tcc",
|
"trans_type": "tcc",
|
||||||
"branch_type": "try",
|
"branch_type": "try",
|
||||||
}).
|
}).
|
||||||
Post(tryUrl)
|
Post(tryURL)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// GenGid generate a new gid
|
||||||
func GenGid(server string) string {
|
func GenGid(server string) string {
|
||||||
res := common.MS{}
|
res := common.MS{}
|
||||||
_, err := common.RestyClient.R().SetResult(&res).Get(server + "/newGid")
|
_, err := common.RestyClient.R().SetResult(&res).Get(server + "/newGid")
|
||||||
@ -13,11 +14,13 @@ func GenGid(server string) string {
|
|||||||
return res["gid"]
|
return res["gid"]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IDGenerator used to generate a branch id
|
||||||
type IDGenerator struct {
|
type IDGenerator struct {
|
||||||
parentID string
|
parentID string
|
||||||
branchID int
|
branchID int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewBranchID generate a branch id
|
||||||
func (g *IDGenerator) NewBranchID() string {
|
func (g *IDGenerator) NewBranchID() string {
|
||||||
if g.branchID >= 99 {
|
if g.branchID >= 99 {
|
||||||
panic(fmt.Errorf("branch id is larger than 99"))
|
panic(fmt.Errorf("branch id is larger than 99"))
|
||||||
|
|||||||
32
dtmcli/xa.go
32
dtmcli/xa.go
@ -10,25 +10,31 @@ import (
|
|||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// M alias
|
||||||
type M = map[string]interface{}
|
type M = map[string]interface{}
|
||||||
|
|
||||||
var e2p = common.E2P
|
var e2p = common.E2P
|
||||||
|
|
||||||
|
// XaGlobalFunc type of xa global function
|
||||||
type XaGlobalFunc func(xa *Xa) error
|
type XaGlobalFunc func(xa *Xa) error
|
||||||
|
|
||||||
|
// XaLocalFunc type of xa local function
|
||||||
type XaLocalFunc func(db *common.DB, xa *Xa) error
|
type XaLocalFunc func(db *common.DB, xa *Xa) error
|
||||||
|
|
||||||
|
// XaClient xa client
|
||||||
type XaClient struct {
|
type XaClient struct {
|
||||||
Server string
|
Server string
|
||||||
Conf map[string]string
|
Conf map[string]string
|
||||||
CallbackUrl string
|
CallbackURL string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Xa xa transaction
|
||||||
type Xa struct {
|
type Xa struct {
|
||||||
IDGenerator
|
IDGenerator
|
||||||
Gid string
|
Gid string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetParams get xa params map
|
||||||
func (x *Xa) GetParams(branchID string) common.MS {
|
func (x *Xa) GetParams(branchID string) common.MS {
|
||||||
return common.MS{
|
return common.MS{
|
||||||
"gid": x.Gid,
|
"gid": x.Gid,
|
||||||
@ -38,6 +44,7 @@ func (x *Xa) GetParams(branchID string) common.MS {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// XaFromReq construct xa info from request
|
||||||
func XaFromReq(c *gin.Context) *Xa {
|
func XaFromReq(c *gin.Context) *Xa {
|
||||||
return &Xa{
|
return &Xa{
|
||||||
Gid: c.Query("gid"),
|
Gid: c.Query("gid"),
|
||||||
@ -45,17 +52,19 @@ func XaFromReq(c *gin.Context) *Xa {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewXaBranchID generate a xa branch id
|
||||||
func (x *Xa) NewXaBranchID() string {
|
func (x *Xa) NewXaBranchID() string {
|
||||||
return x.Gid + "-" + x.NewBranchID()
|
return x.Gid + "-" + x.NewBranchID()
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, callbackUrl string) *XaClient {
|
// NewXaClient construct a xa client
|
||||||
|
func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, callbackURL string) *XaClient {
|
||||||
xa := &XaClient{
|
xa := &XaClient{
|
||||||
Server: server,
|
Server: server,
|
||||||
Conf: mysqlConf,
|
Conf: mysqlConf,
|
||||||
CallbackUrl: callbackUrl,
|
CallbackURL: callbackURL,
|
||||||
}
|
}
|
||||||
u, err := url.Parse(callbackUrl)
|
u, err := url.Parse(callbackURL)
|
||||||
e2p(err)
|
e2p(err)
|
||||||
app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) {
|
app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) {
|
||||||
type CallbackReq struct {
|
type CallbackReq struct {
|
||||||
@ -82,18 +91,19 @@ func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, ca
|
|||||||
return xa
|
return xa
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// XaLocalTransaction start a xa local transaction
|
||||||
func (xc *XaClient) XaLocalTransaction(c *gin.Context, transFunc XaLocalFunc) (rerr error) {
|
func (xc *XaClient) XaLocalTransaction(c *gin.Context, transFunc XaLocalFunc) (rerr error) {
|
||||||
defer common.P2E(&rerr)
|
defer common.P2E(&rerr)
|
||||||
xa := XaFromReq(c)
|
xa := XaFromReq(c)
|
||||||
branchId := xa.NewBranchID()
|
branchID := xa.NewBranchID()
|
||||||
xaBranch := xa.Gid + "-" + branchId
|
xaBranch := xa.Gid + "-" + branchID
|
||||||
tx, my := common.DbAlone(xc.Conf)
|
tx, my := common.DbAlone(xc.Conf)
|
||||||
defer func() { my.Close() }()
|
defer func() { my.Close() }()
|
||||||
tx.Must().Exec(fmt.Sprintf("XA start '%s'", xaBranch))
|
tx.Must().Exec(fmt.Sprintf("XA start '%s'", xaBranch))
|
||||||
err := transFunc(tx, xa)
|
err := transFunc(tx, xa)
|
||||||
e2p(err)
|
e2p(err)
|
||||||
resp, err := common.RestyClient.R().
|
resp, err := common.RestyClient.R().
|
||||||
SetBody(&M{"gid": xa.Gid, "branch_id": branchId, "trans_type": "xa", "status": "prepared", "url": xc.CallbackUrl}).
|
SetBody(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}).
|
||||||
Post(xc.Server + "/registerXaBranch")
|
Post(xc.Server + "/registerXaBranch")
|
||||||
e2p(err)
|
e2p(err)
|
||||||
if !strings.Contains(resp.String(), "SUCCESS") {
|
if !strings.Contains(resp.String(), "SUCCESS") {
|
||||||
@ -104,6 +114,7 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, transFunc XaLocalFunc) (r
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// XaGlobalTransaction start a xa global transaction
|
||||||
func (xc *XaClient) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr error) {
|
func (xc *XaClient) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr error) {
|
||||||
xa := Xa{IDGenerator: IDGenerator{}, Gid: GenGid(xc.Server)}
|
xa := Xa{IDGenerator: IDGenerator{}, Gid: GenGid(xc.Server)}
|
||||||
gid = xa.Gid
|
gid = xa.Gid
|
||||||
@ -133,12 +144,13 @@ func (xc *XaClient) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rer
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (xa *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
|
// CallBranch call a xa branch
|
||||||
branchID := xa.NewBranchID()
|
func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
|
||||||
|
branchID := x.NewBranchID()
|
||||||
return common.RestyClient.R().
|
return common.RestyClient.R().
|
||||||
SetBody(body).
|
SetBody(body).
|
||||||
SetQueryParams(common.MS{
|
SetQueryParams(common.MS{
|
||||||
"gid": xa.Gid,
|
"gid": x.Gid,
|
||||||
"branch_id": branchID,
|
"branch_id": branchID,
|
||||||
"trans_type": "xa",
|
"trans_type": "xa",
|
||||||
"branch_type": "action",
|
"branch_type": "action",
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user