From 0bc435e01c170235adb5414348cd127f65a9c81d Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 10 Aug 2021 14:03:12 +0800 Subject: [PATCH] grpc msg ok --- dtmcli/types.go | 3 ++ dtmcli/utils.go | 9 +++-- dtmgrpc/dtmgrpc.pb.go | 20 +++++++---- dtmgrpc/dtmgrpc.proto | 1 + dtmgrpc/dtmgrpc_grpc.pb.go | 36 +++++++++++++++++++ dtmgrpc/message.go | 12 +++++++ dtmgrpc/type.go | 13 +++++++ dtmsvr/api.go | 12 ++++++- dtmsvr/api_grpc.go | 10 ++++-- dtmsvr/trans.go | 22 +++++++----- dtmsvr/trans_grpc_msg_test.go | 14 +++++--- dtmsvr/trans_msg.go | 4 +-- examples/busi.pb.go | 66 +++++++++++++++++++---------------- examples/busi.proto | 1 + examples/busi_grpc.pb.go | 36 +++++++++++++++++++ examples/main_base.go | 4 +-- examples/main_grpc.go | 11 ++++-- examples/main_grpc_msg.go | 4 +-- 18 files changed, 212 insertions(+), 66 deletions(-) diff --git a/dtmcli/types.go b/dtmcli/types.go index 2b61c19..a5f03ad 100644 --- a/dtmcli/types.go +++ b/dtmcli/types.go @@ -89,6 +89,9 @@ func (tb *TransBase) CallDtm(body interface{}, operation string) error { // ErrFailure 表示返回失败,要求回滚 var ErrFailure = errors.New("transaction FAILURE") +// ErrPending 表示暂时失败,要求重试 +var ErrPending = errors.New("transaction PENDING") + // ResultSuccess 表示返回成功,可以进行下一步 var ResultSuccess = M{"dtm_result": "SUCCESS"} diff --git a/dtmcli/utils.go b/dtmcli/utils.go index 676ef0b..d8ac577 100644 --- a/dtmcli/utils.go +++ b/dtmcli/utils.go @@ -264,8 +264,13 @@ func CheckResult(res interface{}, err error) error { if ok { return CheckResponse(resp, err) } - if res != nil && strings.Contains(MustMarshalString(res), "FAILURE") { - return ErrFailure + if res != nil { + str := MustMarshalString(res) + if strings.Contains(str, "FAILURE") { + return ErrFailure + } else if strings.Contains(str, "PENDING") { + return ErrPending + } } return err } diff --git a/dtmgrpc/dtmgrpc.pb.go b/dtmgrpc/dtmgrpc.pb.go index 11ad1b3..86df863 100644 --- a/dtmgrpc/dtmgrpc.pb.go +++ b/dtmgrpc/dtmgrpc.pb.go @@ -418,13 +418,17 @@ var file_dtmgrpc_dtmgrpc_proto_rawDesc = []byte{ 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x44, 0x74, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x32, 0x3e, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, + 0x32, 0x78, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, - 0x42, 0x1d, 0x5a, 0x1b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, - 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74, 0x6d, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x38, 0x0a, 0x07, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x12, 0x13, 0x2e, 0x64, 0x74, + 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x1d, 0x5a, 0x1b, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74, + 0x6d, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -455,9 +459,11 @@ var file_dtmgrpc_dtmgrpc_proto_depIdxs = []int32{ 0, // 1: dtmgrpc.BusiRequest.info:type_name -> dtmgrpc.DtmTransInfo 6, // 2: dtmgrpc.BusiRequest.Extra:type_name -> dtmgrpc.BusiRequest.ExtraEntry 1, // 3: dtmgrpc.Dtm.Submit:input_type -> dtmgrpc.DtmRequest - 7, // 4: dtmgrpc.Dtm.Submit:output_type -> google.protobuf.Empty - 4, // [4:5] is the sub-list for method output_type - 3, // [3:4] is the sub-list for method input_type + 1, // 4: dtmgrpc.Dtm.Prepare:input_type -> dtmgrpc.DtmRequest + 7, // 5: dtmgrpc.Dtm.Submit:output_type -> google.protobuf.Empty + 7, // 6: dtmgrpc.Dtm.Prepare:output_type -> google.protobuf.Empty + 5, // [5:7] is the sub-list for method output_type + 3, // [3:5] is the sub-list for method input_type 3, // [3:3] is the sub-list for extension type_name 3, // [3:3] is the sub-list for extension extendee 0, // [0:3] is the sub-list for field type_name diff --git a/dtmgrpc/dtmgrpc.proto b/dtmgrpc/dtmgrpc.proto index 7931fc0..c2d9a8b 100644 --- a/dtmgrpc/dtmgrpc.proto +++ b/dtmgrpc/dtmgrpc.proto @@ -8,6 +8,7 @@ package dtmgrpc; // The dtm service definition. service Dtm { rpc Submit(DtmRequest) returns (google.protobuf.Empty) {} + rpc Prepare(DtmRequest) returns (google.protobuf.Empty) {} } diff --git a/dtmgrpc/dtmgrpc_grpc.pb.go b/dtmgrpc/dtmgrpc_grpc.pb.go index e6a8d8e..ce2efb5 100644 --- a/dtmgrpc/dtmgrpc_grpc.pb.go +++ b/dtmgrpc/dtmgrpc_grpc.pb.go @@ -20,6 +20,7 @@ const _ = grpc.SupportPackageIsVersion7 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type DtmClient interface { Submit(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + Prepare(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) } type dtmClient struct { @@ -39,11 +40,21 @@ func (c *dtmClient) Submit(ctx context.Context, in *DtmRequest, opts ...grpc.Cal return out, nil } +func (c *dtmClient) Prepare(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/dtmgrpc.Dtm/Prepare", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // DtmServer is the server API for Dtm service. // All implementations must embed UnimplementedDtmServer // for forward compatibility type DtmServer interface { Submit(context.Context, *DtmRequest) (*emptypb.Empty, error) + Prepare(context.Context, *DtmRequest) (*emptypb.Empty, error) mustEmbedUnimplementedDtmServer() } @@ -54,6 +65,9 @@ type UnimplementedDtmServer struct { func (UnimplementedDtmServer) Submit(context.Context, *DtmRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method Submit not implemented") } +func (UnimplementedDtmServer) Prepare(context.Context, *DtmRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method Prepare not implemented") +} func (UnimplementedDtmServer) mustEmbedUnimplementedDtmServer() {} // UnsafeDtmServer may be embedded to opt out of forward compatibility for this service. @@ -85,6 +99,24 @@ func _Dtm_Submit_Handler(srv interface{}, ctx context.Context, dec func(interfac return interceptor(ctx, in, info, handler) } +func _Dtm_Prepare_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DtmRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DtmServer).Prepare(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/dtmgrpc.Dtm/Prepare", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DtmServer).Prepare(ctx, req.(*DtmRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Dtm_ServiceDesc is the grpc.ServiceDesc for Dtm service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -96,6 +128,10 @@ var Dtm_ServiceDesc = grpc.ServiceDesc{ MethodName: "Submit", Handler: _Dtm_Submit_Handler, }, + { + MethodName: "Prepare", + Handler: _Dtm_Prepare_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "dtmgrpc/dtmgrpc.proto", diff --git a/dtmgrpc/message.go b/dtmgrpc/message.go index 7656077..ae3f4a4 100644 --- a/dtmgrpc/message.go +++ b/dtmgrpc/message.go @@ -44,3 +44,15 @@ func (s *MsgGrpc) Submit() error { }) return err } + +// Prepare prepare the msg +func (s *MsgGrpc) Prepare(queryPrepared string) error { + s.QueryPrepared = dtmcli.OrString(queryPrepared, s.QueryPrepared) + _, err := MustGetDtmClient(s.Dtm).Prepare(context.Background(), &DtmRequest{ + Gid: s.Gid, + TransType: s.TransType, + QueryPrepared: s.QueryPrepared, + Data: dtmcli.MustMarshalString(&s.Steps), + }) + return err +} diff --git a/dtmgrpc/type.go b/dtmgrpc/type.go index 54d8f76..ac2aa12 100644 --- a/dtmgrpc/type.go +++ b/dtmgrpc/type.go @@ -7,6 +7,8 @@ import ( "github.com/yedf/dtm/dtmcli" grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" ) var clients = map[string]*grpc.ClientConn{} @@ -69,3 +71,14 @@ func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, c } return err } + +// Result2Error 将通用的result转成grpc的error +func Result2Error(res interface{}, err error) error { + e := dtmcli.CheckResult(res, err) + if e == dtmcli.ErrFailure { + return status.New(codes.Aborted, fmt.Sprintf("failure: res: %v, err: %s", res, e.Error())).Err() + } else if e == dtmcli.ErrPending { + return status.New(codes.Unavailable, fmt.Sprintf("failure: res: %v, err: %s", res, e.Error())).Err() + } + return e +} diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 38968bc..d95308f 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -1,6 +1,10 @@ package dtmsvr -import "fmt" +import ( + "fmt" + + "github.com/yedf/dtm/dtmcli" +) func svcSubmit(t *TransGlobal, waitResult bool) (interface{}, error) { db := dbGet() @@ -13,3 +17,9 @@ func svcSubmit(t *TransGlobal, waitResult bool) (interface{}, error) { return t.Process(db, waitResult), nil } + +func svcPrepare(t *TransGlobal) (interface{}, error) { + t.Status = "prepared" + t.saveNew(dbGet()) + return dtmcli.ResultSuccess, nil +} diff --git a/dtmsvr/api_grpc.go b/dtmsvr/api_grpc.go index 9c63bd6..ac09687 100644 --- a/dtmsvr/api_grpc.go +++ b/dtmsvr/api_grpc.go @@ -3,6 +3,7 @@ package dtmsvr import ( "context" + "github.com/yedf/dtm/dtmgrpc" pb "github.com/yedf/dtm/dtmgrpc" "google.golang.org/protobuf/types/known/emptypb" ) @@ -13,6 +14,11 @@ type dtmServer struct { } func (s *dtmServer) Submit(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) { - svcSubmit(TransFromDtmRequest(in), in.WaitResult) - return &emptypb.Empty{}, nil + r, err := svcSubmit(TransFromDtmRequest(in), in.WaitResult) + return &emptypb.Empty{}, dtmgrpc.Result2Error(r, err) +} + +func (s *dtmServer) Prepare(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) { + r, err := svcPrepare(TransFromDtmRequest(in)) + return &emptypb.Empty{}, dtmgrpc.Result2Error(r, err) } diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index fe251e7..08f5ef1 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -158,18 +158,18 @@ func (t *TransGlobal) setNextCron(expireIn int64) []string { return []string{"next_cron_interval", "next_cron_time"} } -func (t *TransGlobal) getBranchResult(branch *TransBranch) string { +func (t *TransGlobal) getURLResult(url string, branchID, branchType string, branchData []byte) string { if t.Protocol == "grpc" { - server, method := dtmgrpc.GetServerAndMethod(branch.URL) + server, method := dtmgrpc.GetServerAndMethod(url) conn := dtmgrpc.MustGetGrpcConn(server) err := conn.Invoke(context.Background(), method, &dtmgrpc.BusiRequest{ Info: &dtmgrpc.DtmTransInfo{ Gid: t.Gid, TransType: t.TransType, - BranchID: branch.BranchID, - BranchType: branch.BranchType, + BranchID: branchID, + BranchType: branchType, }, - AppData: []byte(branch.Data), + AppData: []byte(branchData), }, &emptypb.Empty{}) if err == nil { return "SUCCESS" @@ -178,19 +178,23 @@ func (t *TransGlobal) getBranchResult(branch *TransBranch) string { } return err.Error() } - resp, err := dtmcli.RestyClient.R().SetBody(branch.Data). + resp, err := dtmcli.RestyClient.R().SetBody(branchData). SetQueryParams(dtmcli.MS{ "gid": t.Gid, "trans_type": t.TransType, - "branch_id": branch.BranchID, - "branch_type": branch.BranchType, + "branch_id": branchID, + "branch_type": branchType, }). SetHeader("Content-type", "application/json"). - Post(branch.URL) + Post(url) e2p(err) return resp.String() } +func (t *TransGlobal) getBranchResult(branch *TransBranch) string { + return t.getURLResult(branch.URL, branch.BranchID, branch.BranchType, []byte(branch.Data)) +} + func (t *TransGlobal) execBranch(db *common.DB, branch *TransBranch) { body := t.getBranchResult(branch) if strings.Contains(body, "SUCCESS") { diff --git a/dtmsvr/trans_grpc_msg_test.go b/dtmsvr/trans_grpc_msg_test.go index 4273398..007e433 100644 --- a/dtmsvr/trans_grpc_msg_test.go +++ b/dtmsvr/trans_grpc_msg_test.go @@ -1,6 +1,7 @@ package dtmsvr import ( + "fmt" "testing" "time" @@ -25,10 +26,13 @@ func grpcMsgNormal(t *testing.T) { func grpcMsgPending(t *testing.T) { msg := genGrpcMsg("grpc-msg-pending") - examples.MainSwitch.TransInResult.SetOnce("PENDING") - err := msg.Submit() + err := msg.Prepare(fmt.Sprintf("%s/examples.Busi/CanSubmit", examples.BusiGrpc)) assert.Nil(t, err) - WaitTransProcessed(msg.Gid) + examples.MainSwitch.CanSubmitResult.SetOnce("PENDING") + CronTransOnce(60 * time.Second) + assert.Equal(t, "prepared", getTransStatus(msg.Gid)) + examples.MainSwitch.TransInResult.SetOnce("PENDING") + CronTransOnce(60 * time.Second) assert.Equal(t, "submitted", getTransStatus(msg.Gid)) CronTransOnce(60 * time.Second) assert.Equal(t, "succeed", getTransStatus(msg.Gid)) @@ -37,7 +41,7 @@ func grpcMsgPending(t *testing.T) { func genGrpcMsg(gid string) *dtmgrpc.MsgGrpc { req := dtmcli.MustMarshal(&examples.TransReq{Amount: 30}) return dtmgrpc.NewMsgGrpc(examples.DtmGrpcServer, gid). - Add(examples.BusiPb+"/examples.Busi/TransOut", req). - Add(examples.BusiPb+"/examples.Busi/TransIn", req) + Add(examples.BusiGrpc+"/examples.Busi/TransOut", req). + Add(examples.BusiGrpc+"/examples.Busi/TransIn", req) } diff --git a/dtmsvr/trans_msg.go b/dtmsvr/trans_msg.go index 87e5e68..b020d1a 100644 --- a/dtmsvr/trans_msg.go +++ b/dtmsvr/trans_msg.go @@ -36,9 +36,7 @@ func (t *TransGlobal) mayQueryPrepared(db *common.DB) { if t.Status != "prepared" { return } - resp, err := dtmcli.RestyClient.R().SetQueryParam("gid", t.Gid).Get(t.QueryPrepared) - e2p(err) - body := resp.String() + body := t.getURLResult(t.QueryPrepared, "", "", nil) if strings.Contains(body, "SUCCESS") { t.changeStatus(db, "submitted") } else { diff --git a/examples/busi.pb.go b/examples/busi.pb.go index 3b427db..92c347b 100644 --- a/examples/busi.pb.go +++ b/examples/busi.pb.go @@ -26,28 +26,32 @@ var File_examples_busi_proto protoreflect.FileDescriptor var file_examples_busi_proto_rawDesc = []byte{ 0x0a, 0x13, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x2f, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x1a, - 0x11, 0x64, 0x74, 0x6d, 0x70, 0x62, 0x2f, 0x64, 0x74, 0x6d, 0x70, 0x62, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x32, - 0xf8, 0x01, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x37, 0x0a, 0x07, 0x54, 0x72, 0x61, 0x6e, - 0x73, 0x49, 0x6e, 0x12, 0x12, 0x2e, 0x64, 0x74, 0x6d, 0x70, 0x62, 0x2e, 0x42, 0x75, 0x73, 0x69, + 0x15, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x32, 0xbd, 0x02, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x3b, 0x0a, 0x09, + 0x43, 0x61, 0x6e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, + 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x07, 0x54, 0x72, 0x61, + 0x6e, 0x73, 0x49, 0x6e, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, + 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, + 0x74, 0x79, 0x22, 0x00, 0x12, 0x3a, 0x0a, 0x08, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, + 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, + 0x12, 0x3f, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x76, 0x65, 0x72, + 0x74, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, - 0x00, 0x12, 0x38, 0x0a, 0x08, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x12, 0x12, 0x2e, - 0x64, 0x74, 0x6d, 0x70, 0x62, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x0d, 0x54, - 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x12, 0x12, 0x2e, 0x64, - 0x74, 0x6d, 0x70, 0x62, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x0e, 0x54, 0x72, - 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x12, 0x12, 0x2e, 0x64, - 0x74, 0x6d, 0x70, 0x62, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x1e, 0x5a, 0x1c, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74, - 0x6d, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x00, 0x12, 0x40, 0x0a, 0x0e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x76, + 0x65, 0x72, 0x74, 0x12, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, + 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x22, 0x00, 0x42, 0x1e, 0x5a, 0x1c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74, 0x6d, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70, + 0x6c, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var file_examples_busi_proto_goTypes = []interface{}{ @@ -55,16 +59,18 @@ var file_examples_busi_proto_goTypes = []interface{}{ (*emptypb.Empty)(nil), // 1: google.protobuf.Empty } var file_examples_busi_proto_depIdxs = []int32{ - 0, // 0: examples.Busi.TransIn:input_type -> dtmgrpc.BusiRequest - 0, // 1: examples.Busi.TransOut:input_type -> dtmgrpc.BusiRequest - 0, // 2: examples.Busi.TransInRevert:input_type -> dtmgrpc.BusiRequest - 0, // 3: examples.Busi.TransOutRevert:input_type -> dtmgrpc.BusiRequest - 1, // 4: examples.Busi.TransIn:output_type -> google.protobuf.Empty - 1, // 5: examples.Busi.TransOut:output_type -> google.protobuf.Empty - 1, // 6: examples.Busi.TransInRevert:output_type -> google.protobuf.Empty - 1, // 7: examples.Busi.TransOutRevert:output_type -> google.protobuf.Empty - 4, // [4:8] is the sub-list for method output_type - 0, // [0:4] is the sub-list for method input_type + 0, // 0: examples.Busi.CanSubmit:input_type -> dtmgrpc.BusiRequest + 0, // 1: examples.Busi.TransIn:input_type -> dtmgrpc.BusiRequest + 0, // 2: examples.Busi.TransOut:input_type -> dtmgrpc.BusiRequest + 0, // 3: examples.Busi.TransInRevert:input_type -> dtmgrpc.BusiRequest + 0, // 4: examples.Busi.TransOutRevert:input_type -> dtmgrpc.BusiRequest + 1, // 5: examples.Busi.CanSubmit:output_type -> google.protobuf.Empty + 1, // 6: examples.Busi.TransIn:output_type -> google.protobuf.Empty + 1, // 7: examples.Busi.TransOut:output_type -> google.protobuf.Empty + 1, // 8: examples.Busi.TransInRevert:output_type -> google.protobuf.Empty + 1, // 9: examples.Busi.TransOutRevert:output_type -> google.protobuf.Empty + 5, // [5:10] is the sub-list for method output_type + 0, // [0:5] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/examples/busi.proto b/examples/busi.proto index 402bb6b..ea06edb 100644 --- a/examples/busi.proto +++ b/examples/busi.proto @@ -8,6 +8,7 @@ import "google/protobuf/empty.proto"; // The dtm service definition. service Busi { + rpc CanSubmit(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} rpc TransIn(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} rpc TransOut(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} rpc TransInRevert(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} diff --git a/examples/busi_grpc.pb.go b/examples/busi_grpc.pb.go index fbac28c..287a4cb 100644 --- a/examples/busi_grpc.pb.go +++ b/examples/busi_grpc.pb.go @@ -20,6 +20,7 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type BusiClient interface { + CanSubmit(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) TransIn(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) TransOut(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) TransInRevert(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) @@ -34,6 +35,15 @@ func NewBusiClient(cc grpc.ClientConnInterface) BusiClient { return &busiClient{cc} } +func (c *busiClient) CanSubmit(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/examples.Busi/CanSubmit", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *busiClient) TransIn(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) err := c.cc.Invoke(ctx, "/examples.Busi/TransIn", in, out, opts...) @@ -74,6 +84,7 @@ func (c *busiClient) TransOutRevert(ctx context.Context, in *dtmgrpc.BusiRequest // All implementations must embed UnimplementedBusiServer // for forward compatibility type BusiServer interface { + CanSubmit(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) TransIn(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) TransOut(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) TransInRevert(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) @@ -85,6 +96,9 @@ type BusiServer interface { type UnimplementedBusiServer struct { } +func (UnimplementedBusiServer) CanSubmit(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method CanSubmit not implemented") +} func (UnimplementedBusiServer) TransIn(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method TransIn not implemented") } @@ -110,6 +124,24 @@ func RegisterBusiServer(s grpc.ServiceRegistrar, srv BusiServer) { s.RegisterService(&Busi_ServiceDesc, srv) } +func _Busi_CanSubmit_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(dtmgrpc.BusiRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).CanSubmit(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/examples.Busi/CanSubmit", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).CanSubmit(ctx, req.(*dtmgrpc.BusiRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Busi_TransIn_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(dtmgrpc.BusiRequest) if err := dec(in); err != nil { @@ -189,6 +221,10 @@ var Busi_ServiceDesc = grpc.ServiceDesc{ ServiceName: "examples.Busi", HandlerType: (*BusiServer)(nil), Methods: []grpc.MethodDesc{ + { + MethodName: "CanSubmit", + Handler: _Busi_CanSubmit_Handler, + }, { MethodName: "TransIn", Handler: _Busi_TransIn_Handler, diff --git a/examples/main_base.go b/examples/main_base.go index 432f84d..43763a8 100644 --- a/examples/main_base.go +++ b/examples/main_base.go @@ -14,8 +14,8 @@ const ( BusiAPI = "/api/busi" // BusiPort busi server port BusiPort = 8081 - // BusiPbPort busi server port - BusiPbPort = 60081 + // BusiGrpcPort busi server port + BusiGrpcPort = 60081 ) // Busi busi service url prefix diff --git a/examples/main_grpc.go b/examples/main_grpc.go index 6963a29..0368c56 100644 --- a/examples/main_grpc.go +++ b/examples/main_grpc.go @@ -13,8 +13,8 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) -// BusiPb busi service grpc address -var BusiPb string = fmt.Sprintf("localhost:%d", BusiPbPort) +// BusiGrpc busi service grpc address +var BusiGrpc string = fmt.Sprintf("localhost:%d", BusiGrpcPort) // DtmClient grpc client for dtm var DtmClient dtmgrpc.DtmClient = nil @@ -26,7 +26,7 @@ func GrpcStartup() { DtmClient = dtmgrpc.NewDtmClient(conn) dtmcli.Logf("dtm client inited") - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", BusiPbPort)) + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", BusiGrpcPort)) dtmcli.FatalIfError(err) s := grpc.NewServer(grpc.UnaryInterceptor(dtmgrpc.GrpcServerLog)) RegisterBusiServer(s, &busiServer{}) @@ -54,6 +54,11 @@ type busiServer struct { UnimplementedBusiServer } +func (s *busiServer) CanSubmit(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { + res := MainSwitch.CanSubmitResult.Fetch() + return &emptypb.Empty{}, dtmgrpc.Result2Error(res, nil) +} + func (s *busiServer) TransIn(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { req := TransReq{} dtmcli.MustUnmarshal(in.AppData, &req) diff --git a/examples/main_grpc_msg.go b/examples/main_grpc_msg.go index a9be131..a14ee45 100644 --- a/examples/main_grpc_msg.go +++ b/examples/main_grpc_msg.go @@ -15,8 +15,8 @@ func MsgGrpcSetup(app *gin.Engine) { func MsgGrpcFireRequest() string { req := dtmcli.MustMarshal(&TransReq{Amount: 30}) msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, dtmcli.MustGenGid(DtmServer)). - Add(BusiPb+"/examples.Busi/TransOut", req). - Add(BusiPb+"/examples.Busi/TransIn", req) + Add(BusiGrpc+"/examples.Busi/TransOut", req). + Add(BusiGrpc+"/examples.Busi/TransIn", req) err := msg.Submit() e2p(err) return msg.Gid