diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index 223dc51..a4eae9c 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -26,11 +26,16 @@ func (bb *BranchBarrier) String() string { // BarrierFromQuery construct transaction info from request func BarrierFromQuery(qs url.Values) (*BranchBarrier, error) { + return BarrierFrom(qs.Get("trans_type"), qs.Get("gid"), qs.Get("branch_id"), qs.Get("branch_type")) +} + +// BarrierFrom construct transaction info from request +func BarrierFrom(transType, gid, branchID, branchType string) (*BranchBarrier, error) { ti := &BranchBarrier{ - TransType: qs.Get("trans_type"), - Gid: qs.Get("gid"), - BranchID: qs.Get("branch_id"), - BranchType: qs.Get("branch_type"), + TransType: transType, + Gid: gid, + BranchID: branchID, + BranchType: branchType, } if ti.TransType == "" || ti.Gid == "" || ti.BranchID == "" || ti.BranchType == "" { return nil, fmt.Errorf("invlid trans info: %v", ti) diff --git a/examples/base_types.go b/examples/base_types.go index 8e76ccb..3c41b7e 100644 --- a/examples/base_types.go +++ b/examples/base_types.go @@ -7,6 +7,7 @@ import ( "github.com/gin-gonic/gin" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/dtmgrpc" ) // M alias @@ -70,9 +71,16 @@ func sdbGet() *sql.DB { return db } -// MustGetTrans construct transaction info from request -func MustGetTrans(c *gin.Context) *dtmcli.BranchBarrier { +// MustBarrierFromGin 1 +func MustBarrierFromGin(c *gin.Context) *dtmcli.BranchBarrier { ti, err := dtmcli.BarrierFromQuery(c.Request.URL.Query()) dtmcli.FatalIfError(err) return ti } + +// MustBarrierFromGrpc 1 +func MustBarrierFromGrpc(in *dtmgrpc.BusiRequest) *dtmcli.BranchBarrier { + ti, err := dtmcli.BarrierFrom(in.Info.TransType, in.Info.Gid, in.Info.BranchID, in.Info.BranchType) + dtmcli.FatalIfError(err) + return ti +} diff --git a/examples/busi.pb.go b/examples/busi.pb.go index ccd34b0..f69f60a 100644 --- a/examples/busi.pb.go +++ b/examples/busi.pb.go @@ -29,7 +29,7 @@ var file_examples_busi_proto_rawDesc = []byte{ 0x15, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x32, 0xa6, 0x06, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x3b, 0x0a, 0x09, + 0x6f, 0x74, 0x6f, 0x32, 0xb4, 0x08, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x3b, 0x0a, 0x09, 0x43, 0x61, 0x6e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, @@ -79,10 +79,27 @@ var file_examples_busi_proto_rawDesc = []byte{ 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x54, 0x63, 0x63, 0x4e, 0x65, 0x73, 0x74, 0x65, 0x64, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, - 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x1e, 0x5a, 0x1c, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f, - 0x64, 0x74, 0x6d, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x0c, + 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x14, 0x2e, 0x64, + 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3f, 0x0a, 0x0d, + 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x14, 0x2e, + 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x44, 0x0a, + 0x12, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53, + 0x61, 0x67, 0x61, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, + 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x13, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x52, + 0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, + 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x1e, 0x5a, 0x1c, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74, + 0x6d, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var file_examples_busi_proto_goTypes = []interface{}{ @@ -104,21 +121,29 @@ var file_examples_busi_proto_depIdxs = []int32{ 0, // 10: examples.Busi.TransInTcc:input_type -> dtmgrpc.BusiRequest 0, // 11: examples.Busi.TransOutTcc:input_type -> dtmgrpc.BusiRequest 0, // 12: examples.Busi.TransInTccNested:input_type -> dtmgrpc.BusiRequest - 1, // 13: examples.Busi.CanSubmit:output_type -> google.protobuf.Empty - 1, // 14: examples.Busi.TransIn:output_type -> google.protobuf.Empty - 1, // 15: examples.Busi.TransOut:output_type -> google.protobuf.Empty - 1, // 16: examples.Busi.TransInRevert:output_type -> google.protobuf.Empty - 1, // 17: examples.Busi.TransOutRevert:output_type -> google.protobuf.Empty - 1, // 18: examples.Busi.TransInConfirm:output_type -> google.protobuf.Empty - 1, // 19: examples.Busi.TransOutConfirm:output_type -> google.protobuf.Empty - 1, // 20: examples.Busi.XaNotify:output_type -> google.protobuf.Empty - 2, // 21: examples.Busi.TransInXa:output_type -> dtmgrpc.BusiReply - 2, // 22: examples.Busi.TransOutXa:output_type -> dtmgrpc.BusiReply - 2, // 23: examples.Busi.TransInTcc:output_type -> dtmgrpc.BusiReply - 2, // 24: examples.Busi.TransOutTcc:output_type -> dtmgrpc.BusiReply - 2, // 25: examples.Busi.TransInTccNested:output_type -> dtmgrpc.BusiReply - 13, // [13:26] is the sub-list for method output_type - 0, // [0:13] is the sub-list for method input_type + 0, // 13: examples.Busi.TransInBSaga:input_type -> dtmgrpc.BusiRequest + 0, // 14: examples.Busi.TransOutBSaga:input_type -> dtmgrpc.BusiRequest + 0, // 15: examples.Busi.TransInRevertBSaga:input_type -> dtmgrpc.BusiRequest + 0, // 16: examples.Busi.TransOutRevertBSaga:input_type -> dtmgrpc.BusiRequest + 1, // 17: examples.Busi.CanSubmit:output_type -> google.protobuf.Empty + 1, // 18: examples.Busi.TransIn:output_type -> google.protobuf.Empty + 1, // 19: examples.Busi.TransOut:output_type -> google.protobuf.Empty + 1, // 20: examples.Busi.TransInRevert:output_type -> google.protobuf.Empty + 1, // 21: examples.Busi.TransOutRevert:output_type -> google.protobuf.Empty + 1, // 22: examples.Busi.TransInConfirm:output_type -> google.protobuf.Empty + 1, // 23: examples.Busi.TransOutConfirm:output_type -> google.protobuf.Empty + 1, // 24: examples.Busi.XaNotify:output_type -> google.protobuf.Empty + 2, // 25: examples.Busi.TransInXa:output_type -> dtmgrpc.BusiReply + 2, // 26: examples.Busi.TransOutXa:output_type -> dtmgrpc.BusiReply + 2, // 27: examples.Busi.TransInTcc:output_type -> dtmgrpc.BusiReply + 2, // 28: examples.Busi.TransOutTcc:output_type -> dtmgrpc.BusiReply + 2, // 29: examples.Busi.TransInTccNested:output_type -> dtmgrpc.BusiReply + 1, // 30: examples.Busi.TransInBSaga:output_type -> google.protobuf.Empty + 1, // 31: examples.Busi.TransOutBSaga:output_type -> google.protobuf.Empty + 1, // 32: examples.Busi.TransInRevertBSaga:output_type -> google.protobuf.Empty + 1, // 33: examples.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty + 17, // [17:34] is the sub-list for method output_type + 0, // [0:17] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/examples/busi.proto b/examples/busi.proto index 69a9c80..19ecf55 100644 --- a/examples/busi.proto +++ b/examples/busi.proto @@ -22,5 +22,10 @@ service Busi { rpc TransInTcc(dtmgrpc.BusiRequest) returns (dtmgrpc.BusiReply) {} rpc TransOutTcc(dtmgrpc.BusiRequest) returns (dtmgrpc.BusiReply) {} rpc TransInTccNested(dtmgrpc.BusiRequest) returns (dtmgrpc.BusiReply) {} + + rpc TransInBSaga(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} + rpc TransOutBSaga(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} + rpc TransInRevertBSaga(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} + rpc TransOutRevertBSaga(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} } diff --git a/examples/busi_grpc.pb.go b/examples/busi_grpc.pb.go index 46c37a9..9016161 100644 --- a/examples/busi_grpc.pb.go +++ b/examples/busi_grpc.pb.go @@ -33,6 +33,10 @@ type BusiClient interface { TransInTcc(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*dtmgrpc.BusiReply, error) TransOutTcc(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*dtmgrpc.BusiReply, error) TransInTccNested(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*dtmgrpc.BusiReply, error) + TransInBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransOutBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransInRevertBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransOutRevertBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) } type busiClient struct { @@ -160,6 +164,42 @@ func (c *busiClient) TransInTccNested(ctx context.Context, in *dtmgrpc.BusiReque return out, nil } +func (c *busiClient) TransInBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/examples.Busi/TransInBSaga", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *busiClient) TransOutBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/examples.Busi/TransOutBSaga", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *busiClient) TransInRevertBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/examples.Busi/TransInRevertBSaga", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *busiClient) TransOutRevertBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/examples.Busi/TransOutRevertBSaga", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // BusiServer is the server API for Busi service. // All implementations must embed UnimplementedBusiServer // for forward compatibility @@ -177,6 +217,10 @@ type BusiServer interface { TransInTcc(context.Context, *dtmgrpc.BusiRequest) (*dtmgrpc.BusiReply, error) TransOutTcc(context.Context, *dtmgrpc.BusiRequest) (*dtmgrpc.BusiReply, error) TransInTccNested(context.Context, *dtmgrpc.BusiRequest) (*dtmgrpc.BusiReply, error) + TransInBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) + TransOutBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) + TransInRevertBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) + TransOutRevertBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) mustEmbedUnimplementedBusiServer() } @@ -223,6 +267,18 @@ func (UnimplementedBusiServer) TransOutTcc(context.Context, *dtmgrpc.BusiRequest func (UnimplementedBusiServer) TransInTccNested(context.Context, *dtmgrpc.BusiRequest) (*dtmgrpc.BusiReply, error) { return nil, status.Errorf(codes.Unimplemented, "method TransInTccNested not implemented") } +func (UnimplementedBusiServer) TransInBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransInBSaga not implemented") +} +func (UnimplementedBusiServer) TransOutBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransOutBSaga not implemented") +} +func (UnimplementedBusiServer) TransInRevertBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransInRevertBSaga not implemented") +} +func (UnimplementedBusiServer) TransOutRevertBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransOutRevertBSaga not implemented") +} func (UnimplementedBusiServer) mustEmbedUnimplementedBusiServer() {} // UnsafeBusiServer may be embedded to opt out of forward compatibility for this service. @@ -470,6 +526,78 @@ func _Busi_TransInTccNested_Handler(srv interface{}, ctx context.Context, dec fu return interceptor(ctx, in, info, handler) } +func _Busi_TransInBSaga_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(dtmgrpc.BusiRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).TransInBSaga(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/examples.Busi/TransInBSaga", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).TransInBSaga(ctx, req.(*dtmgrpc.BusiRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Busi_TransOutBSaga_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(dtmgrpc.BusiRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).TransOutBSaga(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/examples.Busi/TransOutBSaga", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).TransOutBSaga(ctx, req.(*dtmgrpc.BusiRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Busi_TransInRevertBSaga_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(dtmgrpc.BusiRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).TransInRevertBSaga(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/examples.Busi/TransInRevertBSaga", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).TransInRevertBSaga(ctx, req.(*dtmgrpc.BusiRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Busi_TransOutRevertBSaga_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(dtmgrpc.BusiRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).TransOutRevertBSaga(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/examples.Busi/TransOutRevertBSaga", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).TransOutRevertBSaga(ctx, req.(*dtmgrpc.BusiRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Busi_ServiceDesc is the grpc.ServiceDesc for Busi service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -529,6 +657,22 @@ var Busi_ServiceDesc = grpc.ServiceDesc{ MethodName: "TransInTccNested", Handler: _Busi_TransInTccNested_Handler, }, + { + MethodName: "TransInBSaga", + Handler: _Busi_TransInBSaga_Handler, + }, + { + MethodName: "TransOutBSaga", + Handler: _Busi_TransOutBSaga_Handler, + }, + { + MethodName: "TransInRevertBSaga", + Handler: _Busi_TransInRevertBSaga_Handler, + }, + { + MethodName: "TransOutRevertBSaga", + Handler: _Busi_TransOutRevertBSaga_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "examples/busi.proto", diff --git a/examples/grpc_saga_barrier.go b/examples/grpc_saga_barrier.go new file mode 100644 index 0000000..94183cc --- /dev/null +++ b/examples/grpc_saga_barrier.go @@ -0,0 +1,90 @@ +package examples + +import ( + "context" + "database/sql" + + "github.com/gin-gonic/gin" + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/dtmgrpc" + "google.golang.org/protobuf/types/known/emptypb" +) + +func init() { + addSample("grpc_saga_barrier", func() string { + req := dtmcli.MustMarshal(&TransReq{Amount: 30}) + gid := dtmgrpc.MustGenGid(DtmGrpcServer) + saga := dtmgrpc.NewSaga(DtmGrpcServer, gid). + Add(BusiGrpc+"/examples.Busi/TransOutBSaga", BusiGrpc+"/examples.Busi/TransOutRevertBSaga", req). + Add(BusiGrpc+"/examples.Busi/TransInBSaga", BusiGrpc+"/examples.Busi/TransOutRevertBSaga", req) + err := saga.Submit() + dtmcli.FatalIfError(err) + return saga.Gid + }) +} + +func sagaGrpcBarrierAdjustBalance(sdb *sql.Tx, uid int, amount int) (interface{}, error) { + _, err := dtmcli.StxExec(sdb, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid) + return dtmcli.ResultSuccess, err + +} + +func (s *busiServer) TransInBSaga(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + req := TransReq{} + dtmcli.MustUnmarshal(in.BusiData, &req) + return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransInResult.Fetch(), req.TransInResult, dtmcli.GetFuncName()) +} + +func (s *busiServer) TransOutBSaga(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + req := TransReq{} + dtmcli.MustUnmarshal(in.BusiData, &req) + return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransOutResult.Fetch(), req.TransOutResult, dtmcli.GetFuncName()) +} + +func (s *busiServer) TransInRevertBSaga(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + req := TransReq{} + dtmcli.MustUnmarshal(in.BusiData, &req) + return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransInRevertResult.Fetch(), "", dtmcli.GetFuncName()) +} + +func (s *busiServer) TransOutRevertBSaga(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + req := TransReq{} + dtmcli.MustUnmarshal(in.BusiData, &req) + return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransOutRevertResult.Fetch(), "", dtmcli.GetFuncName()) +} + +func sagaBarrierTransIn(c *gin.Context) (interface{}, error) { + req := reqFrom(c) + if req.TransInResult != "" { + return req.TransInResult, nil + } + barrier := MustBarrierFromGin(c) + return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { + return sagaBarrierAdjustBalance(sdb, 1, req.Amount) + }) +} + +func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) { + barrier := MustBarrierFromGin(c) + return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { + return sagaBarrierAdjustBalance(sdb, 1, -reqFrom(c).Amount) + }) +} + +func sagaBarrierTransOut(c *gin.Context) (interface{}, error) { + req := reqFrom(c) + if req.TransInResult != "" { + return req.TransInResult, nil + } + barrier := MustBarrierFromGin(c) + return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { + return sagaBarrierAdjustBalance(sdb, 2, -req.Amount) + }) +} + +func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) { + barrier := MustBarrierFromGin(c) + return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { + return sagaBarrierAdjustBalance(sdb, 2, reqFrom(c).Amount) + }) +} diff --git a/examples/http_saga.go b/examples/http_saga.go index f9189d6..d014eee 100644 --- a/examples/http_saga.go +++ b/examples/http_saga.go @@ -7,11 +7,7 @@ import ( func init() { addSample("saga", func() string { dtmcli.Logf("a saga busi transaction begin") - req := &TransReq{ - Amount: 30, - TransInResult: "SUCCESS", - TransOutResult: "SUCCESS", - } + req := &TransReq{Amount: 30} saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)). Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). Add(Busi+"/TransIn", Busi+"/TransInRevert", req) @@ -21,4 +17,17 @@ func init() { dtmcli.FatalIfError(err) return saga.Gid }) + addSample("saga_wait", func() string { + dtmcli.Logf("a saga busi transaction begin") + req := &TransReq{Amount: 30} + saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)). + Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). + Add(Busi+"/TransIn", Busi+"/TransInRevert", req) + saga.WaitResult = true // 设置为等待结果模式,后面的submit调用,会等待服务器处理这个事务。如果Submit正常返回,那么整个全局事务已成功完成 + err := saga.Submit() + dtmcli.Logf("result gid is: %s", saga.Gid) + dtmcli.FatalIfError(err) + return saga.Gid + }) + } diff --git a/examples/http_saga_barrier.go b/examples/http_saga_barrier.go index eec73a9..9e60888 100644 --- a/examples/http_saga_barrier.go +++ b/examples/http_saga_barrier.go @@ -39,14 +39,14 @@ func sagaBarrierTransIn(c *gin.Context) (interface{}, error) { if req.TransInResult != "" { return req.TransInResult, nil } - barrier := MustGetTrans(c) + barrier := MustBarrierFromGin(c) return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { return sagaBarrierAdjustBalance(sdb, 1, req.Amount) }) } func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) { - barrier := MustGetTrans(c) + barrier := MustBarrierFromGin(c) return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { return sagaBarrierAdjustBalance(sdb, 1, -reqFrom(c).Amount) }) @@ -57,14 +57,14 @@ func sagaBarrierTransOut(c *gin.Context) (interface{}, error) { if req.TransInResult != "" { return req.TransInResult, nil } - barrier := MustGetTrans(c) + barrier := MustBarrierFromGin(c) return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { return sagaBarrierAdjustBalance(sdb, 2, -req.Amount) }) } func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) { - barrier := MustGetTrans(c) + barrier := MustBarrierFromGin(c) return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { return sagaBarrierAdjustBalance(sdb, 2, reqFrom(c).Amount) }) diff --git a/examples/http_saga_wait.go b/examples/http_saga_wait.go deleted file mode 100644 index 84b3781..0000000 --- a/examples/http_saga_wait.go +++ /dev/null @@ -1,24 +0,0 @@ -package examples - -import ( - "github.com/yedf/dtm/dtmcli" -) - -func init() { - addSample("saga_wait", func() string { - dtmcli.Logf("a saga busi transaction begin") - req := &TransReq{ - Amount: 30, - TransInResult: "SUCCESS", - TransOutResult: "SUCCESS", - } - saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)). - Add(Busi+"/TransOut", Busi+"/TransOutRevert", req). - Add(Busi+"/TransIn", Busi+"/TransInRevert", req) - saga.WaitResult = true // 设置为等待结果模式,后面的submit调用,会等待服务器处理这个事务。如果Submit正常返回,那么整个全局事务已成功完成 - err := saga.Submit() - dtmcli.Logf("result gid is: %s", saga.Gid) - dtmcli.FatalIfError(err) - return saga.Gid - }) -} diff --git a/examples/http_tcc_barrier.go b/examples/http_tcc_barrier.go index 273f5fd..755313a 100644 --- a/examples/http_tcc_barrier.go +++ b/examples/http_tcc_barrier.go @@ -62,21 +62,21 @@ func tccBarrierTransInTry(c *gin.Context) (interface{}, error) { if req.TransInResult != "" { return req.TransInResult, nil } - barrier := MustGetTrans(c) + barrier := MustBarrierFromGin(c) return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { return adjustTrading(sdb, transInUID, req.Amount) }) } func tccBarrierTransInConfirm(c *gin.Context) (interface{}, error) { - barrier := MustGetTrans(c) + barrier := MustBarrierFromGin(c) return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { return adjustBalance(sdb, transInUID, reqFrom(c).Amount) }) } func tccBarrierTransInCancel(c *gin.Context) (interface{}, error) { - barrier := MustGetTrans(c) + barrier := MustBarrierFromGin(c) return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { return adjustTrading(sdb, transInUID, -reqFrom(c).Amount) }) @@ -87,14 +87,14 @@ func tccBarrierTransOutTry(c *gin.Context) (interface{}, error) { if req.TransInResult != "" { return req.TransInResult, nil } - barrier := MustGetTrans(c) + barrier := MustBarrierFromGin(c) return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { return adjustTrading(sdb, transOutUID, -req.Amount) }) } func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) { - barrier := MustGetTrans(c) + barrier := MustBarrierFromGin(c) return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { return adjustBalance(sdb, transOutUID, -reqFrom(c).Amount) }) @@ -102,7 +102,7 @@ func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) { // TccBarrierTransOutCancel will be use in test func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) { - barrier := MustGetTrans(c) + barrier := MustBarrierFromGin(c) return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) { return adjustTrading(sdb, transOutUID, reqFrom(c).Amount) })