partial grpc saga

This commit is contained in:
yedf2 2021-08-12 16:33:27 +08:00
parent ac03ed4bdd
commit 723831e41f
10 changed files with 327 additions and 65 deletions

View File

@ -26,11 +26,16 @@ func (bb *BranchBarrier) String() string {
// BarrierFromQuery construct transaction info from request
func BarrierFromQuery(qs url.Values) (*BranchBarrier, error) {
return BarrierFrom(qs.Get("trans_type"), qs.Get("gid"), qs.Get("branch_id"), qs.Get("branch_type"))
}
// BarrierFrom construct transaction info from request
func BarrierFrom(transType, gid, branchID, branchType string) (*BranchBarrier, error) {
ti := &BranchBarrier{
TransType: qs.Get("trans_type"),
Gid: qs.Get("gid"),
BranchID: qs.Get("branch_id"),
BranchType: qs.Get("branch_type"),
TransType: transType,
Gid: gid,
BranchID: branchID,
BranchType: branchType,
}
if ti.TransType == "" || ti.Gid == "" || ti.BranchID == "" || ti.BranchType == "" {
return nil, fmt.Errorf("invlid trans info: %v", ti)

View File

@ -7,6 +7,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmgrpc"
)
// M alias
@ -70,9 +71,16 @@ func sdbGet() *sql.DB {
return db
}
// MustGetTrans construct transaction info from request
func MustGetTrans(c *gin.Context) *dtmcli.BranchBarrier {
// MustBarrierFromGin 1
func MustBarrierFromGin(c *gin.Context) *dtmcli.BranchBarrier {
ti, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
dtmcli.FatalIfError(err)
return ti
}
// MustBarrierFromGrpc 1
func MustBarrierFromGrpc(in *dtmgrpc.BusiRequest) *dtmcli.BranchBarrier {
ti, err := dtmcli.BarrierFrom(in.Info.TransType, in.Info.Gid, in.Info.BranchID, in.Info.BranchType)
dtmcli.FatalIfError(err)
return ti
}

View File

@ -29,7 +29,7 @@ var file_examples_busi_proto_rawDesc = []byte{
0x15, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x32, 0xa6, 0x06, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x3b, 0x0a, 0x09,
0x6f, 0x74, 0x6f, 0x32, 0xb4, 0x08, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x3b, 0x0a, 0x09,
0x43, 0x61, 0x6e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67,
0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
@ -79,10 +79,27 @@ var file_examples_busi_proto_rawDesc = []byte{
0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x54, 0x63, 0x63, 0x4e, 0x65, 0x73, 0x74, 0x65, 0x64,
0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63,
0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x1e, 0x5a, 0x1c,
0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f,
0x64, 0x74, 0x6d, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x0c,
0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x14, 0x2e, 0x64,
0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3f, 0x0a, 0x0d,
0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x14, 0x2e,
0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x44, 0x0a,
0x12, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53,
0x61, 0x67, 0x61, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75,
0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x13, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x52,
0x65, 0x76, 0x65, 0x72, 0x74, 0x42, 0x53, 0x61, 0x67, 0x61, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d,
0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x1e, 0x5a, 0x1c, 0x67, 0x69,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74,
0x6d, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}
var file_examples_busi_proto_goTypes = []interface{}{
@ -104,21 +121,29 @@ var file_examples_busi_proto_depIdxs = []int32{
0, // 10: examples.Busi.TransInTcc:input_type -> dtmgrpc.BusiRequest
0, // 11: examples.Busi.TransOutTcc:input_type -> dtmgrpc.BusiRequest
0, // 12: examples.Busi.TransInTccNested:input_type -> dtmgrpc.BusiRequest
1, // 13: examples.Busi.CanSubmit:output_type -> google.protobuf.Empty
1, // 14: examples.Busi.TransIn:output_type -> google.protobuf.Empty
1, // 15: examples.Busi.TransOut:output_type -> google.protobuf.Empty
1, // 16: examples.Busi.TransInRevert:output_type -> google.protobuf.Empty
1, // 17: examples.Busi.TransOutRevert:output_type -> google.protobuf.Empty
1, // 18: examples.Busi.TransInConfirm:output_type -> google.protobuf.Empty
1, // 19: examples.Busi.TransOutConfirm:output_type -> google.protobuf.Empty
1, // 20: examples.Busi.XaNotify:output_type -> google.protobuf.Empty
2, // 21: examples.Busi.TransInXa:output_type -> dtmgrpc.BusiReply
2, // 22: examples.Busi.TransOutXa:output_type -> dtmgrpc.BusiReply
2, // 23: examples.Busi.TransInTcc:output_type -> dtmgrpc.BusiReply
2, // 24: examples.Busi.TransOutTcc:output_type -> dtmgrpc.BusiReply
2, // 25: examples.Busi.TransInTccNested:output_type -> dtmgrpc.BusiReply
13, // [13:26] is the sub-list for method output_type
0, // [0:13] is the sub-list for method input_type
0, // 13: examples.Busi.TransInBSaga:input_type -> dtmgrpc.BusiRequest
0, // 14: examples.Busi.TransOutBSaga:input_type -> dtmgrpc.BusiRequest
0, // 15: examples.Busi.TransInRevertBSaga:input_type -> dtmgrpc.BusiRequest
0, // 16: examples.Busi.TransOutRevertBSaga:input_type -> dtmgrpc.BusiRequest
1, // 17: examples.Busi.CanSubmit:output_type -> google.protobuf.Empty
1, // 18: examples.Busi.TransIn:output_type -> google.protobuf.Empty
1, // 19: examples.Busi.TransOut:output_type -> google.protobuf.Empty
1, // 20: examples.Busi.TransInRevert:output_type -> google.protobuf.Empty
1, // 21: examples.Busi.TransOutRevert:output_type -> google.protobuf.Empty
1, // 22: examples.Busi.TransInConfirm:output_type -> google.protobuf.Empty
1, // 23: examples.Busi.TransOutConfirm:output_type -> google.protobuf.Empty
1, // 24: examples.Busi.XaNotify:output_type -> google.protobuf.Empty
2, // 25: examples.Busi.TransInXa:output_type -> dtmgrpc.BusiReply
2, // 26: examples.Busi.TransOutXa:output_type -> dtmgrpc.BusiReply
2, // 27: examples.Busi.TransInTcc:output_type -> dtmgrpc.BusiReply
2, // 28: examples.Busi.TransOutTcc:output_type -> dtmgrpc.BusiReply
2, // 29: examples.Busi.TransInTccNested:output_type -> dtmgrpc.BusiReply
1, // 30: examples.Busi.TransInBSaga:output_type -> google.protobuf.Empty
1, // 31: examples.Busi.TransOutBSaga:output_type -> google.protobuf.Empty
1, // 32: examples.Busi.TransInRevertBSaga:output_type -> google.protobuf.Empty
1, // 33: examples.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty
17, // [17:34] is the sub-list for method output_type
0, // [0:17] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name

View File

@ -22,5 +22,10 @@ service Busi {
rpc TransInTcc(dtmgrpc.BusiRequest) returns (dtmgrpc.BusiReply) {}
rpc TransOutTcc(dtmgrpc.BusiRequest) returns (dtmgrpc.BusiReply) {}
rpc TransInTccNested(dtmgrpc.BusiRequest) returns (dtmgrpc.BusiReply) {}
rpc TransInBSaga(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {}
rpc TransOutBSaga(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {}
rpc TransInRevertBSaga(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {}
rpc TransOutRevertBSaga(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {}
}

View File

@ -33,6 +33,10 @@ type BusiClient interface {
TransInTcc(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*dtmgrpc.BusiReply, error)
TransOutTcc(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*dtmgrpc.BusiReply, error)
TransInTccNested(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*dtmgrpc.BusiReply, error)
TransInBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransOutBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransInRevertBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
TransOutRevertBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
}
type busiClient struct {
@ -160,6 +164,42 @@ func (c *busiClient) TransInTccNested(ctx context.Context, in *dtmgrpc.BusiReque
return out, nil
}
func (c *busiClient) TransInBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/examples.Busi/TransInBSaga", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *busiClient) TransOutBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/examples.Busi/TransOutBSaga", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *busiClient) TransInRevertBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/examples.Busi/TransInRevertBSaga", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *busiClient) TransOutRevertBSaga(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/examples.Busi/TransOutRevertBSaga", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// BusiServer is the server API for Busi service.
// All implementations must embed UnimplementedBusiServer
// for forward compatibility
@ -177,6 +217,10 @@ type BusiServer interface {
TransInTcc(context.Context, *dtmgrpc.BusiRequest) (*dtmgrpc.BusiReply, error)
TransOutTcc(context.Context, *dtmgrpc.BusiRequest) (*dtmgrpc.BusiReply, error)
TransInTccNested(context.Context, *dtmgrpc.BusiRequest) (*dtmgrpc.BusiReply, error)
TransInBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error)
TransOutBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error)
TransInRevertBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error)
TransOutRevertBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error)
mustEmbedUnimplementedBusiServer()
}
@ -223,6 +267,18 @@ func (UnimplementedBusiServer) TransOutTcc(context.Context, *dtmgrpc.BusiRequest
func (UnimplementedBusiServer) TransInTccNested(context.Context, *dtmgrpc.BusiRequest) (*dtmgrpc.BusiReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransInTccNested not implemented")
}
func (UnimplementedBusiServer) TransInBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransInBSaga not implemented")
}
func (UnimplementedBusiServer) TransOutBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransOutBSaga not implemented")
}
func (UnimplementedBusiServer) TransInRevertBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransInRevertBSaga not implemented")
}
func (UnimplementedBusiServer) TransOutRevertBSaga(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransOutRevertBSaga not implemented")
}
func (UnimplementedBusiServer) mustEmbedUnimplementedBusiServer() {}
// UnsafeBusiServer may be embedded to opt out of forward compatibility for this service.
@ -470,6 +526,78 @@ func _Busi_TransInTccNested_Handler(srv interface{}, ctx context.Context, dec fu
return interceptor(ctx, in, info, handler)
}
func _Busi_TransInBSaga_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(dtmgrpc.BusiRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).TransInBSaga(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/examples.Busi/TransInBSaga",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).TransInBSaga(ctx, req.(*dtmgrpc.BusiRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Busi_TransOutBSaga_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(dtmgrpc.BusiRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).TransOutBSaga(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/examples.Busi/TransOutBSaga",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).TransOutBSaga(ctx, req.(*dtmgrpc.BusiRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Busi_TransInRevertBSaga_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(dtmgrpc.BusiRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).TransInRevertBSaga(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/examples.Busi/TransInRevertBSaga",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).TransInRevertBSaga(ctx, req.(*dtmgrpc.BusiRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Busi_TransOutRevertBSaga_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(dtmgrpc.BusiRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BusiServer).TransOutRevertBSaga(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/examples.Busi/TransOutRevertBSaga",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BusiServer).TransOutRevertBSaga(ctx, req.(*dtmgrpc.BusiRequest))
}
return interceptor(ctx, in, info, handler)
}
// Busi_ServiceDesc is the grpc.ServiceDesc for Busi service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -529,6 +657,22 @@ var Busi_ServiceDesc = grpc.ServiceDesc{
MethodName: "TransInTccNested",
Handler: _Busi_TransInTccNested_Handler,
},
{
MethodName: "TransInBSaga",
Handler: _Busi_TransInBSaga_Handler,
},
{
MethodName: "TransOutBSaga",
Handler: _Busi_TransOutBSaga_Handler,
},
{
MethodName: "TransInRevertBSaga",
Handler: _Busi_TransInRevertBSaga_Handler,
},
{
MethodName: "TransOutRevertBSaga",
Handler: _Busi_TransOutRevertBSaga_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "examples/busi.proto",

View File

@ -0,0 +1,90 @@
package examples
import (
"context"
"database/sql"
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmgrpc"
"google.golang.org/protobuf/types/known/emptypb"
)
func init() {
addSample("grpc_saga_barrier", func() string {
req := dtmcli.MustMarshal(&TransReq{Amount: 30})
gid := dtmgrpc.MustGenGid(DtmGrpcServer)
saga := dtmgrpc.NewSaga(DtmGrpcServer, gid).
Add(BusiGrpc+"/examples.Busi/TransOutBSaga", BusiGrpc+"/examples.Busi/TransOutRevertBSaga", req).
Add(BusiGrpc+"/examples.Busi/TransInBSaga", BusiGrpc+"/examples.Busi/TransOutRevertBSaga", req)
err := saga.Submit()
dtmcli.FatalIfError(err)
return saga.Gid
})
}
func sagaGrpcBarrierAdjustBalance(sdb *sql.Tx, uid int, amount int) (interface{}, error) {
_, err := dtmcli.StxExec(sdb, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
return dtmcli.ResultSuccess, err
}
func (s *busiServer) TransInBSaga(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
req := TransReq{}
dtmcli.MustUnmarshal(in.BusiData, &req)
return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransInResult.Fetch(), req.TransInResult, dtmcli.GetFuncName())
}
func (s *busiServer) TransOutBSaga(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
req := TransReq{}
dtmcli.MustUnmarshal(in.BusiData, &req)
return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransOutResult.Fetch(), req.TransOutResult, dtmcli.GetFuncName())
}
func (s *busiServer) TransInRevertBSaga(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
req := TransReq{}
dtmcli.MustUnmarshal(in.BusiData, &req)
return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransInRevertResult.Fetch(), "", dtmcli.GetFuncName())
}
func (s *busiServer) TransOutRevertBSaga(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
req := TransReq{}
dtmcli.MustUnmarshal(in.BusiData, &req)
return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransOutRevertResult.Fetch(), "", dtmcli.GetFuncName())
}
func sagaBarrierTransIn(c *gin.Context) (interface{}, error) {
req := reqFrom(c)
if req.TransInResult != "" {
return req.TransInResult, nil
}
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return sagaBarrierAdjustBalance(sdb, 1, req.Amount)
})
}
func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return sagaBarrierAdjustBalance(sdb, 1, -reqFrom(c).Amount)
})
}
func sagaBarrierTransOut(c *gin.Context) (interface{}, error) {
req := reqFrom(c)
if req.TransInResult != "" {
return req.TransInResult, nil
}
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return sagaBarrierAdjustBalance(sdb, 2, -req.Amount)
})
}
func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return sagaBarrierAdjustBalance(sdb, 2, reqFrom(c).Amount)
})
}

