grpc tcc test ok
This commit is contained in:
parent
f9f5f501cc
commit
70aac1659c
@ -36,7 +36,8 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e
|
|||||||
x := recover()
|
x := recover()
|
||||||
if x == nil && rerr == nil {
|
if x == nil && rerr == nil {
|
||||||
_, rerr = dc.Submit(context.Background(), dr)
|
_, rerr = dc.Submit(context.Background(), dr)
|
||||||
} else {
|
return
|
||||||
|
}
|
||||||
_, err := dc.Abort(context.Background(), dr)
|
_, err := dc.Abort(context.Background(), dr)
|
||||||
if rerr == nil {
|
if rerr == nil {
|
||||||
rerr = err
|
rerr = err
|
||||||
@ -44,8 +45,6 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e
|
|||||||
if x != nil {
|
if x != nil {
|
||||||
panic(x)
|
panic(x)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
return tccFunc(tcc)
|
return tccFunc(tcc)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/yedf/dtm/dtmcli"
|
"github.com/yedf/dtm/dtmcli"
|
||||||
|
"gorm.io/gorm/clause"
|
||||||
)
|
)
|
||||||
|
|
||||||
func svcSubmit(t *TransGlobal, waitResult bool) (interface{}, error) {
|
func svcSubmit(t *TransGlobal, waitResult bool) (interface{}, error) {
|
||||||
@ -23,3 +24,33 @@ func svcPrepare(t *TransGlobal) (interface{}, error) {
|
|||||||
t.saveNew(dbGet())
|
t.saveNew(dbGet())
|
||||||
return dtmcli.ResultSuccess, nil
|
return dtmcli.ResultSuccess, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func svcAbort(t *TransGlobal, waitResult bool) (interface{}, error) {
|
||||||
|
db := dbGet()
|
||||||
|
dbt := TransFromDb(db, t.Gid)
|
||||||
|
if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != "prepared" && dbt.Status != "aborting" {
|
||||||
|
return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil
|
||||||
|
}
|
||||||
|
return dbt.Process(db, waitResult), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func svcRegisterTccBranch(branch *TransBranch, data dtmcli.MS) (interface{}, error) {
|
||||||
|
db := dbGet()
|
||||||
|
dbt := TransFromDb(db, branch.Gid)
|
||||||
|
if dbt.Status != "prepared" {
|
||||||
|
return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
branches := []TransBranch{*branch, *branch, *branch}
|
||||||
|
for i, b := range []string{"cancel", "confirm", "try"} {
|
||||||
|
branches[i].BranchType = b
|
||||||
|
branches[i].URL = data[b]
|
||||||
|
}
|
||||||
|
|
||||||
|
db.Must().Clauses(clause.OnConflict{
|
||||||
|
DoNothing: true,
|
||||||
|
}).Create(branches)
|
||||||
|
global := TransGlobal{Gid: branch.Gid}
|
||||||
|
global.touch(dbGet(), config.TransCronInterval)
|
||||||
|
return dtmcli.ResultSuccess, nil
|
||||||
|
}
|
||||||
|
|||||||
@ -3,6 +3,7 @@ package dtmsvr
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/yedf/dtm/dtmcli"
|
||||||
"github.com/yedf/dtm/dtmgrpc"
|
"github.com/yedf/dtm/dtmgrpc"
|
||||||
pb "github.com/yedf/dtm/dtmgrpc"
|
pb "github.com/yedf/dtm/dtmgrpc"
|
||||||
"google.golang.org/protobuf/types/known/emptypb"
|
"google.golang.org/protobuf/types/known/emptypb"
|
||||||
@ -22,3 +23,22 @@ func (s *dtmServer) Prepare(ctx context.Context, in *pb.DtmRequest) (*emptypb.Em
|
|||||||
r, err := svcPrepare(TransFromDtmRequest(in))
|
r, err := svcPrepare(TransFromDtmRequest(in))
|
||||||
return &emptypb.Empty{}, dtmgrpc.Result2Error(r, err)
|
return &emptypb.Empty{}, dtmgrpc.Result2Error(r, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *dtmServer) Abort(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) {
|
||||||
|
r, err := svcAbort(TransFromDtmRequest(in), in.WaitResult)
|
||||||
|
return &emptypb.Empty{}, dtmgrpc.Result2Error(r, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *dtmServer) RegisterTccBranch(ctx context.Context, in *pb.DtmTccBranchRequest) (*emptypb.Empty, error) {
|
||||||
|
r, err := svcRegisterTccBranch(&TransBranch{
|
||||||
|
Gid: in.Info.Gid,
|
||||||
|
BranchID: in.Info.BranchID,
|
||||||
|
Status: "prepared",
|
||||||
|
Data: in.BusiData,
|
||||||
|
}, dtmcli.MS{
|
||||||
|
"cancel": in.Cancel,
|
||||||
|
"confirm": in.Confirm,
|
||||||
|
"try": in.Try,
|
||||||
|
})
|
||||||
|
return &emptypb.Empty{}, dtmgrpc.Result2Error(r, err)
|
||||||
|
}
|
||||||
|
|||||||
@ -37,13 +37,7 @@ func submit(c *gin.Context) (interface{}, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func abort(c *gin.Context) (interface{}, error) {
|
func abort(c *gin.Context) (interface{}, error) {
|
||||||
db := dbGet()
|
return svcAbort(TransFromContext(c), c.Query("wait_result") == "1")
|
||||||
t := TransFromContext(c)
|
|
||||||
dbt := TransFromDb(db, t.Gid)
|
|
||||||
if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != "prepared" && dbt.Status != "aborting" {
|
|
||||||
return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil
|
|
||||||
}
|
|
||||||
return dbt.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func registerXaBranch(c *gin.Context) (interface{}, error) {
|
func registerXaBranch(c *gin.Context) (interface{}, error) {
|
||||||
@ -78,25 +72,7 @@ func registerTccBranch(c *gin.Context) (interface{}, error) {
|
|||||||
Status: "prepared",
|
Status: "prepared",
|
||||||
Data: data["data"],
|
Data: data["data"],
|
||||||
}
|
}
|
||||||
db := dbGet()
|
return svcRegisterTccBranch(&branch, data)
|
||||||
dbt := TransFromDb(db, branch.Gid)
|
|
||||||
if dbt.Status != "prepared" {
|
|
||||||
return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
branches := []TransBranch{branch, branch, branch}
|
|
||||||
for i, b := range []string{"cancel", "confirm", "try"} {
|
|
||||||
branches[i].BranchType = b
|
|
||||||
branches[i].URL = data[b]
|
|
||||||
}
|
|
||||||
|
|
||||||
db.Must().Clauses(clause.OnConflict{
|
|
||||||
DoNothing: true,
|
|
||||||
}).Create(branches)
|
|
||||||
e2p(err)
|
|
||||||
global := TransGlobal{Gid: branch.Gid}
|
|
||||||
global.touch(dbGet(), config.TransCronInterval)
|
|
||||||
return dtmcli.ResultSuccess, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func query(c *gin.Context) (interface{}, error) {
|
func query(c *gin.Context) (interface{}, error) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user