Merge pull request #25 from yedf/alpha

xa add duplicate example and refactored
This commit is contained in:
yedf2 2021-09-03 19:50:52 +08:00 committed by GitHub
commit c56b3811f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 154 additions and 134 deletions

View File

@ -37,6 +37,11 @@ func (g *IDGenerator) NewBranchID() string {
panic(fmt.Errorf("total branch id is longer than 20")) panic(fmt.Errorf("total branch id is longer than 20"))
} }
g.branchID = g.branchID + 1 g.branchID = g.branchID + 1
return g.CurrentBranchID()
}
// CurrentBranchID return current branchID
func (g *IDGenerator) CurrentBranchID() string {
return g.parentID + fmt.Sprintf("%02d", g.branchID) return g.parentID + fmt.Sprintf("%02d", g.branchID)
} }

View File

@ -19,9 +19,7 @@ type XaRegisterCallback func(path string, xa *XaClient)
// XaClient xa client // XaClient xa client
type XaClient struct { type XaClient struct {
Server string XaClientBase
Conf map[string]string
NotifyURL string
} }
// Xa xa transaction // Xa xa transaction
@ -40,11 +38,11 @@ func XaFromQuery(qs url.Values) (*Xa, error) {
// NewXaClient construct a xa client // NewXaClient construct a xa client
func NewXaClient(server string, mysqlConf map[string]string, notifyURL string, register XaRegisterCallback) (*XaClient, error) { func NewXaClient(server string, mysqlConf map[string]string, notifyURL string, register XaRegisterCallback) (*XaClient, error) {
xa := &XaClient{ xa := &XaClient{XaClientBase: XaClientBase{
Server: server, Server: server,
Conf: mysqlConf, Conf: mysqlConf,
NotifyURL: notifyURL, NotifyURL: notifyURL,
} }}
u, err := url.Parse(notifyURL) u, err := url.Parse(notifyURL)
if err != nil { if err != nil {
return nil, err return nil, err
@ -55,15 +53,7 @@ func NewXaClient(server string, mysqlConf map[string]string, notifyURL string, r
// HandleCallback 处理commit/rollback的回调 // HandleCallback 处理commit/rollback的回调
func (xc *XaClient) HandleCallback(gid string, branchID string, action string) (interface{}, error) { func (xc *XaClient) HandleCallback(gid string, branchID string, action string) (interface{}, error) {
db, err := SdbAlone(xc.Conf) return ResultSuccess, xc.XaClientBase.HandleCallback(gid, branchID, action)
if err != nil {
return nil, err
}
defer db.Close()
xaID := gid + "-" + branchID
_, err = DBExec(db, fmt.Sprintf("xa %s '%s'", action, xaID))
return ResultSuccess, err
} }
// XaLocalTransaction start a xa local transaction // XaLocalTransaction start a xa local transaction
@ -72,62 +62,27 @@ func (xc *XaClient) XaLocalTransaction(qs url.Values, xaFunc XaLocalFunc) (ret i
if rerr != nil { if rerr != nil {
return return
} }
xa.Dtm = xc.Server ret, rerr = xc.HandleLocalTrans(&xa.TransBase, func(db *sql.DB) (ret interface{}, rerr error) {
branchID := xa.NewBranchID() ret, rerr = xaFunc(db, xa)
xaBranch := xa.Gid + "-" + branchID rerr = CheckResult(ret, rerr)
db, rerr := SdbAlone(xc.Conf) if rerr != nil {
if rerr != nil { return
return
}
defer func() { db.Close() }()
defer func() {
x := recover()
_, err := DBExec(db, fmt.Sprintf("XA end '%s'", xaBranch))
if x == nil && rerr == nil && err == nil {
_, err = DBExec(db, fmt.Sprintf("XA prepare '%s'", xaBranch))
} }
if rerr == nil { rerr = xa.callDtm(&M{"gid": xa.Gid, "branch_id": xa.CurrentBranchID(), "trans_type": "xa", "url": xc.XaClientBase.NotifyURL}, "registerXaBranch")
rerr = err
}
if x != nil {
panic(x)
}
}()
_, rerr = DBExec(db, fmt.Sprintf("XA start '%s'", xaBranch))
if rerr != nil {
return return
} })
ret, rerr = xaFunc(db, xa)
rerr = CheckResult(ret, rerr)
if rerr != nil {
return
}
rerr = xa.callDtm(&M{"gid": xa.Gid, "branch_id": branchID, "trans_type": "xa", "url": xc.NotifyURL}, "registerXaBranch")
return return
} }
// XaGlobalTransaction start a xa global transaction // XaGlobalTransaction start a xa global transaction
func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) { func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr error) {
xa := Xa{TransBase: *NewTransBase(gid, "xa", xc.Server, "")} xa := Xa{TransBase: *NewTransBase(gid, "xa", xc.XaClientBase.Server, "")}
rerr = xa.callDtm(xa, "prepare") return xc.HandleGlobalTrans(&xa.TransBase, func(action string) error {
if rerr != nil { return xa.callDtm(xa, action)
return }, func() error {
} resp, rerr := xaFunc(&xa)
// 小概率情况下prepare成功了但是由于网络状况导致上面Failure那么不执行下面defer的内容等待超时后再回滚标记事务失败也没有问题 return CheckResponse(resp, rerr)
defer func() { })
x := recover()
operation := If(x != nil || rerr != nil, "abort", "submit").(string)
err := xa.callDtm(xa, operation)
if rerr == nil { // 如果用户函数没有返回错误那么返回dtm的
rerr = err
}
if x != nil {
panic(x)
}
}()
resp, rerr := xaFunc(&xa)
rerr = CheckResponse(resp, rerr)
return
} }
// CallBranch call a xa branch // CallBranch call a xa branch

81
dtmcli/xa_base.go Normal file
View File

@ -0,0 +1,81 @@
package dtmcli
import (
"database/sql"
"fmt"
"strings"
)
// XaClientBase XaClient/XaGrpcClient base
type XaClientBase struct {
Server string
Conf map[string]string
NotifyURL string
}
// HandleCallback 处理commit/rollback的回调
func (xc *XaClientBase) HandleCallback(gid string, branchID string, action string) error {
db, err := SdbAlone(xc.Conf)
if err != nil {
return err
}
defer db.Close()
xaID := gid + "-" + branchID
_, err = DBExec(db, fmt.Sprintf("xa %s '%s'", action, xaID))
if err != nil && strings.Contains(err.Error(), "Error 1397: XAER_NOTA") { // 重复commit/rollback同一个id报这个错误忽略
err = nil
}
return err
}
// HandleLocalTrans http/grpc 处理LocalTransaction的公共方法
func (xc *XaClientBase) HandleLocalTrans(xa *TransBase, cb func(*sql.DB) (interface{}, error)) (ret interface{}, rerr error) {
branchID := xa.NewBranchID()
xaBranch := xa.Gid + "-" + branchID
db, rerr := SdbAlone(xc.Conf)
if rerr != nil {
return
}
defer func() { db.Close() }()
defer func() {
x := recover()
_, err := DBExec(db, fmt.Sprintf("XA end '%s'", xaBranch))
if x == nil && rerr == nil && err == nil {
_, err = DBExec(db, fmt.Sprintf("XA prepare '%s'", xaBranch))
}
if rerr == nil {
rerr = err
}
if x != nil {
panic(x)
}
}()
_, rerr = DBExec(db, fmt.Sprintf("XA start '%s'", xaBranch))
if rerr != nil {
return
}
ret, rerr = cb(db)
return
}
// HandleGlobalTrans http/grpc GlobalTransaction的公共方法
func (xc *XaClientBase) HandleGlobalTrans(xa *TransBase, callDtm func(string) error, callBusi func() error) (rerr error) {
rerr = callDtm("prepare")
if rerr != nil {
return
}
// 小概率情况下prepare成功了但是由于网络状况导致上面Failure那么不执行下面defer的内容等待超时后再回滚标记事务失败也没有问题
defer func() {
x := recover()
operation := If(x != nil || rerr != nil, "abort", "submit").(string)
err := callDtm(operation)
if rerr == nil { // 如果用户函数没有返回错误那么返回dtm的
rerr = err
}
if x != nil {
panic(x)
}
}()
rerr = callBusi()
return
}

View File

@ -6,6 +6,8 @@ import (
"fmt" "fmt"
"github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli"
grpc "google.golang.org/grpc"
emptypb "google.golang.org/protobuf/types/known/emptypb"
) )
// XaGrpcGlobalFunc type of xa global function // XaGrpcGlobalFunc type of xa global function
@ -16,9 +18,7 @@ type XaGrpcLocalFunc func(db *sql.DB, xa *XaGrpc) error
// XaGrpcClient xa client // XaGrpcClient xa client
type XaGrpcClient struct { type XaGrpcClient struct {
Server string dtmcli.XaClientBase
Conf map[string]string
NotifyURL string
} }
// XaGrpc xa transaction // XaGrpc xa transaction
@ -39,25 +39,17 @@ func XaGrpcFromRequest(br *BusiRequest) (*XaGrpc, error) {
// NewXaGrpcClient construct a xa client // NewXaGrpcClient construct a xa client
func NewXaGrpcClient(server string, mysqlConf map[string]string, notifyURL string) *XaGrpcClient { func NewXaGrpcClient(server string, mysqlConf map[string]string, notifyURL string) *XaGrpcClient {
xa := &XaGrpcClient{ xa := &XaGrpcClient{XaClientBase: dtmcli.XaClientBase{
Server: server, Server: server,
Conf: mysqlConf, Conf: mysqlConf,
NotifyURL: notifyURL, NotifyURL: notifyURL,
} }}
return xa return xa
} }
// HandleCallback 处理commit/rollback的回调 // HandleCallback 处理commit/rollback的回调
func (xc *XaGrpcClient) HandleCallback(gid string, branchID string, action string) error { func (xc *XaGrpcClient) HandleCallback(gid string, branchID string, action string) error {
db, err := dtmcli.SdbAlone(xc.Conf) return xc.XaClientBase.HandleCallback(gid, branchID, action)
if err != nil {
return err
}
defer db.Close()
xaID := gid + "-" + branchID
_, err = dtmcli.DBExec(db, fmt.Sprintf("xa %s '%s'", action, xaID))
return err
} }
// XaLocalTransaction start a xa local transaction // XaLocalTransaction start a xa local transaction
@ -66,43 +58,21 @@ func (xc *XaGrpcClient) XaLocalTransaction(br *BusiRequest, xaFunc XaGrpcLocalFu
if rerr != nil { if rerr != nil {
return return
} }
xa.Dtm = xc.Server _, rerr = xc.HandleLocalTrans(&xa.TransBase, func(db *sql.DB) (interface{}, error) {
branchID := xa.NewBranchID() rerr := xaFunc(db, xa)
xaBranch := xa.Gid + "-" + branchID if rerr != nil {
db, rerr := dtmcli.SdbAlone(xc.Conf) return nil, rerr
if rerr != nil {
return
}
defer func() { db.Close() }()
defer func() {
x := recover()
_, err := dtmcli.DBExec(db, fmt.Sprintf("XA end '%s'", xaBranch))
if x == nil && rerr == nil && err == nil {
_, err = dtmcli.DBExec(db, fmt.Sprintf("XA prepare '%s'", xaBranch))
} }
if rerr == nil { _, rerr = MustGetDtmClient(xa.Dtm).RegisterXaBranch(context.Background(), &DtmXaBranchRequest{
rerr = err Info: &BranchInfo{
} Gid: xa.Gid,
if x != nil { BranchID: xa.CurrentBranchID(),
panic(x) TransType: xa.TransType,
} },
}() BusiData: "",
_, rerr = dtmcli.DBExec(db, fmt.Sprintf("XA start '%s'", xaBranch)) Notify: xc.NotifyURL,
if rerr != nil { })
return return nil, rerr
}
rerr = xaFunc(db, xa)
if rerr != nil {
return
}
_, rerr = MustGetDtmClient(xa.Dtm).RegisterXaBranch(context.Background(), &DtmXaBranchRequest{
Info: &BranchInfo{
Gid: xa.Gid,
BranchID: branchID,
TransType: xa.TransType,
},
BusiData: "",
Notify: xc.NotifyURL,
}) })
return return
} }
@ -115,27 +85,17 @@ func (xc *XaGrpcClient) XaGlobalTransaction(gid string, xaFunc XaGrpcGlobalFunc)
Gid: gid, Gid: gid,
TransType: xa.TransType, TransType: xa.TransType,
} }
_, rerr = dc.Prepare(context.Background(), req) return xc.HandleGlobalTrans(&xa.TransBase, func(action string) error {
if rerr != nil { f := map[string]func(context.Context, *DtmRequest, ...grpc.CallOption) (*emptypb.Empty, error){
return "prepare": dc.Prepare,
} "submit": dc.Submit,
// 小概率情况下prepare成功了但是由于网络状况导致上面Failure那么不执行下面defer的内容等待超时后再回滚标记事务失败也没有问题 "abort": dc.Abort,
defer func() { }[action]
x := recover() _, err := f(context.Background(), req)
if x == nil && rerr == nil { return err
_, rerr = dc.Submit(context.Background(), req) }, func() error {
return return xaFunc(&xa)
} })
_, err := dc.Abort(context.Background(), req)
if rerr == nil { // 如果用户函数没有返回错误那么返回dtm的
rerr = err
}
if x != nil {
panic(x)
}
}()
rerr = xaFunc(&xa)
return
} }
// CallBranch call a xa branch // CallBranch call a xa branch