View File

@ -7,11 +7,7 @@ import (
func init() {
addSample("saga", func() string {
dtmcli.Logf("a saga busi transaction begin")
req := &TransReq{
Amount: 30,
TransInResult: "SUCCESS",
TransOutResult: "SUCCESS",
}
req := &TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInRevert", req)
@ -21,4 +17,17 @@ func init() {
dtmcli.FatalIfError(err)
return saga.Gid
})
addSample("saga_wait", func() string {
dtmcli.Logf("a saga busi transaction begin")
req := &TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInRevert", req)
saga.WaitResult = true // 设置为等待结果模式后面的submit调用会等待服务器处理这个事务。如果Submit正常返回那么整个全局事务已成功完成
err := saga.Submit()
dtmcli.Logf("result gid is: %s", saga.Gid)
dtmcli.FatalIfError(err)
return saga.Gid
})
}

View File

@ -39,14 +39,14 @@ func sagaBarrierTransIn(c *gin.Context) (interface{}, error) {
if req.TransInResult != "" {
return req.TransInResult, nil
}
barrier := MustGetTrans(c)
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return sagaBarrierAdjustBalance(sdb, 1, req.Amount)
})
}
func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) {
barrier := MustGetTrans(c)
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return sagaBarrierAdjustBalance(sdb, 1, -reqFrom(c).Amount)
})
@ -57,14 +57,14 @@ func sagaBarrierTransOut(c *gin.Context) (interface{}, error) {
if req.TransInResult != "" {
return req.TransInResult, nil
}
barrier := MustGetTrans(c)
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return sagaBarrierAdjustBalance(sdb, 2, -req.Amount)
})
}
func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) {
barrier := MustGetTrans(c)
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return sagaBarrierAdjustBalance(sdb, 2, reqFrom(c).Amount)
})

