From 20c1dca30c3100901e44bbcd2b4df1b80c51708e Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Wed, 11 Aug 2021 17:31:28 +0800 Subject: [PATCH] Tcc nested test ok --- .travis.yml | 2 +- dtmgrpc/tcc.go | 1 + dtmsvr/cron.go | 8 +++--- dtmsvr/utils_test.go | 2 +- examples/base_grpc.go | 10 +++++++ examples/busi.pb.go | 40 +++++++++++++++----------- examples/busi.proto | 1 + examples/busi_grpc.pb.go | 36 ++++++++++++++++++++++++ test/grpc_tcc_test.go | 18 ++++++++++-- test/saga_test.go | 5 +++- test/trans_xa_test.go | 61 ---------------------------------------- 11 files changed, 96 insertions(+), 88 deletions(-) delete mode 100644 test/trans_xa_test.go diff --git a/.travis.yml b/.travis.yml index 8ad0b50..a0a9192 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,4 +14,4 @@ before_install: - go get -t -v ./... - go get github.com/mattn/goveralls script: - - $GOPATH/bin/goveralls -service=travis-ci -ignore="examples/*" + - $GOPATH/bin/goveralls -service=travis-ci -ignore="examples/*,dtmgrpc/*.pb.go" diff --git a/dtmgrpc/tcc.go b/dtmgrpc/tcc.go index cb4df78..2c9c66d 100644 --- a/dtmgrpc/tcc.go +++ b/dtmgrpc/tcc.go @@ -87,6 +87,7 @@ func (t *TccGrpc) CallBranch(busiData []byte, tryURL string, confirmURL string, BranchType: "try", }, BusiData: busiData, + Dtm: t.Dtm, }, reply) return reply, err } diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 852461f..0830b4b 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -29,7 +29,7 @@ func CronExpiredTrans(num int) { for i := 0; i < num || num == -1; i++ { hasTrans := CronTransOnce(time.Duration(0)) if !hasTrans && num != 1 { - sleepCronTime() + sleepCronTime(0) } } } @@ -61,9 +61,9 @@ func handlePanic(perr *error) { } } -func sleepCronTime() { +func sleepCronTime(milli int) { delta := math.Min(3, float64(config.TransCronInterval)) interval := time.Duration((float64(config.TransCronInterval) - rand.Float64()*delta) * float64(time.Second)) - dtmcli.Logf("sleeping for %v", interval) - time.Sleep(interval) + dtmcli.Logf("sleeping for %v pass in %d milli", interval, milli) + time.Sleep(dtmcli.If(milli == 0, interval, time.Duration(milli*int(time.Millisecond))).(time.Duration)) } diff --git a/dtmsvr/utils_test.go b/dtmsvr/utils_test.go index 57491d4..09576d3 100644 --- a/dtmsvr/utils_test.go +++ b/dtmsvr/utils_test.go @@ -17,5 +17,5 @@ func TestUtils(t *testing.T) { assert.Error(t, err) CronExpiredTrans(1) - go sleepCronTime() + sleepCronTime(10) } diff --git a/examples/base_grpc.go b/examples/base_grpc.go index c1b2468..64c397f 100644 --- a/examples/base_grpc.go +++ b/examples/base_grpc.go @@ -119,3 +119,13 @@ func (s *busiServer) TransOutXa(ctx context.Context, in *dtmgrpc.BusiRequest) (* return err }) } + +func (s *busiServer) TransInTccNested(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + req := TransReq{} + dtmcli.MustUnmarshal(in.BusiData, &req) + tcc, err := dtmgrpc.TccFromRequest(in) + e2p(err) + _, err = tcc.CallBranch(dtmcli.MustMarshal(req), BusiGrpc+"/examples.Busi/TransIn", BusiGrpc+"/examples.Busi/TransInConfirm", BusiGrpc+"/examples.Busi/TransInRevert") + e2p(err) + return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransInResult.Fetch(), req.TransInResult, dtmcli.GetFuncName()) +} diff --git a/examples/busi.pb.go b/examples/busi.pb.go index 9cc7bfa..67df4a6 100644 --- a/examples/busi.pb.go +++ b/examples/busi.pb.go @@ -29,7 +29,7 @@ var file_examples_busi_proto_rawDesc = []byte{ 0x15, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x32, 0xf9, 0x04, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x3b, 0x0a, 0x09, + 0x6f, 0x74, 0x6f, 0x32, 0xbd, 0x05, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x3b, 0x0a, 0x09, 0x43, 0x61, 0x6e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, @@ -68,10 +68,14 @@ var file_examples_busi_proto_rawDesc = []byte{ 0x00, 0x12, 0x3c, 0x0a, 0x0a, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x58, 0x61, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, - 0x1e, 0x5a, 0x1c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x65, - 0x64, 0x66, 0x2f, 0x64, 0x74, 0x6d, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, + 0x42, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x54, 0x63, 0x63, 0x4e, 0x65, 0x73, + 0x74, 0x65, 0x64, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, + 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x22, 0x00, 0x42, 0x1e, 0x5a, 0x1c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74, 0x6d, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70, + 0x6c, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var file_examples_busi_proto_goTypes = []interface{}{ @@ -89,18 +93,20 @@ var file_examples_busi_proto_depIdxs = []int32{ 0, // 7: examples.Busi.XaNotify:input_type -> dtmgrpc.BusiRequest 0, // 8: examples.Busi.TransInXa:input_type -> dtmgrpc.BusiRequest 0, // 9: examples.Busi.TransOutXa:input_type -> dtmgrpc.BusiRequest - 1, // 10: examples.Busi.CanSubmit:output_type -> google.protobuf.Empty - 1, // 11: examples.Busi.TransIn:output_type -> google.protobuf.Empty - 1, // 12: examples.Busi.TransOut:output_type -> google.protobuf.Empty - 1, // 13: examples.Busi.TransInRevert:output_type -> google.protobuf.Empty - 1, // 14: examples.Busi.TransOutRevert:output_type -> google.protobuf.Empty - 1, // 15: examples.Busi.TransInConfirm:output_type -> google.protobuf.Empty - 1, // 16: examples.Busi.TransOutConfirm:output_type -> google.protobuf.Empty - 1, // 17: examples.Busi.XaNotify:output_type -> google.protobuf.Empty - 1, // 18: examples.Busi.TransInXa:output_type -> google.protobuf.Empty - 1, // 19: examples.Busi.TransOutXa:output_type -> google.protobuf.Empty - 10, // [10:20] is the sub-list for method output_type - 0, // [0:10] is the sub-list for method input_type + 0, // 10: examples.Busi.TransInTccNested:input_type -> dtmgrpc.BusiRequest + 1, // 11: examples.Busi.CanSubmit:output_type -> google.protobuf.Empty + 1, // 12: examples.Busi.TransIn:output_type -> google.protobuf.Empty + 1, // 13: examples.Busi.TransOut:output_type -> google.protobuf.Empty + 1, // 14: examples.Busi.TransInRevert:output_type -> google.protobuf.Empty + 1, // 15: examples.Busi.TransOutRevert:output_type -> google.protobuf.Empty + 1, // 16: examples.Busi.TransInConfirm:output_type -> google.protobuf.Empty + 1, // 17: examples.Busi.TransOutConfirm:output_type -> google.protobuf.Empty + 1, // 18: examples.Busi.XaNotify:output_type -> google.protobuf.Empty + 1, // 19: examples.Busi.TransInXa:output_type -> google.protobuf.Empty + 1, // 20: examples.Busi.TransOutXa:output_type -> google.protobuf.Empty + 1, // 21: examples.Busi.TransInTccNested:output_type -> google.protobuf.Empty + 11, // [11:22] is the sub-list for method output_type + 0, // [0:11] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/examples/busi.proto b/examples/busi.proto index deb449a..23a9ee5 100644 --- a/examples/busi.proto +++ b/examples/busi.proto @@ -18,5 +18,6 @@ service Busi { rpc XaNotify(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} rpc TransInXa(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} rpc TransOutXa(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} + rpc TransInTccNested(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} } diff --git a/examples/busi_grpc.pb.go b/examples/busi_grpc.pb.go index fecfae3..2314b12 100644 --- a/examples/busi_grpc.pb.go +++ b/examples/busi_grpc.pb.go @@ -30,6 +30,7 @@ type BusiClient interface { XaNotify(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) TransInXa(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) TransOutXa(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransInTccNested(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) } type busiClient struct { @@ -130,6 +131,15 @@ func (c *busiClient) TransOutXa(ctx context.Context, in *dtmgrpc.BusiRequest, op return out, nil } +func (c *busiClient) TransInTccNested(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/examples.Busi/TransInTccNested", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // BusiServer is the server API for Busi service. // All implementations must embed UnimplementedBusiServer // for forward compatibility @@ -144,6 +154,7 @@ type BusiServer interface { XaNotify(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) TransInXa(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) TransOutXa(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) + TransInTccNested(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) mustEmbedUnimplementedBusiServer() } @@ -181,6 +192,9 @@ func (UnimplementedBusiServer) TransInXa(context.Context, *dtmgrpc.BusiRequest) func (UnimplementedBusiServer) TransOutXa(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method TransOutXa not implemented") } +func (UnimplementedBusiServer) TransInTccNested(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransInTccNested not implemented") +} func (UnimplementedBusiServer) mustEmbedUnimplementedBusiServer() {} // UnsafeBusiServer may be embedded to opt out of forward compatibility for this service. @@ -374,6 +388,24 @@ func _Busi_TransOutXa_Handler(srv interface{}, ctx context.Context, dec func(int return interceptor(ctx, in, info, handler) } +func _Busi_TransInTccNested_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(dtmgrpc.BusiRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).TransInTccNested(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/examples.Busi/TransInTccNested", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).TransInTccNested(ctx, req.(*dtmgrpc.BusiRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Busi_ServiceDesc is the grpc.ServiceDesc for Busi service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -421,6 +453,10 @@ var Busi_ServiceDesc = grpc.ServiceDesc{ MethodName: "TransOutXa", Handler: _Busi_TransOutXa_Handler, }, + { + MethodName: "TransInTccNested", + Handler: _Busi_TransInTccNested_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "examples/busi.proto", diff --git a/test/grpc_tcc_test.go b/test/grpc_tcc_test.go index 3671838..f246816 100644 --- a/test/grpc_tcc_test.go +++ b/test/grpc_tcc_test.go @@ -11,9 +11,9 @@ import ( ) func TestGrpcTcc(t *testing.T) { - tccGrpcNormal(t) - tccGrpcRollback(t) - + // tccGrpcNormal(t) + tccGrpcNested(t) + // tccGrpcRollback(t) } func tccGrpcNormal(t *testing.T) { @@ -28,6 +28,18 @@ func tccGrpcNormal(t *testing.T) { assert.Nil(t, err) } +func tccGrpcNested(t *testing.T) { + data := dtmcli.MustMarshal(&examples.TransReq{Amount: 30}) + gid := "tccGrpcNested" + err := dtmgrpc.TccGlobalTransaction(examples.DtmGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error { + _, err := tcc.CallBranch(data, examples.BusiGrpc+"/examples.Busi/TransOut", examples.BusiGrpc+"/examples.Busi/TransOutConfirm", examples.BusiGrpc+"/examples.Busi/TransOutRevert") + assert.Nil(t, err) + _, err = tcc.CallBranch(data, examples.BusiGrpc+"/examples.Busi/TransInTccNested", examples.BusiGrpc+"/examples.Busi/TransInConfirm", examples.BusiGrpc+"/examples.Busi/TransInRevert") + return err + }) + assert.Nil(t, err) +} + func tccGrpcRollback(t *testing.T) { gid := "tccGrpcRollback" data := dtmcli.MustMarshal(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}) diff --git a/test/saga_test.go b/test/saga_test.go index e99888b..6c74b18 100644 --- a/test/saga_test.go +++ b/test/saga_test.go @@ -39,12 +39,15 @@ func sagaCommittedPending(t *testing.T) { func sagaRollback(t *testing.T) { saga := genSaga("gid-rollbackSaga2", false, true) examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING") - saga.Submit() + err := saga.Submit() + assert.Nil(t, err) WaitTransProcessed(saga.Gid) assert.Equal(t, "aborting", getTransStatus(saga.Gid)) CronTransOnce(60 * time.Second) assert.Equal(t, "failed", getTransStatus(saga.Gid)) assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid)) + err = saga.Submit() + assert.Error(t, err) } func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { diff --git a/test/trans_xa_test.go b/test/trans_xa_test.go deleted file mode 100644 index 35ac8ee..0000000 --- a/test/trans_xa_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package test - -import ( - "fmt" - "testing" - - "github.com/go-resty/resty/v2" - "github.com/stretchr/testify/assert" - "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/examples" -) - -func TestXa(t *testing.T) { - if config.DB["driver"] != "mysql" { - return - } - xaLocalError(t) - xaNormal(t) - xaRollback(t) -} - -func xaLocalError(t *testing.T) { - xc := examples.XaClient - err := xc.XaGlobalTransaction("xaLocalError", func(xa *dtmcli.Xa) (*resty.Response, error) { - return nil, fmt.Errorf("an error") - }) - assert.Error(t, err, fmt.Errorf("an error")) -} - -func xaNormal(t *testing.T) { - xc := examples.XaClient - gid := "xaNormal" - err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { - req := examples.GenTransReq(30, false, false) - resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") - if err != nil { - return resp, err - } - return xa.CallBranch(req, examples.Busi+"/TransInXa") - }) - assert.Equal(t, nil, err) - WaitTransProcessed(gid) - assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid)) -} - -func xaRollback(t *testing.T) { - xc := examples.XaClient - gid := "xaRollback" - err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { - req := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"} - resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa") - if err != nil { - return resp, err - } - return xa.CallBranch(req, examples.Busi+"/TransInXa") - }) - assert.Error(t, err) - WaitTransProcessed(gid) - assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid)) - assert.Equal(t, "failed", getTransStatus(gid)) -}