diff --git a/dtmgrpc/tcc.go b/dtmgrpc/tcc.go index f67c54c..1a28a55 100644 --- a/dtmgrpc/tcc.go +++ b/dtmgrpc/tcc.go @@ -36,15 +36,14 @@ func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr e x := recover() if x == nil && rerr == nil { _, rerr = dc.Submit(context.Background(), dr) - } else { - _, err := dc.Abort(context.Background(), dr) - if rerr == nil { - rerr = err - } - if x != nil { - panic(x) - } - + return + } + _, err := dc.Abort(context.Background(), dr) + if rerr == nil { + rerr = err + } + if x != nil { + panic(x) } }() return tccFunc(tcc) diff --git a/dtmsvr/api.go b/dtmsvr/api.go index d95308f..23d9c15 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/yedf/dtm/dtmcli" + "gorm.io/gorm/clause" ) func svcSubmit(t *TransGlobal, waitResult bool) (interface{}, error) { @@ -23,3 +24,33 @@ func svcPrepare(t *TransGlobal) (interface{}, error) { t.saveNew(dbGet()) return dtmcli.ResultSuccess, nil } + +func svcAbort(t *TransGlobal, waitResult bool) (interface{}, error) { + db := dbGet() + dbt := TransFromDb(db, t.Gid) + if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != "prepared" && dbt.Status != "aborting" { + return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil + } + return dbt.Process(db, waitResult), nil +} + +func svcRegisterTccBranch(branch *TransBranch, data dtmcli.MS) (interface{}, error) { + db := dbGet() + dbt := TransFromDb(db, branch.Gid) + if dbt.Status != "prepared" { + return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil + } + + branches := []TransBranch{*branch, *branch, *branch} + for i, b := range []string{"cancel", "confirm", "try"} { + branches[i].BranchType = b + branches[i].URL = data[b] + } + + db.Must().Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(branches) + global := TransGlobal{Gid: branch.Gid} + global.touch(dbGet(), config.TransCronInterval) + return dtmcli.ResultSuccess, nil +} diff --git a/dtmsvr/api_grpc.go b/dtmsvr/api_grpc.go index ac09687..7920db7 100644 --- a/dtmsvr/api_grpc.go +++ b/dtmsvr/api_grpc.go @@ -3,6 +3,7 @@ package dtmsvr import ( "context" + "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/dtmgrpc" pb "github.com/yedf/dtm/dtmgrpc" "google.golang.org/protobuf/types/known/emptypb" @@ -22,3 +23,22 @@ func (s *dtmServer) Prepare(ctx context.Context, in *pb.DtmRequest) (*emptypb.Em r, err := svcPrepare(TransFromDtmRequest(in)) return &emptypb.Empty{}, dtmgrpc.Result2Error(r, err) } + +func (s *dtmServer) Abort(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) { + r, err := svcAbort(TransFromDtmRequest(in), in.WaitResult) + return &emptypb.Empty{}, dtmgrpc.Result2Error(r, err) +} + +func (s *dtmServer) RegisterTccBranch(ctx context.Context, in *pb.DtmTccBranchRequest) (*emptypb.Empty, error) { + r, err := svcRegisterTccBranch(&TransBranch{ + Gid: in.Info.Gid, + BranchID: in.Info.BranchID, + Status: "prepared", + Data: in.BusiData, + }, dtmcli.MS{ + "cancel": in.Cancel, + "confirm": in.Confirm, + "try": in.Try, + }) + return &emptypb.Empty{}, dtmgrpc.Result2Error(r, err) +} diff --git a/dtmsvr/api_http.go b/dtmsvr/api_http.go index edbff98..884d76f 100644 --- a/dtmsvr/api_http.go +++ b/dtmsvr/api_http.go @@ -37,13 +37,7 @@ func submit(c *gin.Context) (interface{}, error) { } func abort(c *gin.Context) (interface{}, error) { - db := dbGet() - t := TransFromContext(c) - dbt := TransFromDb(db, t.Gid) - if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != "prepared" && dbt.Status != "aborting" { - return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil - } - return dbt.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil + return svcAbort(TransFromContext(c), c.Query("wait_result") == "1") } func registerXaBranch(c *gin.Context) (interface{}, error) { @@ -78,25 +72,7 @@ func registerTccBranch(c *gin.Context) (interface{}, error) { Status: "prepared", Data: data["data"], } - db := dbGet() - dbt := TransFromDb(db, branch.Gid) - if dbt.Status != "prepared" { - return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil - } - - branches := []TransBranch{branch, branch, branch} - for i, b := range []string{"cancel", "confirm", "try"} { - branches[i].BranchType = b - branches[i].URL = data[b] - } - - db.Must().Clauses(clause.OnConflict{ - DoNothing: true, - }).Create(branches) - e2p(err) - global := TransGlobal{Gid: branch.Gid} - global.touch(dbGet(), config.TransCronInterval) - return dtmcli.ResultSuccess, nil + return svcRegisterTccBranch(&branch, data) } func query(c *gin.Context) (interface{}, error) {