View File

@ -1,24 +0,0 @@
package examples
import (
"github.com/yedf/dtm/dtmcli"
)
func init() {
addSample("saga_wait", func() string {
dtmcli.Logf("a saga busi transaction begin")
req := &TransReq{
Amount: 30,
TransInResult: "SUCCESS",
TransOutResult: "SUCCESS",
}
saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)).
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInRevert", req)
saga.WaitResult = true // 设置为等待结果模式后面的submit调用会等待服务器处理这个事务。如果Submit正常返回那么整个全局事务已成功完成
err := saga.Submit()
dtmcli.Logf("result gid is: %s", saga.Gid)
dtmcli.FatalIfError(err)
return saga.Gid
})
}

View File

@ -62,21 +62,21 @@ func tccBarrierTransInTry(c *gin.Context) (interface{}, error) {
if req.TransInResult != "" {
return req.TransInResult, nil
}
barrier := MustGetTrans(c)
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return adjustTrading(sdb, transInUID, req.Amount)
})
}
func tccBarrierTransInConfirm(c *gin.Context) (interface{}, error) {
barrier := MustGetTrans(c)
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return adjustBalance(sdb, transInUID, reqFrom(c).Amount)
})
}
func tccBarrierTransInCancel(c *gin.Context) (interface{}, error) {
barrier := MustGetTrans(c)
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return adjustTrading(sdb, transInUID, -reqFrom(c).Amount)
})
@ -87,14 +87,14 @@ func tccBarrierTransOutTry(c *gin.Context) (interface{}, error) {
if req.TransInResult != "" {
return req.TransInResult, nil
}
barrier := MustGetTrans(c)
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return adjustTrading(sdb, transOutUID, -req.Amount)
})
}
func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) {
barrier := MustGetTrans(c)
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return adjustBalance(sdb, transOutUID, -reqFrom(c).Amount)
})
@ -102,7 +102,7 @@ func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) {
// TccBarrierTransOutCancel will be use in test
func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) {
barrier := MustGetTrans(c)
barrier := MustBarrierFromGin(c)
return barrier.Call(sdbGet(), func(sdb *sql.Tx) (interface{}, error) {
return adjustTrading(sdb, transOutUID, reqFrom(c).Amount)
})