grpc barrier ok

This commit is contained in:
yedf2 2021-08-12 18:00:17 +08:00
parent 47d0ba5a7b
commit 841786f618
2 changed files with 68 additions and 0 deletions

29
dtmgrpc/barrier.go Normal file
View File

@ -0,0 +1,29 @@
package dtmgrpc
import (
"database/sql"
"github.com/yedf/dtm/dtmcli"
"google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// BranchBarrier 子事务屏障
type BranchBarrier struct {
*dtmcli.BranchBarrier
}
// Call 子事务屏障,详细介绍见 https://zhuanlan.zhihu.com/p/388444465
// db: 本地数据库
// transInfo: 事务信息
// bisiCall: 业务函数,仅在必要时被调用
// 返回值:
// 如果发生悬挂则busiCall不会被调用直接返回错误 ErrFailure全局事务尽早进行回滚
// 如果正常调用重复调用空补偿返回的错误值为nil正常往下进行
func (bb *BranchBarrier) Call(db *sql.DB, busiCall dtmcli.BusiFunc) (rerr error) {
err := bb.BranchBarrier.Call(db, busiCall)
if err == dtmcli.ErrFailure {
return status.New(codes.Aborted, "user rollback").Err()
}
return err
}

View File

@ -0,0 +1,39 @@
package test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmgrpc"
"github.com/yedf/dtm/examples"
)
func TestGrpcBarrierSaga(t *testing.T) {
grpcSagaBarrierNormal(t)
grpcSagaBarrierRollback(t)
}
func grpcSagaBarrierNormal(t *testing.T) {
req := dtmcli.MustMarshal(&examples.TransReq{Amount: 30})
saga := dtmgrpc.NewSaga(examples.DtmGrpcServer, "grpcSagaBarrierNormal").
Add(examples.BusiGrpc+"/examples.Busi/TransOutBSaga", examples.BusiGrpc+"/examples.Busi/TransOutRevertBSaga", req).
Add(examples.BusiGrpc+"/examples.Busi/TransInBSaga", examples.BusiGrpc+"/examples.Busi/TransInRevertBSaga", req)
err := saga.Submit()
e2p(err)
WaitTransProcessed(saga.Gid)
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
}
func grpcSagaBarrierRollback(t *testing.T) {
req := dtmcli.MustMarshal(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"})
saga := dtmgrpc.NewSaga(examples.DtmGrpcServer, "grpcSagaBarrierRollback").
Add(examples.BusiGrpc+"/examples.Busi/TransOutBSaga", examples.BusiGrpc+"/examples.Busi/TransOutRevertBSaga", req).
Add(examples.BusiGrpc+"/examples.Busi/TransInBSaga", examples.BusiGrpc+"/examples.Busi/TransInRevertBSaga", req)
err := saga.Submit()
e2p(err)
WaitTransProcessed(saga.Gid)
assert.Equal(t, "failed", getTransStatus(saga.Gid))
assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid))
}