View File

@ -6,6 +6,7 @@ import (
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/examples" "github.com/yedf/dtm/examples"
) )
@ -16,6 +17,7 @@ func TestXa(t *testing.T) {
} }
xaLocalError(t) xaLocalError(t)
xaNormal(t) xaNormal(t)
xaDuplicate(t)
xaRollback(t) xaRollback(t)
} }
@ -43,6 +45,23 @@ func xaNormal(t *testing.T) {
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid)) assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid))
} }
func xaDuplicate(t *testing.T) {
xc := examples.XaClient
gid := "xaDuplicate"
err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
req := examples.GenTransReq(30, false, false)
_, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
assert.Nil(t, err)
sdb, err := dtmcli.SdbAlone(common.DtmConfig.DB)
assert.Nil(t, err)
dtmcli.DBExec(sdb, "xa recover")
dtmcli.DBExec(sdb, "xa commit 'xaDuplicate-0101'") // 先把某一个事务提交,模拟重复请求
return xa.CallBranch(req, examples.Busi+"/TransInXa")
})
assert.Equal(t, nil, err)
WaitTransProcessed(gid)
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid))
}
func xaRollback(t *testing.T) { func xaRollback(t *testing.T) {
xc := examples.XaClient xc := examples.XaClient
gid := "xaRollback" gid := "xaRollback"