diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index 5c74b26..e050a53 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -105,15 +105,6 @@ func genMsg(gid string) *dtmcli.Msg { return msg } -func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { - dtmcli.Logf("beginning a saga test ---------------- %s", gid) - saga := dtmcli.NewSaga(examples.DtmServer, gid) - req := examples.GenTransReq(30, outFailed, inFailed) - saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req) - saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req) - return saga -} - func transQuery(t *testing.T, gid string) { resp, err := dtmcli.RestyClient.R().SetQueryParam("gid", gid).Get(examples.DtmServer + "/query") e2p(err) diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 08f5ef1..bf20030 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -160,6 +160,7 @@ func (t *TransGlobal) setNextCron(expireIn int64) []string { func (t *TransGlobal) getURLResult(url string, branchID, branchType string, branchData []byte) string { if t.Protocol == "grpc" { + dtmcli.PanicIf(strings.HasPrefix(url, "http"), fmt.Errorf("bad url for grpc: %s", url)) server, method := dtmgrpc.GetServerAndMethod(url) conn := dtmgrpc.MustGetGrpcConn(server) err := conn.Invoke(context.Background(), method, &dtmgrpc.BusiRequest{ @@ -178,6 +179,7 @@ func (t *TransGlobal) getURLResult(url string, branchID, branchType string, bran } return err.Error() } + dtmcli.PanicIf(!strings.HasPrefix(url, "http"), fmt.Errorf("bad url for http: %s", url)) resp, err := dtmcli.RestyClient.R().SetBody(branchData). SetQueryParams(dtmcli.MS{ "gid": t.Gid, diff --git a/dtmsvr/trans_saga_test.go b/dtmsvr/trans_saga_test.go index 217abf8..1cda3a2 100644 --- a/dtmsvr/trans_saga_test.go +++ b/dtmsvr/trans_saga_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/dtmcli" "github.com/yedf/dtm/examples" ) @@ -45,3 +46,12 @@ func sagaRollback(t *testing.T) { assert.Equal(t, "failed", getTransStatus(saga.Gid)) assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid)) } + +func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga { + dtmcli.Logf("beginning a saga test ---------------- %s", gid) + saga := dtmcli.NewSaga(examples.DtmServer, gid) + req := examples.GenTransReq(30, outFailed, inFailed) + saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req) + saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req) + return saga +} diff --git a/examples/busi.pb.go b/examples/busi.pb.go index 92c347b..58d7ce6 100644 --- a/examples/busi.pb.go +++ b/examples/busi.pb.go @@ -29,7 +29,7 @@ var file_examples_busi_proto_rawDesc = []byte{ 0x15, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x32, 0xbd, 0x02, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x3b, 0x0a, 0x09, + 0x6f, 0x74, 0x6f, 0x32, 0xc2, 0x03, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x3b, 0x0a, 0x09, 0x43, 0x61, 0x6e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, @@ -49,9 +49,17 @@ var file_examples_busi_proto_rawDesc = []byte{ 0x65, 0x72, 0x74, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x22, 0x00, 0x42, 0x1e, 0x5a, 0x1c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74, 0x6d, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70, - 0x6c, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x79, 0x22, 0x00, 0x12, 0x40, 0x0a, 0x0e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, + 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x41, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, + 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, + 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x1e, 0x5a, 0x1c, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74, 0x6d, 0x2f, + 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var file_examples_busi_proto_goTypes = []interface{}{ @@ -64,13 +72,17 @@ var file_examples_busi_proto_depIdxs = []int32{ 0, // 2: examples.Busi.TransOut:input_type -> dtmgrpc.BusiRequest 0, // 3: examples.Busi.TransInRevert:input_type -> dtmgrpc.BusiRequest 0, // 4: examples.Busi.TransOutRevert:input_type -> dtmgrpc.BusiRequest - 1, // 5: examples.Busi.CanSubmit:output_type -> google.protobuf.Empty - 1, // 6: examples.Busi.TransIn:output_type -> google.protobuf.Empty - 1, // 7: examples.Busi.TransOut:output_type -> google.protobuf.Empty - 1, // 8: examples.Busi.TransInRevert:output_type -> google.protobuf.Empty - 1, // 9: examples.Busi.TransOutRevert:output_type -> google.protobuf.Empty - 5, // [5:10] is the sub-list for method output_type - 0, // [0:5] is the sub-list for method input_type + 0, // 5: examples.Busi.TransInConfirm:input_type -> dtmgrpc.BusiRequest + 0, // 6: examples.Busi.TransOutConfirm:input_type -> dtmgrpc.BusiRequest + 1, // 7: examples.Busi.CanSubmit:output_type -> google.protobuf.Empty + 1, // 8: examples.Busi.TransIn:output_type -> google.protobuf.Empty + 1, // 9: examples.Busi.TransOut:output_type -> google.protobuf.Empty + 1, // 10: examples.Busi.TransInRevert:output_type -> google.protobuf.Empty + 1, // 11: examples.Busi.TransOutRevert:output_type -> google.protobuf.Empty + 1, // 12: examples.Busi.TransInConfirm:output_type -> google.protobuf.Empty + 1, // 13: examples.Busi.TransOutConfirm:output_type -> google.protobuf.Empty + 7, // [7:14] is the sub-list for method output_type + 0, // [0:7] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/examples/busi.proto b/examples/busi.proto index ea06edb..031d8e4 100644 --- a/examples/busi.proto +++ b/examples/busi.proto @@ -13,5 +13,7 @@ service Busi { rpc TransOut(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} rpc TransInRevert(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} rpc TransOutRevert(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} + rpc TransInConfirm(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} + rpc TransOutConfirm(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} } diff --git a/examples/busi_grpc.pb.go b/examples/busi_grpc.pb.go index 287a4cb..39e56c6 100644 --- a/examples/busi_grpc.pb.go +++ b/examples/busi_grpc.pb.go @@ -25,6 +25,8 @@ type BusiClient interface { TransOut(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) TransInRevert(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) TransOutRevert(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransInConfirm(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransOutConfirm(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) } type busiClient struct { @@ -80,6 +82,24 @@ func (c *busiClient) TransOutRevert(ctx context.Context, in *dtmgrpc.BusiRequest return out, nil } +func (c *busiClient) TransInConfirm(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/examples.Busi/TransInConfirm", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *busiClient) TransOutConfirm(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/examples.Busi/TransOutConfirm", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // BusiServer is the server API for Busi service. // All implementations must embed UnimplementedBusiServer // for forward compatibility @@ -89,6 +109,8 @@ type BusiServer interface { TransOut(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) TransInRevert(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) TransOutRevert(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) + TransInConfirm(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) + TransOutConfirm(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) mustEmbedUnimplementedBusiServer() } @@ -111,6 +133,12 @@ func (UnimplementedBusiServer) TransInRevert(context.Context, *dtmgrpc.BusiReque func (UnimplementedBusiServer) TransOutRevert(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method TransOutRevert not implemented") } +func (UnimplementedBusiServer) TransInConfirm(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransInConfirm not implemented") +} +func (UnimplementedBusiServer) TransOutConfirm(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransOutConfirm not implemented") +} func (UnimplementedBusiServer) mustEmbedUnimplementedBusiServer() {} // UnsafeBusiServer may be embedded to opt out of forward compatibility for this service. @@ -214,6 +242,42 @@ func _Busi_TransOutRevert_Handler(srv interface{}, ctx context.Context, dec func return interceptor(ctx, in, info, handler) } +func _Busi_TransInConfirm_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(dtmgrpc.BusiRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).TransInConfirm(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/examples.Busi/TransInConfirm", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).TransInConfirm(ctx, req.(*dtmgrpc.BusiRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Busi_TransOutConfirm_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(dtmgrpc.BusiRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).TransOutConfirm(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/examples.Busi/TransOutConfirm", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).TransOutConfirm(ctx, req.(*dtmgrpc.BusiRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Busi_ServiceDesc is the grpc.ServiceDesc for Busi service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -241,6 +305,14 @@ var Busi_ServiceDesc = grpc.ServiceDesc{ MethodName: "TransOutRevert", Handler: _Busi_TransOutRevert_Handler, }, + { + MethodName: "TransInConfirm", + Handler: _Busi_TransInConfirm_Handler, + }, + { + MethodName: "TransOutConfirm", + Handler: _Busi_TransOutConfirm_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "examples/busi.proto", diff --git a/examples/main_grpc.go b/examples/main_grpc.go index 0368c56..d92dd11 100644 --- a/examples/main_grpc.go +++ b/examples/main_grpc.go @@ -62,11 +62,35 @@ func (s *busiServer) CanSubmit(ctx context.Context, in *dtmgrpc.BusiRequest) (*e func (s *busiServer) TransIn(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { req := TransReq{} dtmcli.MustUnmarshal(in.AppData, &req) - return &emptypb.Empty{}, handleGrpcBusiness(in, req.TransInResult, MainSwitch.TransInResult.Fetch(), "TransIn") + return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransInResult.Fetch(), req.TransInResult, dtmcli.GetFuncName()) } func (s *busiServer) TransOut(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { req := TransReq{} dtmcli.MustUnmarshal(in.AppData, &req) - return &emptypb.Empty{}, handleGrpcBusiness(in, req.TransOutResult, MainSwitch.TransOutResult.Fetch(), "TransOut") + return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransOutResult.Fetch(), req.TransOutResult, dtmcli.GetFuncName()) +} + +func (s *busiServer) TransInRevert(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + req := TransReq{} + dtmcli.MustUnmarshal(in.AppData, &req) + return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransInRevertResult.Fetch(), "", dtmcli.GetFuncName()) +} + +func (s *busiServer) TransOutRevert(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + req := TransReq{} + dtmcli.MustUnmarshal(in.AppData, &req) + return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransOutRevertResult.Fetch(), "", dtmcli.GetFuncName()) +} + +func (s *busiServer) TransInConfirm(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + req := TransReq{} + dtmcli.MustUnmarshal(in.AppData, &req) + return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransInConfirmResult.Fetch(), "", dtmcli.GetFuncName()) +} + +func (s *busiServer) TransOutConfirm(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + req := TransReq{} + dtmcli.MustUnmarshal(in.AppData, &req) + return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransOutConfirmResult.Fetch(), "", dtmcli.GetFuncName()) }