saga grpc seems ok
This commit is contained in:
parent
0bc435e01c
commit
4e7f6e36ad
@ -105,15 +105,6 @@ func genMsg(gid string) *dtmcli.Msg {
|
||||
return msg
|
||||
}
|
||||
|
||||
func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
|
||||
dtmcli.Logf("beginning a saga test ---------------- %s", gid)
|
||||
saga := dtmcli.NewSaga(examples.DtmServer, gid)
|
||||
req := examples.GenTransReq(30, outFailed, inFailed)
|
||||
saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req)
|
||||
saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req)
|
||||
return saga
|
||||
}
|
||||
|
||||
func transQuery(t *testing.T, gid string) {
|
||||
resp, err := dtmcli.RestyClient.R().SetQueryParam("gid", gid).Get(examples.DtmServer + "/query")
|
||||
e2p(err)
|
||||
|
||||
@ -160,6 +160,7 @@ func (t *TransGlobal) setNextCron(expireIn int64) []string {
|
||||
|
||||
func (t *TransGlobal) getURLResult(url string, branchID, branchType string, branchData []byte) string {
|
||||
if t.Protocol == "grpc" {
|
||||
dtmcli.PanicIf(strings.HasPrefix(url, "http"), fmt.Errorf("bad url for grpc: %s", url))
|
||||
server, method := dtmgrpc.GetServerAndMethod(url)
|
||||
conn := dtmgrpc.MustGetGrpcConn(server)
|
||||
err := conn.Invoke(context.Background(), method, &dtmgrpc.BusiRequest{
|
||||
@ -178,6 +179,7 @@ func (t *TransGlobal) getURLResult(url string, branchID, branchType string, bran
|
||||
}
|
||||
return err.Error()
|
||||
}
|
||||
dtmcli.PanicIf(!strings.HasPrefix(url, "http"), fmt.Errorf("bad url for http: %s", url))
|
||||
resp, err := dtmcli.RestyClient.R().SetBody(branchData).
|
||||
SetQueryParams(dtmcli.MS{
|
||||
"gid": t.Gid,
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/yedf/dtm/dtmcli"
|
||||
"github.com/yedf/dtm/examples"
|
||||
)
|
||||
|
||||
@ -45,3 +46,12 @@ func sagaRollback(t *testing.T) {
|
||||
assert.Equal(t, "failed", getTransStatus(saga.Gid))
|
||||
assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid))
|
||||
}
|
||||
|
||||
func genSaga(gid string, outFailed bool, inFailed bool) *dtmcli.Saga {
|
||||
dtmcli.Logf("beginning a saga test ---------------- %s", gid)
|
||||
saga := dtmcli.NewSaga(examples.DtmServer, gid)
|
||||
req := examples.GenTransReq(30, outFailed, inFailed)
|
||||
saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req)
|
||||
saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req)
|
||||
return saga
|
||||
}
|
||||
|
||||
@ -29,7 +29,7 @@ var file_examples_busi_proto_rawDesc = []byte{
|
||||
0x15, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63,
|
||||
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70,
|
||||
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x32, 0xbd, 0x02, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x3b, 0x0a, 0x09,
|
||||
0x6f, 0x74, 0x6f, 0x32, 0xc2, 0x03, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x3b, 0x0a, 0x09,
|
||||
0x43, 0x61, 0x6e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67,
|
||||
0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
|
||||
0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
|
||||
@ -49,9 +49,17 @@ var file_examples_busi_proto_rawDesc = []byte{
|
||||
0x65, 0x72, 0x74, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75,
|
||||
0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
|
||||
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
|
||||
0x79, 0x22, 0x00, 0x42, 0x1e, 0x5a, 0x1c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
|
||||
0x6d, 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74, 0x6d, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70,
|
||||
0x6c, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x79, 0x22, 0x00, 0x12, 0x40, 0x0a, 0x0e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x43, 0x6f,
|
||||
0x6e, 0x66, 0x69, 0x72, 0x6d, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e,
|
||||
0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f,
|
||||
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d,
|
||||
0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x41, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75,
|
||||
0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72,
|
||||
0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16,
|
||||
0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
|
||||
0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x1e, 0x5a, 0x1c, 0x67, 0x69, 0x74, 0x68,
|
||||
0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74, 0x6d, 0x2f,
|
||||
0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var file_examples_busi_proto_goTypes = []interface{}{
|
||||
@ -64,13 +72,17 @@ var file_examples_busi_proto_depIdxs = []int32{
|
||||
0, // 2: examples.Busi.TransOut:input_type -> dtmgrpc.BusiRequest
|
||||
0, // 3: examples.Busi.TransInRevert:input_type -> dtmgrpc.BusiRequest
|
||||
0, // 4: examples.Busi.TransOutRevert:input_type -> dtmgrpc.BusiRequest
|
||||
1, // 5: examples.Busi.CanSubmit:output_type -> google.protobuf.Empty
|
||||
1, // 6: examples.Busi.TransIn:output_type -> google.protobuf.Empty
|
||||
1, // 7: examples.Busi.TransOut:output_type -> google.protobuf.Empty
|
||||
1, // 8: examples.Busi.TransInRevert:output_type -> google.protobuf.Empty
|
||||
1, // 9: examples.Busi.TransOutRevert:output_type -> google.protobuf.Empty
|
||||
5, // [5:10] is the sub-list for method output_type
|
||||
0, // [0:5] is the sub-list for method input_type
|
||||
0, // 5: examples.Busi.TransInConfirm:input_type -> dtmgrpc.BusiRequest
|
||||
0, // 6: examples.Busi.TransOutConfirm:input_type -> dtmgrpc.BusiRequest
|
||||
1, // 7: examples.Busi.CanSubmit:output_type -> google.protobuf.Empty
|
||||
1, // 8: examples.Busi.TransIn:output_type -> google.protobuf.Empty
|
||||
1, // 9: examples.Busi.TransOut:output_type -> google.protobuf.Empty
|
||||
1, // 10: examples.Busi.TransInRevert:output_type -> google.protobuf.Empty
|
||||
1, // 11: examples.Busi.TransOutRevert:output_type -> google.protobuf.Empty
|
||||
1, // 12: examples.Busi.TransInConfirm:output_type -> google.protobuf.Empty
|
||||
1, // 13: examples.Busi.TransOutConfirm:output_type -> google.protobuf.Empty
|
||||
7, // [7:14] is the sub-list for method output_type
|
||||
0, // [0:7] is the sub-list for method input_type
|
||||
0, // [0:0] is the sub-list for extension type_name
|
||||
0, // [0:0] is the sub-list for extension extendee
|
||||
0, // [0:0] is the sub-list for field type_name
|
||||
|
||||
@ -13,5 +13,7 @@ service Busi {
|
||||
rpc TransOut(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {}
|
||||
rpc TransInRevert(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {}
|
||||
rpc TransOutRevert(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {}
|
||||
rpc TransInConfirm(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {}
|
||||
rpc TransOutConfirm(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {}
|
||||
}
|
||||
|
||||
|
||||
@ -25,6 +25,8 @@ type BusiClient interface {
|
||||
TransOut(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
|
||||
TransInRevert(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
|
||||
TransOutRevert(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
|
||||
TransInConfirm(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
|
||||
TransOutConfirm(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
|
||||
}
|
||||
|
||||
type busiClient struct {
|
||||
@ -80,6 +82,24 @@ func (c *busiClient) TransOutRevert(ctx context.Context, in *dtmgrpc.BusiRequest
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *busiClient) TransInConfirm(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
|
||||
out := new(emptypb.Empty)
|
||||
err := c.cc.Invoke(ctx, "/examples.Busi/TransInConfirm", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *busiClient) TransOutConfirm(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
|
||||
out := new(emptypb.Empty)
|
||||
err := c.cc.Invoke(ctx, "/examples.Busi/TransOutConfirm", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// BusiServer is the server API for Busi service.
|
||||
// All implementations must embed UnimplementedBusiServer
|
||||
// for forward compatibility
|
||||
@ -89,6 +109,8 @@ type BusiServer interface {
|
||||
TransOut(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error)
|
||||
TransInRevert(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error)
|
||||
TransOutRevert(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error)
|
||||
TransInConfirm(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error)
|
||||
TransOutConfirm(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error)
|
||||
mustEmbedUnimplementedBusiServer()
|
||||
}
|
||||
|
||||
@ -111,6 +133,12 @@ func (UnimplementedBusiServer) TransInRevert(context.Context, *dtmgrpc.BusiReque
|
||||
func (UnimplementedBusiServer) TransOutRevert(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method TransOutRevert not implemented")
|
||||
}
|
||||
func (UnimplementedBusiServer) TransInConfirm(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method TransInConfirm not implemented")
|
||||
}
|
||||
func (UnimplementedBusiServer) TransOutConfirm(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method TransOutConfirm not implemented")
|
||||
}
|
||||
func (UnimplementedBusiServer) mustEmbedUnimplementedBusiServer() {}
|
||||
|
||||
// UnsafeBusiServer may be embedded to opt out of forward compatibility for this service.
|
||||
@ -214,6 +242,42 @@ func _Busi_TransOutRevert_Handler(srv interface{}, ctx context.Context, dec func
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _Busi_TransInConfirm_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(dtmgrpc.BusiRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(BusiServer).TransInConfirm(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/examples.Busi/TransInConfirm",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(BusiServer).TransInConfirm(ctx, req.(*dtmgrpc.BusiRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _Busi_TransOutConfirm_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(dtmgrpc.BusiRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(BusiServer).TransOutConfirm(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/examples.Busi/TransOutConfirm",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(BusiServer).TransOutConfirm(ctx, req.(*dtmgrpc.BusiRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
// Busi_ServiceDesc is the grpc.ServiceDesc for Busi service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
@ -241,6 +305,14 @@ var Busi_ServiceDesc = grpc.ServiceDesc{
|
||||
MethodName: "TransOutRevert",
|
||||
Handler: _Busi_TransOutRevert_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "TransInConfirm",
|
||||
Handler: _Busi_TransInConfirm_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "TransOutConfirm",
|
||||
Handler: _Busi_TransOutConfirm_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{},
|
||||
Metadata: "examples/busi.proto",
|
||||
|
||||
@ -62,11 +62,35 @@ func (s *busiServer) CanSubmit(ctx context.Context, in *dtmgrpc.BusiRequest) (*e
|
||||
func (s *busiServer) TransIn(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
|
||||
req := TransReq{}
|
||||
dtmcli.MustUnmarshal(in.AppData, &req)
|
||||
return &emptypb.Empty{}, handleGrpcBusiness(in, req.TransInResult, MainSwitch.TransInResult.Fetch(), "TransIn")
|
||||
return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransInResult.Fetch(), req.TransInResult, dtmcli.GetFuncName())
|
||||
}
|
||||
|
||||
func (s *busiServer) TransOut(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
|
||||
req := TransReq{}
|
||||
dtmcli.MustUnmarshal(in.AppData, &req)
|
||||
return &emptypb.Empty{}, handleGrpcBusiness(in, req.TransOutResult, MainSwitch.TransOutResult.Fetch(), "TransOut")
|
||||
return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransOutResult.Fetch(), req.TransOutResult, dtmcli.GetFuncName())
|
||||
}
|
||||
|
||||
func (s *busiServer) TransInRevert(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
|
||||
req := TransReq{}
|
||||
dtmcli.MustUnmarshal(in.AppData, &req)
|
||||
return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransInRevertResult.Fetch(), "", dtmcli.GetFuncName())
|
||||
}
|
||||
|
||||
func (s *busiServer) TransOutRevert(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
|
||||
req := TransReq{}
|
||||
dtmcli.MustUnmarshal(in.AppData, &req)
|
||||
return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransOutRevertResult.Fetch(), "", dtmcli.GetFuncName())
|
||||
}
|
||||
|
||||
func (s *busiServer) TransInConfirm(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
|
||||
req := TransReq{}
|
||||
dtmcli.MustUnmarshal(in.AppData, &req)
|
||||
return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransInConfirmResult.Fetch(), "", dtmcli.GetFuncName())
|
||||
}
|
||||
|
||||
func (s *busiServer) TransOutConfirm(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) {
|
||||
req := TransReq{}
|
||||
dtmcli.MustUnmarshal(in.AppData, &req)
|
||||
return &emptypb.Empty{}, handleGrpcBusiness(in, MainSwitch.TransOutConfirmResult.Fetch(), "", dtmcli.GetFuncName())
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user