diff --git a/app/main.go b/app/main.go index 2cd44f6..ec7d066 100644 --- a/app/main.go +++ b/app/main.go @@ -41,6 +41,7 @@ func main() { // 下面是各类的例子 app := examples.BaseAppStartup() + examples.GrpcStartup() if os.Args[1] == "xa" { // 启动xa示例 examples.XaSetup(app) examples.XaFireRequest() @@ -54,8 +55,8 @@ func main() { examples.MsgSetup(app) examples.MsgFireRequest() } else if os.Args[1] == "msg_pb" { // 启动msg示例 - examples.MsgPbSetup(app) - examples.MsgPbFireRequest() + examples.MsgGrpcSetup(app) + examples.MsgGrpcFireRequest() } else if os.Args[1] == "all" { // 运行所有示例 examples.SagaSetup(app) examples.SagaWaitSetup(app) diff --git a/dtmpb/dtmpb.pb.go b/dtmgrpc/dtmgrpc.pb.go similarity index 53% rename from dtmpb/dtmpb.pb.go rename to dtmgrpc/dtmgrpc.pb.go index 0a270e5..11ad1b3 100644 --- a/dtmpb/dtmpb.pb.go +++ b/dtmgrpc/dtmgrpc.pb.go @@ -2,9 +2,9 @@ // versions: // protoc-gen-go v1.26.0 // protoc v3.17.3 -// source: dtmpb/dtmpb.proto +// source: dtmgrpc/dtmgrpc.proto -package dtmpb +package dtmgrpc import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" @@ -36,7 +36,7 @@ type DtmTransInfo struct { func (x *DtmTransInfo) Reset() { *x = DtmTransInfo{} if protoimpl.UnsafeEnabled { - mi := &file_dtmpb_dtmpb_proto_msgTypes[0] + mi := &file_dtmgrpc_dtmgrpc_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -49,7 +49,7 @@ func (x *DtmTransInfo) String() string { func (*DtmTransInfo) ProtoMessage() {} func (x *DtmTransInfo) ProtoReflect() protoreflect.Message { - mi := &file_dtmpb_dtmpb_proto_msgTypes[0] + mi := &file_dtmgrpc_dtmgrpc_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -62,7 +62,7 @@ func (x *DtmTransInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use DtmTransInfo.ProtoReflect.Descriptor instead. func (*DtmTransInfo) Descriptor() ([]byte, []int) { - return file_dtmpb_dtmpb_proto_rawDescGZIP(), []int{0} + return file_dtmgrpc_dtmgrpc_proto_rawDescGZIP(), []int{0} } func (x *DtmTransInfo) GetGid() string { @@ -117,7 +117,7 @@ type DtmRequest struct { func (x *DtmRequest) Reset() { *x = DtmRequest{} if protoimpl.UnsafeEnabled { - mi := &file_dtmpb_dtmpb_proto_msgTypes[1] + mi := &file_dtmgrpc_dtmgrpc_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -130,7 +130,7 @@ func (x *DtmRequest) String() string { func (*DtmRequest) ProtoMessage() {} func (x *DtmRequest) ProtoReflect() protoreflect.Message { - mi := &file_dtmpb_dtmpb_proto_msgTypes[1] + mi := &file_dtmgrpc_dtmgrpc_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -143,7 +143,7 @@ func (x *DtmRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use DtmRequest.ProtoReflect.Descriptor instead. func (*DtmRequest) Descriptor() ([]byte, []int) { - return file_dtmpb_dtmpb_proto_rawDescGZIP(), []int{1} + return file_dtmgrpc_dtmgrpc_proto_rawDescGZIP(), []int{1} } func (x *DtmRequest) GetGid() string { @@ -201,7 +201,7 @@ type DtmReply struct { func (x *DtmReply) Reset() { *x = DtmReply{} if protoimpl.UnsafeEnabled { - mi := &file_dtmpb_dtmpb_proto_msgTypes[2] + mi := &file_dtmgrpc_dtmgrpc_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -214,7 +214,7 @@ func (x *DtmReply) String() string { func (*DtmReply) ProtoMessage() {} func (x *DtmReply) ProtoReflect() protoreflect.Message { - mi := &file_dtmpb_dtmpb_proto_msgTypes[2] + mi := &file_dtmgrpc_dtmgrpc_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -227,7 +227,7 @@ func (x *DtmReply) ProtoReflect() protoreflect.Message { // Deprecated: Use DtmReply.ProtoReflect.Descriptor instead. func (*DtmReply) Descriptor() ([]byte, []int) { - return file_dtmpb_dtmpb_proto_rawDescGZIP(), []int{2} + return file_dtmgrpc_dtmgrpc_proto_rawDescGZIP(), []int{2} } func (x *DtmReply) GetDtmResult() string { @@ -258,7 +258,7 @@ type BusiRequest struct { func (x *BusiRequest) Reset() { *x = BusiRequest{} if protoimpl.UnsafeEnabled { - mi := &file_dtmpb_dtmpb_proto_msgTypes[3] + mi := &file_dtmgrpc_dtmgrpc_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -271,7 +271,7 @@ func (x *BusiRequest) String() string { func (*BusiRequest) ProtoMessage() {} func (x *BusiRequest) ProtoReflect() protoreflect.Message { - mi := &file_dtmpb_dtmpb_proto_msgTypes[3] + mi := &file_dtmgrpc_dtmgrpc_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -284,7 +284,7 @@ func (x *BusiRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use BusiRequest.ProtoReflect.Descriptor instead. func (*BusiRequest) Descriptor() ([]byte, []int) { - return file_dtmpb_dtmpb_proto_rawDescGZIP(), []int{3} + return file_dtmgrpc_dtmgrpc_proto_rawDescGZIP(), []int{3} } func (x *BusiRequest) GetInfo() *DtmTransInfo { @@ -321,7 +321,7 @@ type BusiReply struct { func (x *BusiReply) Reset() { *x = BusiReply{} if protoimpl.UnsafeEnabled { - mi := &file_dtmpb_dtmpb_proto_msgTypes[4] + mi := &file_dtmgrpc_dtmgrpc_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -334,7 +334,7 @@ func (x *BusiReply) String() string { func (*BusiReply) ProtoMessage() {} func (x *BusiReply) ProtoReflect() protoreflect.Message { - mi := &file_dtmpb_dtmpb_proto_msgTypes[4] + mi := &file_dtmgrpc_dtmgrpc_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -347,7 +347,7 @@ func (x *BusiReply) ProtoReflect() protoreflect.Message { // Deprecated: Use BusiReply.ProtoReflect.Descriptor instead. func (*BusiReply) Descriptor() ([]byte, []int) { - return file_dtmpb_dtmpb_proto_rawDescGZIP(), []int{4} + return file_dtmgrpc_dtmgrpc_proto_rawDescGZIP(), []int{4} } func (x *BusiReply) GetDtmResult() string { @@ -364,97 +364,98 @@ func (x *BusiReply) GetDtmMessage() string { return "" } -var File_dtmpb_dtmpb_proto protoreflect.FileDescriptor +var File_dtmgrpc_dtmgrpc_proto protoreflect.FileDescriptor -var file_dtmpb_dtmpb_proto_rawDesc = []byte{ - 0x0a, 0x11, 0x64, 0x74, 0x6d, 0x70, 0x62, 0x2f, 0x64, 0x74, 0x6d, 0x70, 0x62, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x64, 0x74, 0x6d, 0x70, 0x62, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, - 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8c, 0x01, 0x0a, 0x0c, 0x44, 0x74, 0x6d, 0x54, - 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, - 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, - 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, - 0x63, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, - 0x63, 0x68, 0x49, 0x44, 0x12, 0x1e, 0x0a, 0x0a, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x54, 0x79, - 0x70, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x18, 0x05, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x03, 0x44, 0x74, 0x6d, 0x22, 0x84, 0x02, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, - 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x24, 0x0a, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, - 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x51, 0x75, - 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x57, - 0x61, 0x69, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x0a, 0x57, 0x61, 0x69, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x32, 0x0a, 0x05, 0x45, - 0x78, 0x74, 0x72, 0x61, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x64, 0x74, 0x6d, - 0x70, 0x62, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x45, 0x78, - 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x45, 0x78, 0x74, 0x72, 0x61, 0x12, - 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x44, - 0x61, 0x74, 0x61, 0x1a, 0x38, 0x0a, 0x0a, 0x45, 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, - 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x48, 0x0a, - 0x08, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x1c, 0x0a, 0x09, 0x44, 0x74, 0x6d, - 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x44, 0x74, - 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x44, 0x74, 0x6d, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xbf, 0x01, 0x0a, 0x0b, 0x42, 0x75, 0x73, 0x69, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x27, 0x0a, 0x04, 0x69, 0x6e, 0x66, 0x6f, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x70, 0x62, 0x2e, 0x44, 0x74, - 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x04, 0x69, 0x6e, 0x66, 0x6f, - 0x12, 0x33, 0x0a, 0x05, 0x45, 0x78, 0x74, 0x72, 0x61, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x1d, 0x2e, 0x64, 0x74, 0x6d, 0x70, 0x62, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x2e, 0x45, 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, - 0x45, 0x78, 0x74, 0x72, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x41, 0x70, 0x70, 0x44, 0x61, 0x74, 0x61, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x41, 0x70, 0x70, 0x44, 0x61, 0x74, 0x61, 0x1a, - 0x38, 0x0a, 0x0a, 0x45, 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, - 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, - 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, - 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x49, 0x0a, 0x09, 0x42, 0x75, 0x73, - 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x1c, 0x0a, 0x09, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x73, - 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x44, 0x74, 0x6d, 0x52, 0x65, - 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x44, 0x74, 0x6d, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x32, 0x3c, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x35, 0x0a, 0x06, 0x53, - 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x11, 0x2e, 0x64, 0x74, 0x6d, 0x70, 0x62, 0x2e, 0x44, 0x74, - 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, - 0x22, 0x00, 0x42, 0x1b, 0x5a, 0x19, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74, 0x6d, 0x2f, 0x64, 0x74, 0x6d, 0x70, 0x62, 0x62, +var file_dtmgrpc_dtmgrpc_proto_rawDesc = []byte{ + 0x0a, 0x15, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, + 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, + 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8c, 0x01, + 0x0a, 0x0c, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x10, + 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, + 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, + 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x12, 0x1e, 0x0a, 0x0a, 0x42, 0x72, + 0x61, 0x6e, 0x63, 0x68, 0x54, 0x79, 0x70, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, + 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x54, 0x79, 0x70, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x44, 0x74, + 0x6d, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x44, 0x74, 0x6d, 0x22, 0x86, 0x02, 0x0a, + 0x0a, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, + 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x24, 0x0a, 0x0d, 0x51, + 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, + 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x57, 0x61, 0x69, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x57, 0x61, 0x69, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, + 0x74, 0x12, 0x34, 0x0a, 0x05, 0x45, 0x78, 0x74, 0x72, 0x61, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x1e, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x45, 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x52, 0x05, 0x45, 0x78, 0x74, 0x72, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, + 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x38, 0x0a, 0x0a, 0x45, + 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x48, 0x0a, 0x08, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x70, 0x6c, + 0x79, 0x12, 0x1c, 0x0a, 0x09, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, + 0x1e, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0a, 0x44, 0x74, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, + 0xc3, 0x01, 0x0a, 0x0b, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x29, 0x0a, 0x04, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, + 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, + 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x04, 0x69, 0x6e, 0x66, 0x6f, 0x12, 0x35, 0x0a, 0x05, 0x45, 0x78, + 0x74, 0x72, 0x61, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x64, 0x74, 0x6d, 0x67, + 0x72, 0x70, 0x63, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, + 0x45, 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x45, 0x78, 0x74, 0x72, + 0x61, 0x12, 0x18, 0x0a, 0x07, 0x41, 0x70, 0x70, 0x44, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x07, 0x41, 0x70, 0x70, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x38, 0x0a, 0x0a, 0x45, + 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x49, 0x0a, 0x09, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x70, + 0x6c, 0x79, 0x12, 0x1c, 0x0a, 0x09, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, + 0x12, 0x1e, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x44, 0x74, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x32, 0x3e, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, + 0x74, 0x12, 0x13, 0x2e, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x74, 0x6d, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, + 0x42, 0x1d, 0x5a, 0x1b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, + 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74, 0x6d, 0x2f, 0x64, 0x74, 0x6d, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( - file_dtmpb_dtmpb_proto_rawDescOnce sync.Once - file_dtmpb_dtmpb_proto_rawDescData = file_dtmpb_dtmpb_proto_rawDesc + file_dtmgrpc_dtmgrpc_proto_rawDescOnce sync.Once + file_dtmgrpc_dtmgrpc_proto_rawDescData = file_dtmgrpc_dtmgrpc_proto_rawDesc ) -func file_dtmpb_dtmpb_proto_rawDescGZIP() []byte { - file_dtmpb_dtmpb_proto_rawDescOnce.Do(func() { - file_dtmpb_dtmpb_proto_rawDescData = protoimpl.X.CompressGZIP(file_dtmpb_dtmpb_proto_rawDescData) +func file_dtmgrpc_dtmgrpc_proto_rawDescGZIP() []byte { + file_dtmgrpc_dtmgrpc_proto_rawDescOnce.Do(func() { + file_dtmgrpc_dtmgrpc_proto_rawDescData = protoimpl.X.CompressGZIP(file_dtmgrpc_dtmgrpc_proto_rawDescData) }) - return file_dtmpb_dtmpb_proto_rawDescData + return file_dtmgrpc_dtmgrpc_proto_rawDescData } -var file_dtmpb_dtmpb_proto_msgTypes = make([]protoimpl.MessageInfo, 7) -var file_dtmpb_dtmpb_proto_goTypes = []interface{}{ - (*DtmTransInfo)(nil), // 0: dtmpb.DtmTransInfo - (*DtmRequest)(nil), // 1: dtmpb.DtmRequest - (*DtmReply)(nil), // 2: dtmpb.DtmReply - (*BusiRequest)(nil), // 3: dtmpb.BusiRequest - (*BusiReply)(nil), // 4: dtmpb.BusiReply - nil, // 5: dtmpb.DtmRequest.ExtraEntry - nil, // 6: dtmpb.BusiRequest.ExtraEntry +var file_dtmgrpc_dtmgrpc_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_dtmgrpc_dtmgrpc_proto_goTypes = []interface{}{ + (*DtmTransInfo)(nil), // 0: dtmgrpc.DtmTransInfo + (*DtmRequest)(nil), // 1: dtmgrpc.DtmRequest + (*DtmReply)(nil), // 2: dtmgrpc.DtmReply + (*BusiRequest)(nil), // 3: dtmgrpc.BusiRequest + (*BusiReply)(nil), // 4: dtmgrpc.BusiReply + nil, // 5: dtmgrpc.DtmRequest.ExtraEntry + nil, // 6: dtmgrpc.BusiRequest.ExtraEntry (*emptypb.Empty)(nil), // 7: google.protobuf.Empty } -var file_dtmpb_dtmpb_proto_depIdxs = []int32{ - 5, // 0: dtmpb.DtmRequest.Extra:type_name -> dtmpb.DtmRequest.ExtraEntry - 0, // 1: dtmpb.BusiRequest.info:type_name -> dtmpb.DtmTransInfo - 6, // 2: dtmpb.BusiRequest.Extra:type_name -> dtmpb.BusiRequest.ExtraEntry - 1, // 3: dtmpb.Dtm.Submit:input_type -> dtmpb.DtmRequest - 7, // 4: dtmpb.Dtm.Submit:output_type -> google.protobuf.Empty +var file_dtmgrpc_dtmgrpc_proto_depIdxs = []int32{ + 5, // 0: dtmgrpc.DtmRequest.Extra:type_name -> dtmgrpc.DtmRequest.ExtraEntry + 0, // 1: dtmgrpc.BusiRequest.info:type_name -> dtmgrpc.DtmTransInfo + 6, // 2: dtmgrpc.BusiRequest.Extra:type_name -> dtmgrpc.BusiRequest.ExtraEntry + 1, // 3: dtmgrpc.Dtm.Submit:input_type -> dtmgrpc.DtmRequest + 7, // 4: dtmgrpc.Dtm.Submit:output_type -> google.protobuf.Empty 4, // [4:5] is the sub-list for method output_type 3, // [3:4] is the sub-list for method input_type 3, // [3:3] is the sub-list for extension type_name @@ -462,13 +463,13 @@ var file_dtmpb_dtmpb_proto_depIdxs = []int32{ 0, // [0:3] is the sub-list for field type_name } -func init() { file_dtmpb_dtmpb_proto_init() } -func file_dtmpb_dtmpb_proto_init() { - if File_dtmpb_dtmpb_proto != nil { +func init() { file_dtmgrpc_dtmgrpc_proto_init() } +func file_dtmgrpc_dtmgrpc_proto_init() { + if File_dtmgrpc_dtmgrpc_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_dtmpb_dtmpb_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_dtmgrpc_dtmgrpc_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DtmTransInfo); i { case 0: return &v.state @@ -480,7 +481,7 @@ func file_dtmpb_dtmpb_proto_init() { return nil } } - file_dtmpb_dtmpb_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_dtmgrpc_dtmgrpc_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DtmRequest); i { case 0: return &v.state @@ -492,7 +493,7 @@ func file_dtmpb_dtmpb_proto_init() { return nil } } - file_dtmpb_dtmpb_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + file_dtmgrpc_dtmgrpc_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DtmReply); i { case 0: return &v.state @@ -504,7 +505,7 @@ func file_dtmpb_dtmpb_proto_init() { return nil } } - file_dtmpb_dtmpb_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + file_dtmgrpc_dtmgrpc_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*BusiRequest); i { case 0: return &v.state @@ -516,7 +517,7 @@ func file_dtmpb_dtmpb_proto_init() { return nil } } - file_dtmpb_dtmpb_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + file_dtmgrpc_dtmgrpc_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*BusiReply); i { case 0: return &v.state @@ -533,18 +534,18 @@ func file_dtmpb_dtmpb_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_dtmpb_dtmpb_proto_rawDesc, + RawDescriptor: file_dtmgrpc_dtmgrpc_proto_rawDesc, NumEnums: 0, NumMessages: 7, NumExtensions: 0, NumServices: 1, }, - GoTypes: file_dtmpb_dtmpb_proto_goTypes, - DependencyIndexes: file_dtmpb_dtmpb_proto_depIdxs, - MessageInfos: file_dtmpb_dtmpb_proto_msgTypes, + GoTypes: file_dtmgrpc_dtmgrpc_proto_goTypes, + DependencyIndexes: file_dtmgrpc_dtmgrpc_proto_depIdxs, + MessageInfos: file_dtmgrpc_dtmgrpc_proto_msgTypes, }.Build() - File_dtmpb_dtmpb_proto = out.File - file_dtmpb_dtmpb_proto_rawDesc = nil - file_dtmpb_dtmpb_proto_goTypes = nil - file_dtmpb_dtmpb_proto_depIdxs = nil + File_dtmgrpc_dtmgrpc_proto = out.File + file_dtmgrpc_dtmgrpc_proto_rawDesc = nil + file_dtmgrpc_dtmgrpc_proto_goTypes = nil + file_dtmgrpc_dtmgrpc_proto_depIdxs = nil } diff --git a/dtmpb/dtmpb.proto b/dtmgrpc/dtmgrpc.proto similarity index 93% rename from dtmpb/dtmpb.proto rename to dtmgrpc/dtmgrpc.proto index 6bd4ad1..7931fc0 100644 --- a/dtmpb/dtmpb.proto +++ b/dtmgrpc/dtmgrpc.proto @@ -1,9 +1,9 @@ syntax = "proto3"; -option go_package = "github.com/yedf/dtm/dtmpb"; +option go_package = "github.com/yedf/dtm/dtmgrpc"; import "google/protobuf/empty.proto"; -package dtmpb; +package dtmgrpc; // The dtm service definition. service Dtm { diff --git a/dtmpb/dtmpb_grpc.pb.go b/dtmgrpc/dtmgrpc_grpc.pb.go similarity index 94% rename from dtmpb/dtmpb_grpc.pb.go rename to dtmgrpc/dtmgrpc_grpc.pb.go index 3c85437..e6a8d8e 100644 --- a/dtmpb/dtmpb_grpc.pb.go +++ b/dtmgrpc/dtmgrpc_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. -package dtmpb +package dtmgrpc import ( context "context" @@ -32,7 +32,7 @@ func NewDtmClient(cc grpc.ClientConnInterface) DtmClient { func (c *dtmClient) Submit(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, "/dtmpb.Dtm/Submit", in, out, opts...) + err := c.cc.Invoke(ctx, "/dtmgrpc.Dtm/Submit", in, out, opts...) if err != nil { return nil, err } @@ -77,7 +77,7 @@ func _Dtm_Submit_Handler(srv interface{}, ctx context.Context, dec func(interfac } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/dtmpb.Dtm/Submit", + FullMethod: "/dtmgrpc.Dtm/Submit", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(DtmServer).Submit(ctx, req.(*DtmRequest)) @@ -89,7 +89,7 @@ func _Dtm_Submit_Handler(srv interface{}, ctx context.Context, dec func(interfac // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) var Dtm_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "dtmpb.Dtm", + ServiceName: "dtmgrpc.Dtm", HandlerType: (*DtmServer)(nil), Methods: []grpc.MethodDesc{ { @@ -98,5 +98,5 @@ var Dtm_ServiceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{}, - Metadata: "dtmpb/dtmpb.proto", + Metadata: "dtmgrpc/dtmgrpc.proto", } diff --git a/dtmpb/message.go b/dtmgrpc/message.go similarity index 98% rename from dtmpb/message.go rename to dtmgrpc/message.go index 70c9294..7656077 100644 --- a/dtmpb/message.go +++ b/dtmgrpc/message.go @@ -1,4 +1,4 @@ -package dtmpb +package dtmgrpc import ( "context" diff --git a/dtmpb/type.go b/dtmgrpc/type.go similarity index 99% rename from dtmpb/type.go rename to dtmgrpc/type.go index be66360..54d8f76 100644 --- a/dtmpb/type.go +++ b/dtmgrpc/type.go @@ -1,4 +1,4 @@ -package dtmpb +package dtmgrpc import ( context "context" diff --git a/dtmsvr/api_grpc.go b/dtmsvr/api_grpc.go index 9b08ef0..9c63bd6 100644 --- a/dtmsvr/api_grpc.go +++ b/dtmsvr/api_grpc.go @@ -3,7 +3,7 @@ package dtmsvr import ( "context" - pb "github.com/yedf/dtm/dtmpb" + pb "github.com/yedf/dtm/dtmgrpc" "google.golang.org/protobuf/types/known/emptypb" ) diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index 44fd50e..5c74b26 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -47,6 +47,7 @@ func TestMain(m *testing.M) { // 启动组件 go StartSvr() app = examples.BaseAppStartup() + examples.GrpcStartup() examples.SagaSetup(app) examples.TccSetup(app) examples.XaSetup(app) diff --git a/dtmsvr/examples_test.go b/dtmsvr/examples_test.go index 0ed7b6c..e31d33e 100644 --- a/dtmsvr/examples_test.go +++ b/dtmsvr/examples_test.go @@ -18,5 +18,5 @@ func TestExamples(t *testing.T) { assertSucceed(t, examples.TccFireRequest()) assertSucceed(t, examples.TccFireRequestNested()) assertSucceed(t, examples.XaFireRequest()) - assertSucceed(t, examples.MsgPbFireRequest()) + assertSucceed(t, examples.MsgGrpcFireRequest()) } diff --git a/dtmsvr/main.go b/dtmsvr/main.go index c95822d..55b680b 100644 --- a/dtmsvr/main.go +++ b/dtmsvr/main.go @@ -7,7 +7,7 @@ import ( "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmpb" + "github.com/yedf/dtm/dtmgrpc" "github.com/yedf/dtm/examples" "google.golang.org/grpc" @@ -26,8 +26,8 @@ func StartSvr() { lis, err := net.Listen("tcp", fmt.Sprintf(":%d", dtmsvrGrpcPort)) dtmcli.FatalIfError(err) - s := grpc.NewServer(grpc.UnaryInterceptor(dtmpb.GrpcServerLog)) - dtmpb.RegisterDtmServer(s, &dtmServer{}) + s := grpc.NewServer(grpc.UnaryInterceptor(dtmgrpc.GrpcServerLog)) + dtmgrpc.RegisterDtmServer(s, &dtmServer{}) dtmcli.Logf("grpc listening at %v", lis.Addr()) go func() { err := s.Serve(lis) diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 0077e10..fe251e7 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -9,7 +9,7 @@ import ( "github.com/gin-gonic/gin" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" - "github.com/yedf/dtm/dtmpb" + "github.com/yedf/dtm/dtmgrpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -160,10 +160,10 @@ func (t *TransGlobal) setNextCron(expireIn int64) []string { func (t *TransGlobal) getBranchResult(branch *TransBranch) string { if t.Protocol == "grpc" { - server, method := dtmpb.GetServerAndMethod(branch.URL) - conn := dtmpb.MustGetGrpcConn(server) - err := conn.Invoke(context.Background(), method, &dtmpb.BusiRequest{ - Info: &dtmpb.DtmTransInfo{ + server, method := dtmgrpc.GetServerAndMethod(branch.URL) + conn := dtmgrpc.MustGetGrpcConn(server) + err := conn.Invoke(context.Background(), method, &dtmgrpc.BusiRequest{ + Info: &dtmgrpc.DtmTransInfo{ Gid: t.Gid, TransType: t.TransType, BranchID: branch.BranchID, @@ -245,7 +245,7 @@ func TransFromContext(c *gin.Context) *TransGlobal { } // TransFromDtmRequest TransFromContext -func TransFromDtmRequest(c *dtmpb.DtmRequest) *TransGlobal { +func TransFromDtmRequest(c *dtmgrpc.DtmRequest) *TransGlobal { return &TransGlobal{ Gid: c.Gid, TransType: c.TransType, diff --git a/dtmsvr/trans_grpc_msg_test.go b/dtmsvr/trans_grpc_msg_test.go new file mode 100644 index 0000000..4273398 --- /dev/null +++ b/dtmsvr/trans_grpc_msg_test.go @@ -0,0 +1,43 @@ +package dtmsvr + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/dtmgrpc" + "github.com/yedf/dtm/examples" +) + +func TestGrpcMsg(t *testing.T) { + grpcMsgNormal(t) + grpcMsgPending(t) +} + +func grpcMsgNormal(t *testing.T) { + msg := genGrpcMsg("grpc-msg-normal") + err := msg.Submit() + assert.Nil(t, err) + WaitTransProcessed(msg.Gid) + assert.Equal(t, "succeed", getTransStatus(msg.Gid)) +} + +func grpcMsgPending(t *testing.T) { + msg := genGrpcMsg("grpc-msg-pending") + examples.MainSwitch.TransInResult.SetOnce("PENDING") + err := msg.Submit() + assert.Nil(t, err) + WaitTransProcessed(msg.Gid) + assert.Equal(t, "submitted", getTransStatus(msg.Gid)) + CronTransOnce(60 * time.Second) + assert.Equal(t, "succeed", getTransStatus(msg.Gid)) +} + +func genGrpcMsg(gid string) *dtmgrpc.MsgGrpc { + req := dtmcli.MustMarshal(&examples.TransReq{Amount: 30}) + return dtmgrpc.NewMsgGrpc(examples.DtmGrpcServer, gid). + Add(examples.BusiPb+"/examples.Busi/TransOut", req). + Add(examples.BusiPb+"/examples.Busi/TransIn", req) + +} diff --git a/dtmsvr/trans_pb_msg_test.go b/dtmsvr/trans_pb_msg_test.go deleted file mode 100644 index d21de33..0000000 --- a/dtmsvr/trans_pb_msg_test.go +++ /dev/null @@ -1,8 +0,0 @@ -package dtmsvr - -import ( - "testing" -) - -func TestPbMsg(t *testing.T) { -} diff --git a/examples/busi.pb.go b/examples/busi.pb.go index 0aea2a2..3b427db 100644 --- a/examples/busi.pb.go +++ b/examples/busi.pb.go @@ -7,7 +7,7 @@ package examples import ( - dtmpb "github.com/yedf/dtm/dtmpb" + dtmgrpc "github.com/yedf/dtm/dtmgrpc" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" emptypb "google.golang.org/protobuf/types/known/emptypb" @@ -51,14 +51,14 @@ var file_examples_busi_proto_rawDesc = []byte{ } var file_examples_busi_proto_goTypes = []interface{}{ - (*dtmpb.BusiRequest)(nil), // 0: dtmpb.BusiRequest - (*emptypb.Empty)(nil), // 1: google.protobuf.Empty + (*dtmgrpc.BusiRequest)(nil), // 0: dtmgrpc.BusiRequest + (*emptypb.Empty)(nil), // 1: google.protobuf.Empty } var file_examples_busi_proto_depIdxs = []int32{ - 0, // 0: examples.Busi.TransIn:input_type -> dtmpb.BusiRequest - 0, // 1: examples.Busi.TransOut:input_type -> dtmpb.BusiRequest - 0, // 2: examples.Busi.TransInRevert:input_type -> dtmpb.BusiRequest - 0, // 3: examples.Busi.TransOutRevert:input_type -> dtmpb.BusiRequest + 0, // 0: examples.Busi.TransIn:input_type -> dtmgrpc.BusiRequest + 0, // 1: examples.Busi.TransOut:input_type -> dtmgrpc.BusiRequest + 0, // 2: examples.Busi.TransInRevert:input_type -> dtmgrpc.BusiRequest + 0, // 3: examples.Busi.TransOutRevert:input_type -> dtmgrpc.BusiRequest 1, // 4: examples.Busi.TransIn:output_type -> google.protobuf.Empty 1, // 5: examples.Busi.TransOut:output_type -> google.protobuf.Empty 1, // 6: examples.Busi.TransInRevert:output_type -> google.protobuf.Empty diff --git a/examples/busi.proto b/examples/busi.proto index 76146b3..402bb6b 100644 --- a/examples/busi.proto +++ b/examples/busi.proto @@ -3,14 +3,14 @@ syntax = "proto3"; package examples; option go_package = "github.com/yedf/dtm/examples"; -import "dtmpb/dtmpb.proto"; +import "dtmgrpc/dtmgrpc.proto"; import "google/protobuf/empty.proto"; // The dtm service definition. service Busi { - rpc TransIn(dtmpb.BusiRequest) returns (google.protobuf.Empty) {} - rpc TransOut(dtmpb.BusiRequest) returns (google.protobuf.Empty) {} - rpc TransInRevert(dtmpb.BusiRequest) returns (google.protobuf.Empty) {} - rpc TransOutRevert(dtmpb.BusiRequest) returns (google.protobuf.Empty) {} + rpc TransIn(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} + rpc TransOut(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} + rpc TransInRevert(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} + rpc TransOutRevert(dtmgrpc.BusiRequest) returns (google.protobuf.Empty) {} } diff --git a/examples/busi_grpc.pb.go b/examples/busi_grpc.pb.go index 0ddc227..fbac28c 100644 --- a/examples/busi_grpc.pb.go +++ b/examples/busi_grpc.pb.go @@ -4,7 +4,7 @@ package examples import ( context "context" - dtmpb "github.com/yedf/dtm/dtmpb" + dtmgrpc "github.com/yedf/dtm/dtmgrpc" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" @@ -20,10 +20,10 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type BusiClient interface { - TransIn(ctx context.Context, in *dtmpb.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) - TransOut(ctx context.Context, in *dtmpb.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) - TransInRevert(ctx context.Context, in *dtmpb.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) - TransOutRevert(ctx context.Context, in *dtmpb.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransIn(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransOut(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransInRevert(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransOutRevert(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) } type busiClient struct { @@ -34,7 +34,7 @@ func NewBusiClient(cc grpc.ClientConnInterface) BusiClient { return &busiClient{cc} } -func (c *busiClient) TransIn(ctx context.Context, in *dtmpb.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { +func (c *busiClient) TransIn(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) err := c.cc.Invoke(ctx, "/examples.Busi/TransIn", in, out, opts...) if err != nil { @@ -43,7 +43,7 @@ func (c *busiClient) TransIn(ctx context.Context, in *dtmpb.BusiRequest, opts .. return out, nil } -func (c *busiClient) TransOut(ctx context.Context, in *dtmpb.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { +func (c *busiClient) TransOut(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) err := c.cc.Invoke(ctx, "/examples.Busi/TransOut", in, out, opts...) if err != nil { @@ -52,7 +52,7 @@ func (c *busiClient) TransOut(ctx context.Context, in *dtmpb.BusiRequest, opts . return out, nil } -func (c *busiClient) TransInRevert(ctx context.Context, in *dtmpb.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { +func (c *busiClient) TransInRevert(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) err := c.cc.Invoke(ctx, "/examples.Busi/TransInRevert", in, out, opts...) if err != nil { @@ -61,7 +61,7 @@ func (c *busiClient) TransInRevert(ctx context.Context, in *dtmpb.BusiRequest, o return out, nil } -func (c *busiClient) TransOutRevert(ctx context.Context, in *dtmpb.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { +func (c *busiClient) TransOutRevert(ctx context.Context, in *dtmgrpc.BusiRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) err := c.cc.Invoke(ctx, "/examples.Busi/TransOutRevert", in, out, opts...) if err != nil { @@ -74,10 +74,10 @@ func (c *busiClient) TransOutRevert(ctx context.Context, in *dtmpb.BusiRequest, // All implementations must embed UnimplementedBusiServer // for forward compatibility type BusiServer interface { - TransIn(context.Context, *dtmpb.BusiRequest) (*emptypb.Empty, error) - TransOut(context.Context, *dtmpb.BusiRequest) (*emptypb.Empty, error) - TransInRevert(context.Context, *dtmpb.BusiRequest) (*emptypb.Empty, error) - TransOutRevert(context.Context, *dtmpb.BusiRequest) (*emptypb.Empty, error) + TransIn(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) + TransOut(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) + TransInRevert(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) + TransOutRevert(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) mustEmbedUnimplementedBusiServer() } @@ -85,16 +85,16 @@ type BusiServer interface { type UnimplementedBusiServer struct { } -func (UnimplementedBusiServer) TransIn(context.Context, *dtmpb.BusiRequest) (*emptypb.Empty, error) { +func (UnimplementedBusiServer) TransIn(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method TransIn not implemented") } -func (UnimplementedBusiServer) TransOut(context.Context, *dtmpb.BusiRequest) (*emptypb.Empty, error) { +func (UnimplementedBusiServer) TransOut(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method TransOut not implemented") } -func (UnimplementedBusiServer) TransInRevert(context.Context, *dtmpb.BusiRequest) (*emptypb.Empty, error) { +func (UnimplementedBusiServer) TransInRevert(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method TransInRevert not implemented") } -func (UnimplementedBusiServer) TransOutRevert(context.Context, *dtmpb.BusiRequest) (*emptypb.Empty, error) { +func (UnimplementedBusiServer) TransOutRevert(context.Context, *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method TransOutRevert not implemented") } func (UnimplementedBusiServer) mustEmbedUnimplementedBusiServer() {} @@ -111,7 +111,7 @@ func RegisterBusiServer(s grpc.ServiceRegistrar, srv BusiServer) { } func _Busi_TransIn_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(dtmpb.BusiRequest) + in := new(dtmgrpc.BusiRequest) if err := dec(in); err != nil { return nil, err } @@ -123,13 +123,13 @@ func _Busi_TransIn_Handler(srv interface{}, ctx context.Context, dec func(interf FullMethod: "/examples.Busi/TransIn", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BusiServer).TransIn(ctx, req.(*dtmpb.BusiRequest)) + return srv.(BusiServer).TransIn(ctx, req.(*dtmgrpc.BusiRequest)) } return interceptor(ctx, in, info, handler) } func _Busi_TransOut_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(dtmpb.BusiRequest) + in := new(dtmgrpc.BusiRequest) if err := dec(in); err != nil { return nil, err } @@ -141,13 +141,13 @@ func _Busi_TransOut_Handler(srv interface{}, ctx context.Context, dec func(inter FullMethod: "/examples.Busi/TransOut", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BusiServer).TransOut(ctx, req.(*dtmpb.BusiRequest)) + return srv.(BusiServer).TransOut(ctx, req.(*dtmgrpc.BusiRequest)) } return interceptor(ctx, in, info, handler) } func _Busi_TransInRevert_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(dtmpb.BusiRequest) + in := new(dtmgrpc.BusiRequest) if err := dec(in); err != nil { return nil, err } @@ -159,13 +159,13 @@ func _Busi_TransInRevert_Handler(srv interface{}, ctx context.Context, dec func( FullMethod: "/examples.Busi/TransInRevert", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BusiServer).TransInRevert(ctx, req.(*dtmpb.BusiRequest)) + return srv.(BusiServer).TransInRevert(ctx, req.(*dtmgrpc.BusiRequest)) } return interceptor(ctx, in, info, handler) } func _Busi_TransOutRevert_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(dtmpb.BusiRequest) + in := new(dtmgrpc.BusiRequest) if err := dec(in); err != nil { return nil, err } @@ -177,7 +177,7 @@ func _Busi_TransOutRevert_Handler(srv interface{}, ctx context.Context, dec func FullMethod: "/examples.Busi/TransOutRevert", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BusiServer).TransOutRevert(ctx, req.(*dtmpb.BusiRequest)) + return srv.(BusiServer).TransOutRevert(ctx, req.(*dtmgrpc.BusiRequest)) } return interceptor(ctx, in, info, handler) } diff --git a/examples/main_base.go b/examples/main_base.go index 533b837..432f84d 100644 --- a/examples/main_base.go +++ b/examples/main_base.go @@ -2,14 +2,11 @@ package examples import ( "fmt" - "net" "time" "github.com/gin-gonic/gin" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" - dtmpb "github.com/yedf/dtm/dtmpb" - grpc "google.golang.org/grpc" ) const ( @@ -24,12 +21,6 @@ const ( // Busi busi service url prefix var Busi string = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiAPI) -// BusiPb busi service grpc address -var BusiPb string = fmt.Sprintf("localhost:%d", BusiPbPort) - -// DtmClient grpc client for dtm -var DtmClient dtmpb.DtmClient = nil - // BaseAppStartup base app startup func BaseAppStartup() *gin.Engine { dtmcli.Logf("examples starting") @@ -38,21 +29,6 @@ func BaseAppStartup() *gin.Engine { dtmcli.Logf("Starting busi at: %d", BusiPort) go app.Run(fmt.Sprintf(":%d", BusiPort)) - conn, err := grpc.Dial(DtmGrpcServer, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(dtmpb.GrpcClientLog)) - dtmcli.FatalIfError(err) - DtmClient = dtmpb.NewDtmClient(conn) - dtmcli.Logf("dtm client inited") - - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", BusiPbPort)) - dtmcli.FatalIfError(err) - s := grpc.NewServer(grpc.UnaryInterceptor(dtmpb.GrpcServerLog)) - RegisterBusiServer(s, &busiServer{}) - dtmcli.Logf("busi grpc listening at %v", lis.Addr()) - go func() { - err := s.Serve(lis) - dtmcli.FatalIfError(err) - }() - time.Sleep(100 * time.Millisecond) return app } @@ -92,7 +68,6 @@ func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi res := dtmcli.OrString(result1, result2, "SUCCESS") dtmcli.Logf("%s %s result: %s", busi, info.String(), res) return M{"dtm_result": res}, nil - } // BaseAddRoute add base route handler diff --git a/examples/main_grpc.go b/examples/main_grpc.go index d3d908a..6963a29 100644 --- a/examples/main_grpc.go +++ b/examples/main_grpc.go @@ -2,27 +2,66 @@ package examples import ( "context" + "fmt" + "net" "github.com/yedf/dtm/dtmcli" - dtmpb "github.com/yedf/dtm/dtmpb" + dtmgrpc "github.com/yedf/dtm/dtmgrpc" + grpc "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" ) +// BusiPb busi service grpc address +var BusiPb string = fmt.Sprintf("localhost:%d", BusiPbPort) + +// DtmClient grpc client for dtm +var DtmClient dtmgrpc.DtmClient = nil + +// GrpcStartup for grpc +func GrpcStartup() { + conn, err := grpc.Dial(DtmGrpcServer, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(dtmgrpc.GrpcClientLog)) + dtmcli.FatalIfError(err) + DtmClient = dtmgrpc.NewDtmClient(conn) + dtmcli.Logf("dtm client inited") + + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", BusiPbPort)) + dtmcli.FatalIfError(err) + s := grpc.NewServer(grpc.UnaryInterceptor(dtmgrpc.GrpcServerLog)) + RegisterBusiServer(s, &busiServer{}) + dtmcli.Logf("busi grpc listening at %v", lis.Addr()) + go func() { + err := s.Serve(lis) + dtmcli.FatalIfError(err) + }() +} + +func handleGrpcBusiness(in *dtmgrpc.BusiRequest, result1 string, result2 string, busi string) error { + res := dtmcli.OrString(result1, result2, "SUCCESS") + dtmcli.Logf("grpc busi %s %s result: %s", busi, in.Info, res) + if res == "SUCCESS" { + return nil + } else if res == "FAILURE" { + return status.New(codes.Aborted, "user want to rollback").Err() + } + return status.New(codes.Internal, fmt.Sprintf("unknow result %s", res)).Err() + +} + // busiServer is used to implement helloworld.GreeterServer. type busiServer struct { UnimplementedBusiServer } -func (s *busiServer) TransIn(ctx context.Context, in *dtmpb.BusiRequest) (*emptypb.Empty, error) { +func (s *busiServer) TransIn(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { req := TransReq{} dtmcli.MustUnmarshal(in.AppData, &req) - dtmcli.Logf("busiServer %s received: %v %v", dtmcli.GetFuncName(), in.Info, req) - return &emptypb.Empty{}, nil + return &emptypb.Empty{}, handleGrpcBusiness(in, req.TransInResult, MainSwitch.TransInResult.Fetch(), "TransIn") } -func (s *busiServer) TransOut(ctx context.Context, in *dtmpb.BusiRequest) (*emptypb.Empty, error) { +func (s *busiServer) TransOut(ctx context.Context, in *dtmgrpc.BusiRequest) (*emptypb.Empty, error) { req := TransReq{} dtmcli.MustUnmarshal(in.AppData, &req) - dtmcli.Logf("busiServer %s received: %v %v", dtmcli.GetFuncName(), in.Info, req) - return &emptypb.Empty{}, nil + return &emptypb.Empty{}, handleGrpcBusiness(in, req.TransOutResult, MainSwitch.TransOutResult.Fetch(), "TransOut") } diff --git a/examples/main_msg_pb.go b/examples/main_grpc_msg.go similarity index 55% rename from examples/main_msg_pb.go rename to examples/main_grpc_msg.go index dba5036..a9be131 100644 --- a/examples/main_msg_pb.go +++ b/examples/main_grpc_msg.go @@ -3,18 +3,18 @@ package examples import ( "github.com/gin-gonic/gin" "github.com/yedf/dtm/dtmcli" - dtmpb "github.com/yedf/dtm/dtmpb" + dtmgrpc "github.com/yedf/dtm/dtmgrpc" ) -// MsgPbSetup 1 -func MsgPbSetup(app *gin.Engine) { +// MsgGrpcSetup 1 +func MsgGrpcSetup(app *gin.Engine) { } -// MsgPbFireRequest 1 -func MsgPbFireRequest() string { +// MsgGrpcFireRequest 1 +func MsgGrpcFireRequest() string { req := dtmcli.MustMarshal(&TransReq{Amount: 30}) - msg := dtmpb.NewMsgGrpc(DtmGrpcServer, dtmcli.MustGenGid(DtmServer)). + msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, dtmcli.MustGenGid(DtmServer)). Add(BusiPb+"/examples.Busi/TransOut", req). Add(BusiPb+"/examples.Busi/TransIn", req) err := msg.Submit()