diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 5daf8fa..eaef70c 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -9,8 +9,10 @@ import ( "github.com/yedf/dtm/common" ) +// BusiFunc type for busi func type BusiFunc func(db *sql.DB) (interface{}, error) +// TransInfo every branch info type TransInfo struct { TransType string Gid string @@ -22,6 +24,7 @@ func (t *TransInfo) String() string { return fmt.Sprintf("transInfo: %s %s %s %s", t.TransType, t.Gid, t.BranchID, t.BranchType) } +// TransInfoFromReq construct transaction info from request func TransInfoFromReq(c *gin.Context) *TransInfo { ti := &TransInfo{ TransType: c.Query("trans_type"), @@ -35,11 +38,13 @@ func TransInfoFromReq(c *gin.Context) *TransInfo { return ti } +// BarrierModel barrier model for gorm type BarrierModel struct { common.ModelBase TransInfo } +// TableName gorm table name func (BarrierModel) TableName() string { return "dtm_barrier.barrier" } func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, branchType string) (int64, error) { @@ -53,6 +58,7 @@ func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, br return res.RowsAffected() } +// ThroughBarrierCall barrier interface. busiCall will be called only when the request is necessary func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc) (res interface{}, rerr error) { tx, rerr := db.BeginTx(context.Background(), &sql.TxOptions{}) if rerr != nil { diff --git a/dtmcli/message.go b/dtmcli/message.go index a5f984b..933bbe3 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -8,22 +8,27 @@ import ( "github.com/yedf/dtm/common" ) +// Msg reliable msg type type Msg struct { MsgData Server string } +// MsgData msg data type MsgData struct { Gid string `json:"gid"` TransType string `json:"trans_type"` Steps []MsgStep `json:"steps"` QueryPrepared string `json:"query_prepared"` } + +// MsgStep struct of one step msg type MsgStep struct { Action string `json:"action"` Data string `json:"data"` } +// NewMsg create new msg func NewMsg(server string) *Msg { return &Msg{ MsgData: MsgData{ @@ -33,6 +38,8 @@ func NewMsg(server string) *Msg { Server: server, } } + +// Add add a new step func (s *Msg) Add(action string, postData interface{}) *Msg { logrus.Printf("msg %s Add %s %v", s.Gid, action, postData) step := MsgStep{ @@ -43,6 +50,7 @@ func (s *Msg) Add(action string, postData interface{}) *Msg { return s } +// Submit submit the msg func (s *Msg) Submit() error { logrus.Printf("committing %s body: %v", s.Gid, &s.MsgData) resp, err := common.RestyClient.R().SetBody(&s.MsgData).Post(fmt.Sprintf("%s/submit", s.Server)) @@ -56,6 +64,7 @@ func (s *Msg) Submit() error { return nil } +// Prepare prepare the msg func (s *Msg) Prepare(queryPrepared string) error { s.QueryPrepared = common.OrString(queryPrepared, s.QueryPrepared) logrus.Printf("preparing %s body: %v", s.Gid, &s.MsgData) diff --git a/dtmcli/saga.go b/dtmcli/saga.go index bf615c7..7399570 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -8,22 +8,27 @@ import ( "github.com/yedf/dtm/common" ) +// Saga struct of saga type Saga struct { SagaData Server string } +// SagaData sage data type SagaData struct { Gid string `json:"gid"` TransType string `json:"trans_type"` Steps []SagaStep `json:"steps"` } + +// SagaStep one step of saga type SagaStep struct { Action string `json:"action"` Compensate string `json:"compensate"` Data string `json:"data"` } +// NewSaga create a saga func NewSaga(server string) *Saga { return &Saga{ SagaData: SagaData{ @@ -32,6 +37,8 @@ func NewSaga(server string) *Saga { Server: server, } } + +// Add add a saga step func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga { logrus.Printf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) step := SagaStep{ @@ -43,6 +50,7 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga return s } +// Submit submit the saga trans func (s *Saga) Submit() error { logrus.Printf("committing %s body: %v", s.Gid, &s.SagaData) resp, err := common.RestyClient.R().SetBody(&s.SagaData).Post(fmt.Sprintf("%s/submit", s.Server)) diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 50483e1..04386ec 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -8,14 +8,17 @@ import ( "github.com/yedf/dtm/common" ) +// Tcc struct of tcc type Tcc struct { IDGenerator Dtm string Gid string } +// TccGlobalFunc type of global tcc call type TccGlobalFunc func(tcc *Tcc) error +// TccGlobalTransaction begin a tcc global transaction func TccGlobalTransaction(dtm string, tccFunc TccGlobalFunc) (gid string, rerr error) { gid = GenGid(dtm) data := &M{ @@ -38,6 +41,7 @@ func TccGlobalTransaction(dtm string, tccFunc TccGlobalFunc) (gid string, rerr e return } +// TccFromReq tcc from request info func TccFromReq(c *gin.Context) (*Tcc, error) { tcc := &Tcc{ Dtm: c.Query("dtm"), @@ -50,7 +54,8 @@ func TccFromReq(c *gin.Context) (*Tcc, error) { return tcc, nil } -func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, cancelUrl string) (*resty.Response, error) { +// CallBranch call a tcc branch +func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) { branchID := t.NewBranchID() resp, err := common.RestyClient.R(). SetBody(&M{ @@ -59,9 +64,9 @@ func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, can "trans_type": "tcc", "status": "prepared", "data": string(common.MustMarshal(body)), - "try": tryUrl, - "confirm": confirmUrl, - "cancel": cancelUrl, + "try": tryURL, + "confirm": confirmURL, + "cancel": cancelURL, }). Post(t.Dtm + "/registerTccBranch") if err != nil { @@ -75,5 +80,5 @@ func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, can "trans_type": "tcc", "branch_type": "try", }). - Post(tryUrl) + Post(tryURL) } diff --git a/dtmcli/types.go b/dtmcli/types.go index 7b88b45..d6766f9 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -6,6 +6,7 @@ import ( "github.com/yedf/dtm/common" ) +// GenGid generate a new gid func GenGid(server string) string { res := common.MS{} _, err := common.RestyClient.R().SetResult(&res).Get(server + "/newGid") @@ -13,11 +14,13 @@ func GenGid(server string) string { return res["gid"] } +// IDGenerator used to generate a branch id type IDGenerator struct { parentID string branchID int } +// NewBranchID generate a branch id func (g *IDGenerator) NewBranchID() string { if g.branchID >= 99 { panic(fmt.Errorf("branch id is larger than 99")) diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 54f84ff..537c078 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -10,25 +10,31 @@ import ( "github.com/yedf/dtm/common" ) +// M alias type M = map[string]interface{} var e2p = common.E2P +// XaGlobalFunc type of xa global function type XaGlobalFunc func(xa *Xa) error +// XaLocalFunc type of xa local function type XaLocalFunc func(db *common.DB, xa *Xa) error +// XaClient xa client type XaClient struct { Server string Conf map[string]string - CallbackUrl string + CallbackURL string } +// Xa xa transaction type Xa struct { IDGenerator Gid string } +// GetParams get xa params map func (x *Xa) GetParams(branchID string) common.MS { return common.MS{ "gid": x.Gid, @@ -38,6 +44,7 @@ func (x *Xa) GetParams(branchID string) common.MS { } } +// XaFromReq construct xa info from request func XaFromReq(c *gin.Context) *Xa { return &Xa{ Gid: c.Query("gid"), @@ -45,17 +52,19 @@ func XaFromReq(c *gin.Context) *Xa { } } +// NewXaBranchID generate a xa branch id func (x *Xa) NewXaBranchID() string { return x.Gid + "-" + x.NewBranchID() } -func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, callbackUrl string) *XaClient { +// NewXaClient construct a xa client +func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, callbackURL string) *XaClient { xa := &XaClient{ Server: server, Conf: mysqlConf, - CallbackUrl: callbackUrl, + CallbackURL: callbackURL, } - u, err := url.Parse(callbackUrl) + u, err := url.Parse(callbackURL) e2p(err) app.POST(u.Path, common.WrapHandler(func(c *gin.Context) (interface{}, error) { type CallbackReq struct { @@ -82,18 +91,19 @@ func NewXaClient(server string, mysqlConf map[string]string, app *gin.Engine, ca return xa } +// XaLocalTransaction start a xa local transaction func (xc *XaClient) XaLocalTransaction(c *gin.Context, transFunc XaLocalFunc) (rerr error) { defer common.P2E(&rerr) xa := XaFromReq(c) - branchId := xa.NewBranchID() - xaBranch := xa.Gid + "-" + branchId + branchID := xa.NewBranchID() + xaBranch := xa.Gid + "-" + branchID tx, my := common.DbAlone(xc.Conf) defer func() { my.Close() }() tx.Must().Exec(fmt.Sprintf("XA start '%s'", xaBranch)) err := transFunc(tx, xa) e2p(err) resp, err := common.RestyClient.R(). - SetBody(&M{"gid": xa.Gid, "branch_id": branchId, "trans_type": "xa", "status": "prepared", "url": xc.CallbackUrl}). + SetBody(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "status": "prepared", "url": xc.CallbackURL}). Post(xc.Server + "/registerXaBranch") e2p(err) if !strings.Contains(resp.String(), "SUCCESS") { @@ -104,6 +114,7 @@ func (xc *XaClient) XaLocalTransaction(c *gin.Context, transFunc XaLocalFunc) (r return nil } +// XaGlobalTransaction start a xa global transaction func (xc *XaClient) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr error) { xa := Xa{IDGenerator: IDGenerator{}, Gid: GenGid(xc.Server)} gid = xa.Gid @@ -133,12 +144,13 @@ func (xc *XaClient) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rer return } -func (xa *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { - branchID := xa.NewBranchID() +// CallBranch call a xa branch +func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { + branchID := x.NewBranchID() return common.RestyClient.R(). SetBody(body). SetQueryParams(common.MS{ - "gid": xa.Gid, + "gid": x.Gid, "branch_id": branchID, "trans_type": "xa", "branch_type": "action",