diff --git a/dtmcli/message.go b/dtmcli/message.go index 0809def..e460f3f 100644 --- a/dtmcli/message.go +++ b/dtmcli/message.go @@ -2,13 +2,7 @@ package dtmcli // Msg reliable msg type type Msg struct { - MsgData TransBase -} - -// MsgData msg data -type MsgData struct { - TransData Steps []MsgStep `json:"steps"` QueryPrepared string `json:"query_prepared"` } @@ -22,19 +16,17 @@ type MsgStep struct { // NewMsg create new msg func NewMsg(server string, gid string) *Msg { return &Msg{ - MsgData: MsgData{TransData: TransData{ + TransBase: TransBase{ Gid: gid, TransType: "msg", - }}, - TransBase: TransBase{ - Dtm: server, + Dtm: server, }, } } // Add add a new step func (s *Msg) Add(action string, postData interface{}) *Msg { - Logf("msg %s Add %s %v", s.MsgData.Gid, action, postData) + Logf("msg %s Add %s %v", s.Gid, action, postData) step := MsgStep{ Action: action, Data: MustMarshalString(postData), @@ -46,10 +38,10 @@ func (s *Msg) Add(action string, postData interface{}) *Msg { // Prepare prepare the msg func (s *Msg) Prepare(queryPrepared string) error { s.QueryPrepared = OrString(queryPrepared, s.QueryPrepared) - return s.CallDtm(&s.MsgData, "prepare") + return s.CallDtm(s, "prepare") } // Submit submit the msg func (s *Msg) Submit() error { - return s.CallDtm(&s.MsgData, "submit") + return s.CallDtm(s, "submit") } diff --git a/dtmcli/saga.go b/dtmcli/saga.go index d94e8f4..3d5127f 100644 --- a/dtmcli/saga.go +++ b/dtmcli/saga.go @@ -2,13 +2,7 @@ package dtmcli // Saga struct of saga type Saga struct { - SagaData TransBase -} - -// SagaData sage data -type SagaData struct { - TransData Steps []SagaStep `json:"steps"` } @@ -22,19 +16,17 @@ type SagaStep struct { // NewSaga create a saga func NewSaga(server string, gid string) *Saga { return &Saga{ - SagaData: SagaData{TransData: TransData{ + TransBase: TransBase{ Gid: gid, TransType: "saga", - }}, - TransBase: TransBase{ - Dtm: server, + Dtm: server, }, } } // Add add a saga step func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga { - Logf("saga %s Add %s %s %v", s.SagaData.Gid, action, compensate, postData) + Logf("saga %s Add %s %s %v", s.Gid, action, compensate, postData) step := SagaStep{ Action: action, Compensate: compensate, @@ -46,5 +38,5 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga // Submit submit the saga trans func (s *Saga) Submit() error { - return s.CallDtm(&s.SagaData, "submit") + return s.CallDtm(s, "submit") } diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 3d489a6..c252474 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -9,7 +9,6 @@ import ( // Tcc struct of tcc type Tcc struct { - TransData TransBase } @@ -21,8 +20,13 @@ type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error) // gid 全局事务id // tccFunc tcc事务函数,里面会定义全局事务的分支 func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { - tcc := &Tcc{TransBase: TransBase{Dtm: dtm}, TransData: TransData{Gid: gid, TransType: "tcc"}} - rerr = tcc.CallDtm(&tcc.TransData, "prepare") + tcc := &Tcc{ + TransBase: TransBase{ + Gid: gid, + TransType: "tcc", + Dtm: dtm, + }} + rerr = tcc.CallDtm(tcc, "prepare") if rerr != nil { return rerr } @@ -30,7 +34,7 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e defer func() { x := recover() operation := If(x == nil && rerr == nil, "submit", "abort").(string) - err := tcc.CallDtm(&tcc.TransData, operation) + err := tcc.CallDtm(tcc, operation) if rerr == nil { rerr = err } @@ -47,7 +51,6 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e func TccFromQuery(qs url.Values) (*Tcc, error) { tcc := &Tcc{ TransBase: *TransBaseFromQuery(qs), - TransData: TransData{Gid: qs.Get("gid"), TransType: "tcc"}, } if tcc.Dtm == "" || tcc.Gid == "" { return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s parentID: %s", tcc.Dtm, tcc.Gid, tcc.parentID) diff --git a/dtmcli/types.go b/dtmcli/types.go index 64edcc9..ba30417 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -46,14 +46,10 @@ type TransResult struct { Message string } -// TransData 每个全局事务都有的数据 -type TransData struct { - Gid string `json:"gid"` - TransType string `json:"trans_type"` -} - // TransBase 事务的基础类 type TransBase struct { + Gid string `json:"gid"` + TransType string `json:"trans_type"` IDGenerator Dtm string // WaitResult 是否等待全局事务的最终结果 @@ -61,8 +57,10 @@ type TransBase struct { } // NewTransBase 1 -func NewTransBase(dtm string, parentID string) *TransBase { +func NewTransBase(gid string, transType string, dtm string, parentID string) *TransBase { return &TransBase{ + Gid: gid, + TransType: transType, IDGenerator: IDGenerator{parentID: parentID}, Dtm: dtm, } @@ -70,7 +68,7 @@ func NewTransBase(dtm string, parentID string) *TransBase { // TransBaseFromQuery construct transaction info from request func TransBaseFromQuery(qs url.Values) *TransBase { - return NewTransBase(qs.Get("dtm"), qs.Get("branch_id")) + return NewTransBase(qs.Get("gid"), qs.Get("trans_type"), qs.Get("dtm"), qs.Get("branch_id")) } // CallDtm 调用dtm服务器,返回事务的状态 diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 0967780..28ca57d 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -26,13 +26,12 @@ type XaClient struct { // Xa xa transaction type Xa struct { - TransData TransBase } // XaFromQuery construct xa info from request func XaFromQuery(qs url.Values) (*Xa, error) { - xa := &Xa{TransBase: *TransBaseFromQuery(qs), TransData: TransData{Gid: qs.Get("gid"), TransType: "xa"}} + xa := &Xa{TransBase: *TransBaseFromQuery(qs)} if xa.Gid == "" || xa.parentID == "" { return nil, fmt.Errorf("bad xa info: gid: %s parentid: %s", xa.Gid, xa.parentID) } @@ -109,8 +108,8 @@ func (xc *XaClient) XaLocalTransaction(qs url.Values, xaFunc XaLocalFunc) (ret i // XaGlobalTransaction start a xa global transaction func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) { - xa := Xa{TransBase: TransBase{IDGenerator: IDGenerator{}, Dtm: xc.Server}, TransData: TransData{Gid: gid, TransType: "xa"}} - rerr = xa.CallDtm(&xa.TransData, "prepare") + xa := Xa{TransBase: *NewTransBase(gid, "xa", xc.Server, "")} + rerr = xa.CallDtm(xa, "prepare") if rerr != nil { return } @@ -119,7 +118,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr e defer func() { x := recover() operation := If(x != nil || rerr != nil, "abort", "submit").(string) - err := xa.CallDtm(&xa.TransData, operation) + err := xa.CallDtm(xa, operation) if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的 rerr = err } diff --git a/dtmgrpc/message.go b/dtmgrpc/message.go index ae3f4a4..d188070 100644 --- a/dtmgrpc/message.go +++ b/dtmgrpc/message.go @@ -8,20 +8,15 @@ import ( // MsgGrpc reliable msg type type MsgGrpc struct { - dtmcli.MsgData dtmcli.TransBase + Steps []dtmcli.MsgStep `json:"steps"` + QueryPrepared string `json:"query_prepared"` } // NewMsgGrpc create new msg func NewMsgGrpc(server string, gid string) *MsgGrpc { return &MsgGrpc{ - MsgData: dtmcli.MsgData{TransData: dtmcli.TransData{ - Gid: gid, - TransType: "msg", - }}, - TransBase: dtmcli.TransBase{ - Dtm: server, - }, + TransBase: *dtmcli.NewTransBase(gid, "msg", server, ""), } } diff --git a/dtmgrpc/saga.go b/dtmgrpc/saga.go index 7f20a82..644c9cf 100644 --- a/dtmgrpc/saga.go +++ b/dtmgrpc/saga.go @@ -8,26 +8,20 @@ import ( // SagaGrpc struct of saga type SagaGrpc struct { - dtmcli.SagaData dtmcli.TransBase + Steps []dtmcli.SagaStep `json:"steps"` } // NewSaga create a saga func NewSaga(server string, gid string) *SagaGrpc { return &SagaGrpc{ - SagaData: dtmcli.SagaData{TransData: dtmcli.TransData{ - Gid: gid, - TransType: "saga", - }}, - TransBase: dtmcli.TransBase{ - Dtm: server, - }, + TransBase: *dtmcli.NewTransBase(gid, "saga", server, ""), } } // Add add a saga step func (s *SagaGrpc) Add(action string, compensate string, busiData []byte) *SagaGrpc { - dtmcli.Logf("saga %s Add %s %s %v", s.SagaData.Gid, action, compensate, string(busiData)) + dtmcli.Logf("saga %s Add %s %s %v", s.Gid, action, compensate, string(busiData)) step := dtmcli.SagaStep{ Action: action, Compensate: compensate, diff --git a/dtmgrpc/tcc.go b/dtmgrpc/tcc.go index 1a28a55..cb4df78 100644 --- a/dtmgrpc/tcc.go +++ b/dtmgrpc/tcc.go @@ -9,7 +9,6 @@ import ( // TccGrpc struct of tcc type TccGrpc struct { - dtmcli.TransData dtmcli.TransBase } @@ -21,7 +20,7 @@ type TccGlobalFunc func(tcc *TccGrpc) error // gid 全局事务id // tccFunc tcc事务函数,里面会定义全局事务的分支 func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { - tcc := &TccGrpc{TransBase: dtmcli.TransBase{Dtm: dtm}, TransData: dtmcli.TransData{Gid: gid, TransType: "tcc"}} + tcc := &TccGrpc{TransBase: *dtmcli.NewTransBase(gid, "tcc", dtm, "")} dc := MustGetDtmClient(tcc.Dtm) dr := &DtmRequest{ Gid: tcc.Gid, @@ -52,8 +51,7 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e // TccFromRequest tcc from request info func TccFromRequest(br *BusiRequest) (*TccGrpc, error) { tcc := &TccGrpc{ - TransBase: *dtmcli.NewTransBase(br.Dtm, br.Info.BranchID), - TransData: dtmcli.TransData{Gid: br.Info.BranchID, TransType: br.Info.TransType}, + TransBase: *dtmcli.NewTransBase(br.Info.Gid, br.Info.TransType, br.Dtm, br.Info.BranchID), } if tcc.Dtm == "" || tcc.Gid == "" { return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s parentID: %s", tcc.Dtm, tcc.Gid, br.Info.BranchID) diff --git a/dtmgrpc/xa.go b/dtmgrpc/xa.go index 59caba1..204d3dc 100644 --- a/dtmgrpc/xa.go +++ b/dtmgrpc/xa.go @@ -23,15 +23,13 @@ type XaGrpcClient struct { // XaGrpc xa transaction type XaGrpc struct { - dtmcli.TransData dtmcli.TransBase } // XaGrpcFromRequest construct xa info from request func XaGrpcFromRequest(br *BusiRequest) (*XaGrpc, error) { xa := &XaGrpc{ - TransBase: *dtmcli.NewTransBase(br.Dtm, br.Info.BranchID), - TransData: dtmcli.TransData{Gid: br.Info.Gid, TransType: br.Info.TransType}, + TransBase: *dtmcli.NewTransBase(br.Info.Gid, br.Info.TransType, br.Dtm, br.Info.BranchID), } if xa.Gid == "" || br.Info.BranchID == "" { return nil, fmt.Errorf("bad xa info: gid: %s parentid: %s", xa.Gid, br.Info.BranchID) @@ -111,7 +109,7 @@ func (xc *XaGrpcClient) XaLocalTransaction(br *BusiRequest, xaFunc XaGrpcLocalFu // XaGlobalTransaction start a xa global transaction func (xc *XaGrpcClient) XaGlobalTransaction(gid string, xaFunc XaGrpcGlobalFunc) (rerr error) { - xa := XaGrpc{TransBase: dtmcli.TransBase{Dtm: xc.Server}, TransData: dtmcli.TransData{Gid: gid, TransType: "xa"}} + xa := XaGrpc{TransBase: *dtmcli.NewTransBase(gid, "xa", xc.Server, "")} dc := MustGetDtmClient(xa.Dtm) req := &DtmRequest{ Gid: gid, diff --git a/examples/base_grpc.go b/examples/base_grpc.go index f0c5d01..c1b2468 100644 --- a/examples/base_grpc.go +++ b/examples/base_grpc.go @@ -31,8 +31,8 @@ func GrpcStartup() { dtmcli.FatalIfError(err) s := grpc.NewServer(grpc.UnaryInterceptor(dtmgrpc.GrpcServerLog)) RegisterBusiServer(s, &busiServer{}) - dtmcli.Logf("busi grpc listening at %v", lis.Addr()) go func() { + dtmcli.Logf("busi grpc listening at %v", lis.Addr()) err := s.Serve(lis) dtmcli.FatalIfError(err) }()