use TransBase
This commit is contained in:
parent
e8a23f0932
commit
f005c1a274
@ -2,13 +2,7 @@ package dtmcli
|
|||||||
|
|
||||||
// Msg reliable msg type
|
// Msg reliable msg type
|
||||||
type Msg struct {
|
type Msg struct {
|
||||||
MsgData
|
|
||||||
TransBase
|
TransBase
|
||||||
}
|
|
||||||
|
|
||||||
// MsgData msg data
|
|
||||||
type MsgData struct {
|
|
||||||
TransData
|
|
||||||
Steps []MsgStep `json:"steps"`
|
Steps []MsgStep `json:"steps"`
|
||||||
QueryPrepared string `json:"query_prepared"`
|
QueryPrepared string `json:"query_prepared"`
|
||||||
}
|
}
|
||||||
@ -22,19 +16,17 @@ type MsgStep struct {
|
|||||||
// NewMsg create new msg
|
// NewMsg create new msg
|
||||||
func NewMsg(server string, gid string) *Msg {
|
func NewMsg(server string, gid string) *Msg {
|
||||||
return &Msg{
|
return &Msg{
|
||||||
MsgData: MsgData{TransData: TransData{
|
TransBase: TransBase{
|
||||||
Gid: gid,
|
Gid: gid,
|
||||||
TransType: "msg",
|
TransType: "msg",
|
||||||
}},
|
Dtm: server,
|
||||||
TransBase: TransBase{
|
|
||||||
Dtm: server,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add add a new step
|
// Add add a new step
|
||||||
func (s *Msg) Add(action string, postData interface{}) *Msg {
|
func (s *Msg) Add(action string, postData interface{}) *Msg {
|
||||||
Logf("msg %s Add %s %v", s.MsgData.Gid, action, postData)
|
Logf("msg %s Add %s %v", s.Gid, action, postData)
|
||||||
step := MsgStep{
|
step := MsgStep{
|
||||||
Action: action,
|
Action: action,
|
||||||
Data: MustMarshalString(postData),
|
Data: MustMarshalString(postData),
|
||||||
@ -46,10 +38,10 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
|
|||||||
// Prepare prepare the msg
|
// Prepare prepare the msg
|
||||||
func (s *Msg) Prepare(queryPrepared string) error {
|
func (s *Msg) Prepare(queryPrepared string) error {
|
||||||
s.QueryPrepared = OrString(queryPrepared, s.QueryPrepared)
|
s.QueryPrepared = OrString(queryPrepared, s.QueryPrepared)
|
||||||
return s.CallDtm(&s.MsgData, "prepare")
|
return s.CallDtm(s, "prepare")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Submit submit the msg
|
// Submit submit the msg
|
||||||
func (s *Msg) Submit() error {
|
func (s *Msg) Submit() error {
|
||||||
return s.CallDtm(&s.MsgData, "submit")
|
return s.CallDtm(s, "submit")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,13 +2,7 @@ package dtmcli
|
|||||||
|
|
||||||
// Saga struct of saga
|
// Saga struct of saga
|
||||||
type Saga struct {
|
type Saga struct {
|
||||||
SagaData
|
|
||||||
TransBase
|
TransBase
|
||||||
}
|
|
||||||
|
|
||||||
// SagaData sage data
|
|
||||||
type SagaData struct {
|
|
||||||
TransData
|
|
||||||
Steps []SagaStep `json:"steps"`
|
Steps []SagaStep `json:"steps"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -22,19 +16,17 @@ type SagaStep struct {
|
|||||||
// NewSaga create a saga
|
// NewSaga create a saga
|
||||||
func NewSaga(server string, gid string) *Saga {
|
func NewSaga(server string, gid string) *Saga {
|
||||||
return &Saga{
|
return &Saga{
|
||||||
SagaData: SagaData{TransData: TransData{
|
TransBase: TransBase{
|
||||||
Gid: gid,
|
Gid: gid,
|
||||||
TransType: "saga",
|
TransType: "saga",
|
||||||
}},
|
Dtm: server,
|
||||||
TransBase: TransBase{
|
|
||||||
Dtm: server,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add add a saga step
|
// Add add a saga step
|
||||||
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {
|
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {
|
||||||
Logf("saga %s Add %s %s %v", s.SagaData.Gid, action, compensate, postData)
|
Logf("saga %s Add %s %s %v", s.Gid, action, compensate, postData)
|
||||||
step := SagaStep{
|
step := SagaStep{
|
||||||
Action: action,
|
Action: action,
|
||||||
Compensate: compensate,
|
Compensate: compensate,
|
||||||
@ -46,5 +38,5 @@ func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga
|
|||||||
|
|
||||||
// Submit submit the saga trans
|
// Submit submit the saga trans
|
||||||
func (s *Saga) Submit() error {
|
func (s *Saga) Submit() error {
|
||||||
return s.CallDtm(&s.SagaData, "submit")
|
return s.CallDtm(s, "submit")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,7 +9,6 @@ import (
|
|||||||
|
|
||||||
// Tcc struct of tcc
|
// Tcc struct of tcc
|
||||||
type Tcc struct {
|
type Tcc struct {
|
||||||
TransData
|
|
||||||
TransBase
|
TransBase
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -21,8 +20,13 @@ type TccGlobalFunc func(tcc *Tcc) (*resty.Response, error)
|
|||||||
// gid 全局事务id
|
// gid 全局事务id
|
||||||
// tccFunc tcc事务函数,里面会定义全局事务的分支
|
// tccFunc tcc事务函数,里面会定义全局事务的分支
|
||||||
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
|
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
|
||||||
tcc := &Tcc{TransBase: TransBase{Dtm: dtm}, TransData: TransData{Gid: gid, TransType: "tcc"}}
|
tcc := &Tcc{
|
||||||
rerr = tcc.CallDtm(&tcc.TransData, "prepare")
|
TransBase: TransBase{
|
||||||
|
Gid: gid,
|
||||||
|
TransType: "tcc",
|
||||||
|
Dtm: dtm,
|
||||||
|
}}
|
||||||
|
rerr = tcc.CallDtm(tcc, "prepare")
|
||||||
if rerr != nil {
|
if rerr != nil {
|
||||||
return rerr
|
return rerr
|
||||||
}
|
}
|
||||||
@ -30,7 +34,7 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e
|
|||||||
defer func() {
|
defer func() {
|
||||||
x := recover()
|
x := recover()
|
||||||
operation := If(x == nil && rerr == nil, "submit", "abort").(string)
|
operation := If(x == nil && rerr == nil, "submit", "abort").(string)
|
||||||
err := tcc.CallDtm(&tcc.TransData, operation)
|
err := tcc.CallDtm(tcc, operation)
|
||||||
if rerr == nil {
|
if rerr == nil {
|
||||||
rerr = err
|
rerr = err
|
||||||
}
|
}
|
||||||
@ -47,7 +51,6 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e
|
|||||||
func TccFromQuery(qs url.Values) (*Tcc, error) {
|
func TccFromQuery(qs url.Values) (*Tcc, error) {
|
||||||
tcc := &Tcc{
|
tcc := &Tcc{
|
||||||
TransBase: *TransBaseFromQuery(qs),
|
TransBase: *TransBaseFromQuery(qs),
|
||||||
TransData: TransData{Gid: qs.Get("gid"), TransType: "tcc"},
|
|
||||||
}
|
}
|
||||||
if tcc.Dtm == "" || tcc.Gid == "" {
|
if tcc.Dtm == "" || tcc.Gid == "" {
|
||||||
return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s parentID: %s", tcc.Dtm, tcc.Gid, tcc.parentID)
|
return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s parentID: %s", tcc.Dtm, tcc.Gid, tcc.parentID)
|
||||||
|
|||||||
@ -46,14 +46,10 @@ type TransResult struct {
|
|||||||
Message string
|
Message string
|
||||||
}
|
}
|
||||||
|
|
||||||
// TransData 每个全局事务都有的数据
|
|
||||||
type TransData struct {
|
|
||||||
Gid string `json:"gid"`
|
|
||||||
TransType string `json:"trans_type"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TransBase 事务的基础类
|
// TransBase 事务的基础类
|
||||||
type TransBase struct {
|
type TransBase struct {
|
||||||
|
Gid string `json:"gid"`
|
||||||
|
TransType string `json:"trans_type"`
|
||||||
IDGenerator
|
IDGenerator
|
||||||
Dtm string
|
Dtm string
|
||||||
// WaitResult 是否等待全局事务的最终结果
|
// WaitResult 是否等待全局事务的最终结果
|
||||||
@ -61,8 +57,10 @@ type TransBase struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewTransBase 1
|
// NewTransBase 1
|
||||||
func NewTransBase(dtm string, parentID string) *TransBase {
|
func NewTransBase(gid string, transType string, dtm string, parentID string) *TransBase {
|
||||||
return &TransBase{
|
return &TransBase{
|
||||||
|
Gid: gid,
|
||||||
|
TransType: transType,
|
||||||
IDGenerator: IDGenerator{parentID: parentID},
|
IDGenerator: IDGenerator{parentID: parentID},
|
||||||
Dtm: dtm,
|
Dtm: dtm,
|
||||||
}
|
}
|
||||||
@ -70,7 +68,7 @@ func NewTransBase(dtm string, parentID string) *TransBase {
|
|||||||
|
|
||||||
// TransBaseFromQuery construct transaction info from request
|
// TransBaseFromQuery construct transaction info from request
|
||||||
func TransBaseFromQuery(qs url.Values) *TransBase {
|
func TransBaseFromQuery(qs url.Values) *TransBase {
|
||||||
return NewTransBase(qs.Get("dtm"), qs.Get("branch_id"))
|
return NewTransBase(qs.Get("gid"), qs.Get("trans_type"), qs.Get("dtm"), qs.Get("branch_id"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallDtm 调用dtm服务器,返回事务的状态
|
// CallDtm 调用dtm服务器,返回事务的状态
|
||||||
|
|||||||
@ -26,13 +26,12 @@ type XaClient struct {
|
|||||||
|
|
||||||
// Xa xa transaction
|
// Xa xa transaction
|
||||||
type Xa struct {
|
type Xa struct {
|
||||||
TransData
|
|
||||||
TransBase
|
TransBase
|
||||||
}
|
}
|
||||||
|
|
||||||
// XaFromQuery construct xa info from request
|
// XaFromQuery construct xa info from request
|
||||||
func XaFromQuery(qs url.Values) (*Xa, error) {
|
func XaFromQuery(qs url.Values) (*Xa, error) {
|
||||||
xa := &Xa{TransBase: *TransBaseFromQuery(qs), TransData: TransData{Gid: qs.Get("gid"), TransType: "xa"}}
|
xa := &Xa{TransBase: *TransBaseFromQuery(qs)}
|
||||||
if xa.Gid == "" || xa.parentID == "" {
|
if xa.Gid == "" || xa.parentID == "" {
|
||||||
return nil, fmt.Errorf("bad xa info: gid: %s parentid: %s", xa.Gid, xa.parentID)
|
return nil, fmt.Errorf("bad xa info: gid: %s parentid: %s", xa.Gid, xa.parentID)
|
||||||
}
|
}
|
||||||
@ -109,8 +108,8 @@ func (xc *XaClient) XaLocalTransaction(qs url.Values, xaFunc XaLocalFunc) (ret i
|
|||||||
|
|
||||||
// XaGlobalTransaction start a xa global transaction
|
// XaGlobalTransaction start a xa global transaction
|
||||||
func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) {
|
func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) {
|
||||||
xa := Xa{TransBase: TransBase{IDGenerator: IDGenerator{}, Dtm: xc.Server}, TransData: TransData{Gid: gid, TransType: "xa"}}
|
xa := Xa{TransBase: *NewTransBase(gid, "xa", xc.Server, "")}
|
||||||
rerr = xa.CallDtm(&xa.TransData, "prepare")
|
rerr = xa.CallDtm(xa, "prepare")
|
||||||
if rerr != nil {
|
if rerr != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -119,7 +118,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr e
|
|||||||
defer func() {
|
defer func() {
|
||||||
x := recover()
|
x := recover()
|
||||||
operation := If(x != nil || rerr != nil, "abort", "submit").(string)
|
operation := If(x != nil || rerr != nil, "abort", "submit").(string)
|
||||||
err := xa.CallDtm(&xa.TransData, operation)
|
err := xa.CallDtm(xa, operation)
|
||||||
if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的
|
if rerr == nil { // 如果用户函数没有返回错误,那么返回dtm的
|
||||||
rerr = err
|
rerr = err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -8,20 +8,15 @@ import (
|
|||||||
|
|
||||||
// MsgGrpc reliable msg type
|
// MsgGrpc reliable msg type
|
||||||
type MsgGrpc struct {
|
type MsgGrpc struct {
|
||||||
dtmcli.MsgData
|
|
||||||
dtmcli.TransBase
|
dtmcli.TransBase
|
||||||
|
Steps []dtmcli.MsgStep `json:"steps"`
|
||||||
|
QueryPrepared string `json:"query_prepared"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMsgGrpc create new msg
|
// NewMsgGrpc create new msg
|
||||||
func NewMsgGrpc(server string, gid string) *MsgGrpc {
|
func NewMsgGrpc(server string, gid string) *MsgGrpc {
|
||||||
return &MsgGrpc{
|
return &MsgGrpc{
|
||||||
MsgData: dtmcli.MsgData{TransData: dtmcli.TransData{
|
TransBase: *dtmcli.NewTransBase(gid, "msg", server, ""),
|
||||||
Gid: gid,
|
|
||||||
TransType: "msg",
|
|
||||||
}},
|
|
||||||
TransBase: dtmcli.TransBase{
|
|
||||||
Dtm: server,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -8,26 +8,20 @@ import (
|
|||||||
|
|
||||||
// SagaGrpc struct of saga
|
// SagaGrpc struct of saga
|
||||||
type SagaGrpc struct {
|
type SagaGrpc struct {
|
||||||
dtmcli.SagaData
|
|
||||||
dtmcli.TransBase
|
dtmcli.TransBase
|
||||||
|
Steps []dtmcli.SagaStep `json:"steps"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSaga create a saga
|
// NewSaga create a saga
|
||||||
func NewSaga(server string, gid string) *SagaGrpc {
|
func NewSaga(server string, gid string) *SagaGrpc {
|
||||||
return &SagaGrpc{
|
return &SagaGrpc{
|
||||||
SagaData: dtmcli.SagaData{TransData: dtmcli.TransData{
|
TransBase: *dtmcli.NewTransBase(gid, "saga", server, ""),
|
||||||
Gid: gid,
|
|
||||||
TransType: "saga",
|
|
||||||
}},
|
|
||||||
TransBase: dtmcli.TransBase{
|
|
||||||
Dtm: server,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add add a saga step
|
// Add add a saga step
|
||||||
func (s *SagaGrpc) Add(action string, compensate string, busiData []byte) *SagaGrpc {
|
func (s *SagaGrpc) Add(action string, compensate string, busiData []byte) *SagaGrpc {
|
||||||
dtmcli.Logf("saga %s Add %s %s %v", s.SagaData.Gid, action, compensate, string(busiData))
|
dtmcli.Logf("saga %s Add %s %s %v", s.Gid, action, compensate, string(busiData))
|
||||||
step := dtmcli.SagaStep{
|
step := dtmcli.SagaStep{
|
||||||
Action: action,
|
Action: action,
|
||||||
Compensate: compensate,
|
Compensate: compensate,
|
||||||
|
|||||||
@ -9,7 +9,6 @@ import (
|
|||||||
|
|
||||||
// TccGrpc struct of tcc
|
// TccGrpc struct of tcc
|
||||||
type TccGrpc struct {
|
type TccGrpc struct {
|
||||||
dtmcli.TransData
|
|
||||||
dtmcli.TransBase
|
dtmcli.TransBase
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -21,7 +20,7 @@ type TccGlobalFunc func(tcc *TccGrpc) error
|
|||||||
// gid 全局事务id
|
// gid 全局事务id
|
||||||
// tccFunc tcc事务函数,里面会定义全局事务的分支
|
// tccFunc tcc事务函数,里面会定义全局事务的分支
|
||||||
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
|
func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) {
|
||||||
tcc := &TccGrpc{TransBase: dtmcli.TransBase{Dtm: dtm}, TransData: dtmcli.TransData{Gid: gid, TransType: "tcc"}}
|
tcc := &TccGrpc{TransBase: *dtmcli.NewTransBase(gid, "tcc", dtm, "")}
|
||||||
dc := MustGetDtmClient(tcc.Dtm)
|
dc := MustGetDtmClient(tcc.Dtm)
|
||||||
dr := &DtmRequest{
|
dr := &DtmRequest{
|
||||||
Gid: tcc.Gid,
|
Gid: tcc.Gid,
|
||||||
@ -52,8 +51,7 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e
|
|||||||
// TccFromRequest tcc from request info
|
// TccFromRequest tcc from request info
|
||||||
func TccFromRequest(br *BusiRequest) (*TccGrpc, error) {
|
func TccFromRequest(br *BusiRequest) (*TccGrpc, error) {
|
||||||
tcc := &TccGrpc{
|
tcc := &TccGrpc{
|
||||||
TransBase: *dtmcli.NewTransBase(br.Dtm, br.Info.BranchID),
|
TransBase: *dtmcli.NewTransBase(br.Info.Gid, br.Info.TransType, br.Dtm, br.Info.BranchID),
|
||||||
TransData: dtmcli.TransData{Gid: br.Info.BranchID, TransType: br.Info.TransType},
|
|
||||||
}
|
}
|
||||||
if tcc.Dtm == "" || tcc.Gid == "" {
|
if tcc.Dtm == "" || tcc.Gid == "" {
|
||||||
return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s parentID: %s", tcc.Dtm, tcc.Gid, br.Info.BranchID)
|
return nil, fmt.Errorf("bad tcc info. dtm: %s, gid: %s parentID: %s", tcc.Dtm, tcc.Gid, br.Info.BranchID)
|
||||||
|
|||||||
@ -23,15 +23,13 @@ type XaGrpcClient struct {
|
|||||||
|
|
||||||
// XaGrpc xa transaction
|
// XaGrpc xa transaction
|
||||||
type XaGrpc struct {
|
type XaGrpc struct {
|
||||||
dtmcli.TransData
|
|
||||||
dtmcli.TransBase
|
dtmcli.TransBase
|
||||||
}
|
}
|
||||||
|
|
||||||
// XaGrpcFromRequest construct xa info from request
|
// XaGrpcFromRequest construct xa info from request
|
||||||
func XaGrpcFromRequest(br *BusiRequest) (*XaGrpc, error) {
|
func XaGrpcFromRequest(br *BusiRequest) (*XaGrpc, error) {
|
||||||
xa := &XaGrpc{
|
xa := &XaGrpc{
|
||||||
TransBase: *dtmcli.NewTransBase(br.Dtm, br.Info.BranchID),
|
TransBase: *dtmcli.NewTransBase(br.Info.Gid, br.Info.TransType, br.Dtm, br.Info.BranchID),
|
||||||
TransData: dtmcli.TransData{Gid: br.Info.Gid, TransType: br.Info.TransType},
|
|
||||||
}
|
}
|
||||||
if xa.Gid == "" || br.Info.BranchID == "" {
|
if xa.Gid == "" || br.Info.BranchID == "" {
|
||||||
return nil, fmt.Errorf("bad xa info: gid: %s parentid: %s", xa.Gid, br.Info.BranchID)
|
return nil, fmt.Errorf("bad xa info: gid: %s parentid: %s", xa.Gid, br.Info.BranchID)
|
||||||
@ -111,7 +109,7 @@ func (xc *XaGrpcClient) XaLocalTransaction(br *BusiRequest, xaFunc XaGrpcLocalFu
|
|||||||
|
|
||||||
// XaGlobalTransaction start a xa global transaction
|
// XaGlobalTransaction start a xa global transaction
|
||||||
func (xc *XaGrpcClient) XaGlobalTransaction(gid string, xaFunc XaGrpcGlobalFunc) (rerr error) {
|
func (xc *XaGrpcClient) XaGlobalTransaction(gid string, xaFunc XaGrpcGlobalFunc) (rerr error) {
|
||||||
xa := XaGrpc{TransBase: dtmcli.TransBase{Dtm: xc.Server}, TransData: dtmcli.TransData{Gid: gid, TransType: "xa"}}
|
xa := XaGrpc{TransBase: *dtmcli.NewTransBase(gid, "xa", xc.Server, "")}
|
||||||
dc := MustGetDtmClient(xa.Dtm)
|
dc := MustGetDtmClient(xa.Dtm)
|
||||||
req := &DtmRequest{
|
req := &DtmRequest{
|
||||||
Gid: gid,
|
Gid: gid,
|
||||||
|
|||||||
@ -31,8 +31,8 @@ func GrpcStartup() {
|
|||||||
dtmcli.FatalIfError(err)
|
dtmcli.FatalIfError(err)
|
||||||
s := grpc.NewServer(grpc.UnaryInterceptor(dtmgrpc.GrpcServerLog))
|
s := grpc.NewServer(grpc.UnaryInterceptor(dtmgrpc.GrpcServerLog))
|
||||||
RegisterBusiServer(s, &busiServer{})
|
RegisterBusiServer(s, &busiServer{})
|
||||||
dtmcli.Logf("busi grpc listening at %v", lis.Addr())
|
|
||||||
go func() {
|
go func() {
|
||||||
|
dtmcli.Logf("busi grpc listening at %v", lis.Addr())
|
||||||
err := s.Serve(lis)
|
err := s.Serve(lis)
|
||||||
dtmcli.FatalIfError(err)
|
dtmcli.FatalIfError(err)
|
||||||
}()
|
}()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user