From 841786f61892d0ac2141fd22086875d8ea9eb70b Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Thu, 12 Aug 2021 18:00:17 +0800 Subject: [PATCH] grpc barrier ok --- dtmgrpc/barrier.go | 29 +++++++++++++++++++++++++ test/grpc_barrier_saga_test.go | 39 ++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 dtmgrpc/barrier.go create mode 100644 test/grpc_barrier_saga_test.go diff --git a/dtmgrpc/barrier.go b/dtmgrpc/barrier.go new file mode 100644 index 0000000..468407a --- /dev/null +++ b/dtmgrpc/barrier.go @@ -0,0 +1,29 @@ +package dtmgrpc + +import ( + "database/sql" + + "github.com/yedf/dtm/dtmcli" + "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// BranchBarrier 子事务屏障 +type BranchBarrier struct { + *dtmcli.BranchBarrier +} + +// Call 子事务屏障,详细介绍见 https://zhuanlan.zhihu.com/p/388444465 +// db: 本地数据库 +// transInfo: 事务信息 +// bisiCall: 业务函数,仅在必要时被调用 +// 返回值: +// 如果发生悬挂,则busiCall不会被调用,直接返回错误 ErrFailure,全局事务尽早进行回滚 +// 如果正常调用,重复调用,空补偿,返回的错误值为nil,正常往下进行 +func (bb *BranchBarrier) Call(db *sql.DB, busiCall dtmcli.BusiFunc) (rerr error) { + err := bb.BranchBarrier.Call(db, busiCall) + if err == dtmcli.ErrFailure { + return status.New(codes.Aborted, "user rollback").Err() + } + return err +} diff --git a/test/grpc_barrier_saga_test.go b/test/grpc_barrier_saga_test.go new file mode 100644 index 0000000..a37a759 --- /dev/null +++ b/test/grpc_barrier_saga_test.go @@ -0,0 +1,39 @@ +package test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/dtmgrpc" + "github.com/yedf/dtm/examples" +) + +func TestGrpcBarrierSaga(t *testing.T) { + + grpcSagaBarrierNormal(t) + grpcSagaBarrierRollback(t) +} + +func grpcSagaBarrierNormal(t *testing.T) { + req := dtmcli.MustMarshal(&examples.TransReq{Amount: 30}) + saga := dtmgrpc.NewSaga(examples.DtmGrpcServer, "grpcSagaBarrierNormal"). + Add(examples.BusiGrpc+"/examples.Busi/TransOutBSaga", examples.BusiGrpc+"/examples.Busi/TransOutRevertBSaga", req). + Add(examples.BusiGrpc+"/examples.Busi/TransInBSaga", examples.BusiGrpc+"/examples.Busi/TransInRevertBSaga", req) + err := saga.Submit() + e2p(err) + WaitTransProcessed(saga.Gid) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) +} + +func grpcSagaBarrierRollback(t *testing.T) { + req := dtmcli.MustMarshal(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}) + saga := dtmgrpc.NewSaga(examples.DtmGrpcServer, "grpcSagaBarrierRollback"). + Add(examples.BusiGrpc+"/examples.Busi/TransOutBSaga", examples.BusiGrpc+"/examples.Busi/TransOutRevertBSaga", req). + Add(examples.BusiGrpc+"/examples.Busi/TransInBSaga", examples.BusiGrpc+"/examples.Busi/TransInRevertBSaga", req) + err := saga.Submit() + e2p(err) + WaitTransProcessed(saga.Gid) + assert.Equal(t, "failed", getTransStatus(saga.Gid)) + assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid)) +}