From cab5f3e38c67e7a0b042499e6fcc0ffd7fda67ba Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Mon, 9 Aug 2021 17:40:13 +0800 Subject: [PATCH] partial grpc --- dtmpb/dtmpb.pb.go | 97 ++++++++++++++--------------- dtmpb/dtmpb.proto | 7 ++- dtmpb/dtmpb_grpc.pb.go | 36 +++++++++++ dtmpb/{msg.go => message.go} | 19 ++++-- dtmpb/type.go | 28 +++++++++ dtmsvr/api.go | 116 ++--------------------------------- dtmsvr/api_grpc.go | 9 ++- dtmsvr/api_http.go | 115 ++++++++++++++++++++++++++++++++++ dtmsvr/dtmsvr.mysql.sql | 3 +- dtmsvr/trans.go | 16 +++++ dtmsvr/trans_xa.go | 6 +- examples/main_grpc.go | 19 +++++- examples/main_msg_pb.go | 26 ++++---- examples/main_xa.go | 2 +- 14 files changed, 305 insertions(+), 194 deletions(-) rename dtmpb/{msg.go => message.go} (67%) create mode 100644 dtmpb/type.go create mode 100644 dtmsvr/api_http.go diff --git a/dtmpb/dtmpb.pb.go b/dtmpb/dtmpb.pb.go index 3c97a2d..d678c42 100644 --- a/dtmpb/dtmpb.pb.go +++ b/dtmpb/dtmpb.pb.go @@ -100,9 +100,8 @@ type DtmRequest struct { Gid string `protobuf:"bytes,1,opt,name=Gid,proto3" json:"Gid,omitempty"` TransType string `protobuf:"bytes,2,opt,name=TransType,proto3" json:"TransType,omitempty"` QueryPrepared string `protobuf:"bytes,3,opt,name=QueryPrepared,proto3" json:"QueryPrepared,omitempty"` - Method string `protobuf:"bytes,4,opt,name=Method,proto3" json:"Method,omitempty"` - Extra map[string]string `protobuf:"bytes,5,rep,name=Extra,proto3" json:"Extra,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - AppData []byte `protobuf:"bytes,6,opt,name=AppData,proto3" json:"AppData,omitempty"` + Extra map[string]string `protobuf:"bytes,4,rep,name=Extra,proto3" json:"Extra,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Data string `protobuf:"bytes,5,opt,name=Data,proto3" json:"Data,omitempty"` } func (x *DtmRequest) Reset() { @@ -158,13 +157,6 @@ func (x *DtmRequest) GetQueryPrepared() string { return "" } -func (x *DtmRequest) GetMethod() string { - if x != nil { - return x.Method - } - return "" -} - func (x *DtmRequest) GetExtra() map[string]string { if x != nil { return x.Extra @@ -172,11 +164,11 @@ func (x *DtmRequest) GetExtra() map[string]string { return nil } -func (x *DtmRequest) GetAppData() []byte { +func (x *DtmRequest) GetData() string { if x != nil { - return x.AppData + return x.Data } - return nil + return "" } // The response message containing the greetings @@ -366,51 +358,52 @@ var file_dtmpb_dtmpb_proto_rawDesc = []byte{ 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x49, 0x44, 0x12, 0x10, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x44, 0x74, 0x6d, 0x22, 0x83, 0x02, 0x0a, 0x0a, 0x44, 0x74, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x44, 0x74, 0x6d, 0x22, 0xe5, 0x01, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x47, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x47, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x54, 0x79, 0x70, 0x65, 0x12, 0x24, 0x0a, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x12, 0x16, - 0x0a, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, - 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x33, 0x0a, 0x05, 0x45, 0x78, 0x74, 0x72, 0x61, 0x18, - 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x64, 0x74, 0x6d, 0x63, 0x6c, 0x69, 0x2e, 0x44, - 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x45, 0x78, 0x74, 0x72, 0x61, 0x45, + 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x12, 0x33, + 0x0a, 0x05, 0x45, 0x78, 0x74, 0x72, 0x61, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, + 0x64, 0x74, 0x6d, 0x63, 0x6c, 0x69, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x2e, 0x45, 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x45, 0x78, + 0x74, 0x72, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x38, 0x0a, 0x0a, 0x45, 0x78, 0x74, 0x72, 0x61, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, + 0x01, 0x22, 0x48, 0x0a, 0x08, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x1c, 0x0a, + 0x09, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x09, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x44, + 0x74, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0a, 0x44, 0x74, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xc1, 0x01, 0x0a, 0x0b, + 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x28, 0x0a, 0x04, 0x69, + 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x63, + 0x6c, 0x69, 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x52, + 0x04, 0x69, 0x6e, 0x66, 0x6f, 0x12, 0x34, 0x0a, 0x05, 0x45, 0x78, 0x74, 0x72, 0x61, 0x18, 0x02, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x64, 0x74, 0x6d, 0x63, 0x6c, 0x69, 0x2e, 0x42, 0x75, + 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x45, 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x45, 0x78, 0x74, 0x72, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x41, - 0x70, 0x70, 0x44, 0x61, 0x74, 0x61, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x41, 0x70, + 0x70, 0x70, 0x44, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x41, 0x70, 0x70, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x38, 0x0a, 0x0a, 0x45, 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, - 0x48, 0x0a, 0x08, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x1c, 0x0a, 0x09, 0x44, - 0x74, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, - 0x44, 0x74, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x44, 0x74, 0x6d, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x44, - 0x74, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xc1, 0x01, 0x0a, 0x0b, 0x42, 0x75, - 0x73, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x28, 0x0a, 0x04, 0x69, 0x6e, 0x66, - 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x64, 0x74, 0x6d, 0x63, 0x6c, 0x69, - 0x2e, 0x44, 0x74, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x04, 0x69, - 0x6e, 0x66, 0x6f, 0x12, 0x34, 0x0a, 0x05, 0x45, 0x78, 0x74, 0x72, 0x61, 0x18, 0x02, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x64, 0x74, 0x6d, 0x63, 0x6c, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x45, 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, - 0x72, 0x79, 0x52, 0x05, 0x45, 0x78, 0x74, 0x72, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x41, 0x70, 0x70, - 0x44, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x41, 0x70, 0x70, 0x44, - 0x61, 0x74, 0x61, 0x1a, 0x38, 0x0a, 0x0a, 0x45, 0x78, 0x74, 0x72, 0x61, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, - 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x49, 0x0a, - 0x09, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x1c, 0x0a, 0x09, 0x44, 0x74, - 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x44, - 0x74, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x44, 0x74, 0x6d, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x44, 0x74, - 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x35, 0x0a, 0x03, 0x44, 0x74, 0x6d, 0x12, - 0x2e, 0x0a, 0x04, 0x43, 0x61, 0x6c, 0x6c, 0x12, 0x12, 0x2e, 0x64, 0x74, 0x6d, 0x63, 0x6c, 0x69, - 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x64, 0x74, - 0x6d, 0x63, 0x6c, 0x69, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, - 0x1b, 0x5a, 0x19, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x65, - 0x64, 0x66, 0x2f, 0x64, 0x74, 0x6d, 0x2f, 0x64, 0x74, 0x6d, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x49, 0x0a, 0x09, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x1c, 0x0a, 0x09, + 0x44, 0x74, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x09, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x44, 0x74, + 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, + 0x44, 0x74, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x67, 0x0a, 0x03, 0x44, 0x74, + 0x6d, 0x12, 0x2e, 0x0a, 0x04, 0x43, 0x61, 0x6c, 0x6c, 0x12, 0x12, 0x2e, 0x64, 0x74, 0x6d, 0x63, + 0x6c, 0x69, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, + 0x64, 0x74, 0x6d, 0x63, 0x6c, 0x69, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, + 0x00, 0x12, 0x30, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x12, 0x2e, 0x64, 0x74, + 0x6d, 0x63, 0x6c, 0x69, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x10, 0x2e, 0x64, 0x74, 0x6d, 0x63, 0x6c, 0x69, 0x2e, 0x44, 0x74, 0x6d, 0x52, 0x65, 0x70, 0x6c, + 0x79, 0x22, 0x00, 0x42, 0x1b, 0x5a, 0x19, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x79, 0x65, 0x64, 0x66, 0x2f, 0x64, 0x74, 0x6d, 0x2f, 0x64, 0x74, 0x6d, 0x70, 0x62, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -440,9 +433,11 @@ var file_dtmpb_dtmpb_proto_depIdxs = []int32{ 0, // 1: dtmcli.BusiRequest.info:type_name -> dtmcli.DtmTransInfo 6, // 2: dtmcli.BusiRequest.Extra:type_name -> dtmcli.BusiRequest.ExtraEntry 1, // 3: dtmcli.Dtm.Call:input_type -> dtmcli.DtmRequest - 2, // 4: dtmcli.Dtm.Call:output_type -> dtmcli.DtmReply - 4, // [4:5] is the sub-list for method output_type - 3, // [3:4] is the sub-list for method input_type + 1, // 4: dtmcli.Dtm.Submit:input_type -> dtmcli.DtmRequest + 2, // 5: dtmcli.Dtm.Call:output_type -> dtmcli.DtmReply + 2, // 6: dtmcli.Dtm.Submit:output_type -> dtmcli.DtmReply + 5, // [5:7] is the sub-list for method output_type + 3, // [3:5] is the sub-list for method input_type 3, // [3:3] is the sub-list for extension type_name 3, // [3:3] is the sub-list for extension extendee 0, // [0:3] is the sub-list for field type_name diff --git a/dtmpb/dtmpb.proto b/dtmpb/dtmpb.proto index 4374308..3ba0a3a 100644 --- a/dtmpb/dtmpb.proto +++ b/dtmpb/dtmpb.proto @@ -7,6 +7,8 @@ package dtmcli; // The dtm service definition. service Dtm { rpc Call(DtmRequest) returns (DtmReply) {} + rpc Submit(DtmRequest) returns (DtmReply) {} + } message DtmTransInfo { @@ -21,9 +23,8 @@ message DtmRequest { string Gid = 1; string TransType = 2; string QueryPrepared = 3; - string Method = 4; - map Extra = 5; - bytes AppData = 6; + map Extra = 4; + string Data = 5; } // The response message containing the greetings diff --git a/dtmpb/dtmpb_grpc.pb.go b/dtmpb/dtmpb_grpc.pb.go index 54158b0..fbd7b96 100644 --- a/dtmpb/dtmpb_grpc.pb.go +++ b/dtmpb/dtmpb_grpc.pb.go @@ -19,6 +19,7 @@ const _ = grpc.SupportPackageIsVersion7 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type DtmClient interface { Call(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*DtmReply, error) + Submit(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*DtmReply, error) } type dtmClient struct { @@ -38,11 +39,21 @@ func (c *dtmClient) Call(ctx context.Context, in *DtmRequest, opts ...grpc.CallO return out, nil } +func (c *dtmClient) Submit(ctx context.Context, in *DtmRequest, opts ...grpc.CallOption) (*DtmReply, error) { + out := new(DtmReply) + err := c.cc.Invoke(ctx, "/dtmcli.Dtm/Submit", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // DtmServer is the server API for Dtm service. // All implementations must embed UnimplementedDtmServer // for forward compatibility type DtmServer interface { Call(context.Context, *DtmRequest) (*DtmReply, error) + Submit(context.Context, *DtmRequest) (*DtmReply, error) mustEmbedUnimplementedDtmServer() } @@ -53,6 +64,9 @@ type UnimplementedDtmServer struct { func (UnimplementedDtmServer) Call(context.Context, *DtmRequest) (*DtmReply, error) { return nil, status.Errorf(codes.Unimplemented, "method Call not implemented") } +func (UnimplementedDtmServer) Submit(context.Context, *DtmRequest) (*DtmReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method Submit not implemented") +} func (UnimplementedDtmServer) mustEmbedUnimplementedDtmServer() {} // UnsafeDtmServer may be embedded to opt out of forward compatibility for this service. @@ -84,6 +98,24 @@ func _Dtm_Call_Handler(srv interface{}, ctx context.Context, dec func(interface{ return interceptor(ctx, in, info, handler) } +func _Dtm_Submit_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DtmRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DtmServer).Submit(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/dtmcli.Dtm/Submit", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DtmServer).Submit(ctx, req.(*DtmRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Dtm_ServiceDesc is the grpc.ServiceDesc for Dtm service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -95,6 +127,10 @@ var Dtm_ServiceDesc = grpc.ServiceDesc{ MethodName: "Call", Handler: _Dtm_Call_Handler, }, + { + MethodName: "Submit", + Handler: _Dtm_Submit_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "dtmpb/dtmpb.proto", diff --git a/dtmpb/msg.go b/dtmpb/message.go similarity index 67% rename from dtmpb/msg.go rename to dtmpb/message.go index 8a6dea2..8f6a0cf 100644 --- a/dtmpb/msg.go +++ b/dtmpb/message.go @@ -1,6 +1,10 @@ package dtmpb -import "github.com/yedf/dtm/dtmcli" +import ( + "context" + + "github.com/yedf/dtm/dtmcli" +) // MsgGrpc reliable msg type type MsgGrpc struct { @@ -35,11 +39,11 @@ func NewMsgGrpc(server string, gid string) *MsgGrpc { } // Add add a new step -func (s *MsgGrpc) Add(action string, postData interface{}) *MsgGrpc { - dtmcli.Logf("msg %s Add %s %v", s.MsgDataGrpc.Gid, action, postData) +func (s *MsgGrpc) Add(action string, data []byte) *MsgGrpc { + dtmcli.Logf("msg %s Add %s %v", s.MsgDataGrpc.Gid, action, string(data)) step := MsgStepGrpc{ Action: action, - Data: dtmcli.MustMarshalString(postData), + Data: string(data), } s.Steps = append(s.Steps, step) return s @@ -47,5 +51,10 @@ func (s *MsgGrpc) Add(action string, postData interface{}) *MsgGrpc { // Submit submit the msg func (s *MsgGrpc) Submit() error { - return s.CallDtm(&s.MsgDataGrpc, "submit") + _, err := MustGetDtmClient(s.Dtm).Submit(context.Background(), &DtmRequest{ + Gid: s.Gid, + TransType: s.TransType, + Data: dtmcli.MustMarshalString(&s.Steps), + }) + return err } diff --git a/dtmpb/type.go b/dtmpb/type.go new file mode 100644 index 0000000..d0a590c --- /dev/null +++ b/dtmpb/type.go @@ -0,0 +1,28 @@ +package dtmpb + +import ( + "github.com/yedf/dtm/dtmcli" + grpc "google.golang.org/grpc" +) + +var clients = map[string]DtmClient{} + +// GetDtmClient 1 +func GetDtmClient(grpcServer string) (cli DtmClient, rerr error) { + if clients[grpcServer] == nil { + conn, err := grpc.Dial(grpcServer, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(dtmcli.GrpcClientLog)) + if err == nil { + clients[grpcServer] = NewDtmClient(conn) + dtmcli.Logf("dtm client inited for %s", grpcServer) + } + } + cli = clients[grpcServer] + return +} + +// MustGetDtmClient 1 +func MustGetDtmClient(grpcServer string) DtmClient { + cli, err := GetDtmClient(grpcServer) + dtmcli.E2P(err) + return cli +} diff --git a/dtmsvr/api.go b/dtmsvr/api.go index 11d06ca..38968bc 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -1,123 +1,15 @@ package dtmsvr -import ( - "errors" - "fmt" +import "fmt" - "github.com/gin-gonic/gin" - "github.com/yedf/dtm/common" - "github.com/yedf/dtm/dtmcli" - "gorm.io/gorm" - "gorm.io/gorm/clause" -) - -func addRoute(engine *gin.Engine) { - engine.POST("/api/dtmsvr/prepare", common.WrapHandler(prepare)) - engine.POST("/api/dtmsvr/submit", common.WrapHandler(submit)) - engine.POST("/api/dtmsvr/registerXaBranch", common.WrapHandler(registerXaBranch)) - engine.POST("/api/dtmsvr/registerTccBranch", common.WrapHandler(registerTccBranch)) - engine.POST("/api/dtmsvr/abort", common.WrapHandler(abort)) - engine.GET("/api/dtmsvr/query", common.WrapHandler(query)) - engine.GET("/api/dtmsvr/newGid", common.WrapHandler(newGid)) -} - -func newGid(c *gin.Context) (interface{}, error) { - return M{"gid": GenGid(), "dtm_result": "SUCCESS"}, nil -} - -func prepare(c *gin.Context) (interface{}, error) { - t := TransFromContext(c) - t.Status = "prepared" - t.saveNew(dbGet()) - return dtmcli.ResultSuccess, nil -} - -func submit(c *gin.Context) (interface{}, error) { +func svcSubmit(t *TransGlobal, waitResult bool) (interface{}, error) { db := dbGet() - t := TransFromContext(c) dbt := TransFromDb(db, t.Gid) if dbt != nil && dbt.Status != "prepared" && dbt.Status != "submitted" { return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status %s, cannot sumbmit", dbt.Status)}, nil } t.Status = "submitted" t.saveNew(db) - return t.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil -} - -func abort(c *gin.Context) (interface{}, error) { - db := dbGet() - t := TransFromContext(c) - dbt := TransFromDb(db, t.Gid) - if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != "prepared" && dbt.Status != "aborting" { - return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil - } - return dbt.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil -} - -func registerXaBranch(c *gin.Context) (interface{}, error) { - branch := TransBranch{} - err := c.BindJSON(&branch) - e2p(err) - db := dbGet() - dbt := TransFromDb(db, branch.Gid) - if dbt.Status != "prepared" { - return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil - } - branches := []TransBranch{branch, branch} - branches[0].BranchType = "rollback" - branches[1].BranchType = "commit" - db.Must().Clauses(clause.OnConflict{ - DoNothing: true, - }).Create(branches) - e2p(err) - global := TransGlobal{Gid: branch.Gid} - global.touch(db, config.TransCronInterval) - return dtmcli.ResultSuccess, nil -} - -func registerTccBranch(c *gin.Context) (interface{}, error) { - data := dtmcli.MS{} - err := c.BindJSON(&data) - e2p(err) - branch := TransBranch{ - Gid: data["gid"], - BranchID: data["branch_id"], - Status: data["status"], - Data: data["data"], - } - db := dbGet() - dbt := TransFromDb(db, branch.Gid) - if dbt.Status != "prepared" { - return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil - } - - branches := []TransBranch{branch, branch, branch} - for i, b := range []string{"cancel", "confirm", "try"} { - branches[i].BranchType = b - branches[i].URL = data[b] - } - - db.Must().Clauses(clause.OnConflict{ - DoNothing: true, - }).Create(branches) - e2p(err) - global := TransGlobal{Gid: branch.Gid} - global.touch(dbGet(), config.TransCronInterval) - return dtmcli.ResultSuccess, nil -} - -func query(c *gin.Context) (interface{}, error) { - gid := c.Query("gid") - if gid == "" { - return nil, errors.New("no gid specified") - } - trans := TransGlobal{} - db := dbGet() - dbr := db.Must().Where("gid", gid).First(&trans) - if dbr.Error == gorm.ErrRecordNotFound { - return M{"transaction": nil, "branches": [0]int{}}, nil - } - branches := []TransBranch{} - db.Must().Where("gid", gid).Find(&branches) - return M{"transaction": trans, "branches": branches}, nil + return t.Process(db, waitResult), nil + } diff --git a/dtmsvr/api_grpc.go b/dtmsvr/api_grpc.go index c3c64a0..1c91be1 100644 --- a/dtmsvr/api_grpc.go +++ b/dtmsvr/api_grpc.go @@ -15,10 +15,15 @@ type dtmServer struct { pb.UnimplementedDtmServer } -// SayHello implements helloworld.GreeterServer func (s *dtmServer) Call(ctx context.Context, in *pb.DtmRequest) (*pb.DtmReply, error) { log.Printf("dtmServer Received: %v", in) - dynamicCallPb(ctx, in, in.Extra["BusiFunc"], in.AppData) + dynamicCallPb(ctx, in, in.Extra["BusiFunc"], []byte(in.Data)) + return &pb.DtmReply{DtmResult: "SUCCESS", DtmMessage: "ok"}, nil +} + +func (s *dtmServer) Submit(ctx context.Context, in *pb.DtmRequest) (*pb.DtmReply, error) { + log.Printf("dtmServer Received: %v", in) + dynamicCallPb(ctx, in, in.Extra["BusiFunc"], []byte(in.Data)) return &pb.DtmReply{DtmResult: "SUCCESS", DtmMessage: "ok"}, nil } diff --git a/dtmsvr/api_http.go b/dtmsvr/api_http.go new file mode 100644 index 0000000..29a9220 --- /dev/null +++ b/dtmsvr/api_http.go @@ -0,0 +1,115 @@ +package dtmsvr + +import ( + "errors" + "fmt" + + "github.com/gin-gonic/gin" + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmcli" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +func addRoute(engine *gin.Engine) { + engine.POST("/api/dtmsvr/prepare", common.WrapHandler(prepare)) + engine.POST("/api/dtmsvr/submit", common.WrapHandler(submit)) + engine.POST("/api/dtmsvr/registerXaBranch", common.WrapHandler(registerXaBranch)) + engine.POST("/api/dtmsvr/registerTccBranch", common.WrapHandler(registerTccBranch)) + engine.POST("/api/dtmsvr/abort", common.WrapHandler(abort)) + engine.GET("/api/dtmsvr/query", common.WrapHandler(query)) + engine.GET("/api/dtmsvr/newGid", common.WrapHandler(newGid)) +} + +func newGid(c *gin.Context) (interface{}, error) { + return M{"gid": GenGid(), "dtm_result": "SUCCESS"}, nil +} + +func prepare(c *gin.Context) (interface{}, error) { + t := TransFromContext(c) + t.Status = "prepared" + t.saveNew(dbGet()) + return dtmcli.ResultSuccess, nil +} + +func submit(c *gin.Context) (interface{}, error) { + return svcSubmit(TransFromContext(c), c.Query("wait_result") == "1") +} + +func abort(c *gin.Context) (interface{}, error) { + db := dbGet() + t := TransFromContext(c) + dbt := TransFromDb(db, t.Gid) + if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != "prepared" && dbt.Status != "aborting" { + return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil + } + return dbt.Process(db, c.Query("wait_result") == "true" || c.Query("wait_result") == "1"), nil +} + +func registerXaBranch(c *gin.Context) (interface{}, error) { + branch := TransBranch{} + err := c.BindJSON(&branch) + e2p(err) + db := dbGet() + dbt := TransFromDb(db, branch.Gid) + if dbt.Status != "prepared" { + return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil + } + branches := []TransBranch{branch, branch} + branches[0].BranchType = "rollback" + branches[1].BranchType = "commit" + db.Must().Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(branches) + e2p(err) + global := TransGlobal{Gid: branch.Gid} + global.touch(db, config.TransCronInterval) + return dtmcli.ResultSuccess, nil +} + +func registerTccBranch(c *gin.Context) (interface{}, error) { + data := dtmcli.MS{} + err := c.BindJSON(&data) + e2p(err) + branch := TransBranch{ + Gid: data["gid"], + BranchID: data["branch_id"], + Status: data["status"], + Data: data["data"], + } + db := dbGet() + dbt := TransFromDb(db, branch.Gid) + if dbt.Status != "prepared" { + return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil + } + + branches := []TransBranch{branch, branch, branch} + for i, b := range []string{"cancel", "confirm", "try"} { + branches[i].BranchType = b + branches[i].URL = data[b] + } + + db.Must().Clauses(clause.OnConflict{ + DoNothing: true, + }).Create(branches) + e2p(err) + global := TransGlobal{Gid: branch.Gid} + global.touch(dbGet(), config.TransCronInterval) + return dtmcli.ResultSuccess, nil +} + +func query(c *gin.Context) (interface{}, error) { + gid := c.Query("gid") + if gid == "" { + return nil, errors.New("no gid specified") + } + trans := TransGlobal{} + db := dbGet() + dbr := db.Must().Where("gid", gid).First(&trans) + if dbr.Error == gorm.ErrRecordNotFound { + return M{"transaction": nil, "branches": [0]int{}}, nil + } + branches := []TransBranch{} + db.Must().Where("gid", gid).Find(&branches) + return M{"transaction": trans, "branches": branches}, nil +} diff --git a/dtmsvr/dtmsvr.mysql.sql b/dtmsvr/dtmsvr.mysql.sql index af42fa0..1fe1015 100644 --- a/dtmsvr/dtmsvr.mysql.sql +++ b/dtmsvr/dtmsvr.mysql.sql @@ -4,10 +4,11 @@ drop table IF EXISTS dtm.trans_global; CREATE TABLE if not EXISTS dtm.trans_global ( `id` int(11) NOT NULL AUTO_INCREMENT, `gid` varchar(128) NOT NULL COMMENT '事务全局id', - `trans_type` varchar(45) not null COMMENT '事务类型: saga | xa', + `trans_type` varchar(45) not null COMMENT '事务类型: saga | xa | tcc | msg', `data` TEXT COMMENT '事务携带的数据', `status` varchar(45) NOT NULL COMMENT '全局事务的状态 prepared | submitted | finished | rollbacked', `query_prepared` varchar(128) NOT NULL COMMENT 'prepared状态事务的查询api', + `protocol` varchar(45) not null comment '通信协议 http | grpc', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, `commit_time` datetime DEFAULT NULL, diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go index 0bad662..c7fb239 100644 --- a/dtmsvr/trans.go +++ b/dtmsvr/trans.go @@ -7,6 +7,7 @@ import ( "github.com/gin-gonic/gin" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/dtmpb" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -19,6 +20,7 @@ type TransGlobal struct { Data string `json:"data"` Status string `json:"status"` QueryPrepared string `json:"query_prepared"` + Protocol string `json:"protocol"` CommitTime *time.Time FinishTime *time.Time RollbackTime *time.Time @@ -197,6 +199,20 @@ func TransFromContext(c *gin.Context) *TransGlobal { } m := TransGlobal{} dtmcli.MustRemarshal(data, &m) + m.Protocol = "http" + return &m +} + +// TransFromDtmRequest TransFromContext +func TransFromDtmRequest(c *dtmpb.DtmRequest) *TransGlobal { + m := TransGlobal{ + Gid: c.Gid, + TransType: c.TransType, + QueryPrepared: c.QueryPrepared, + Data: c.Data, + Protocol: "grpc", + } + m.Protocol = "http" return &m } diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_xa.go index ae0eea3..342bef2 100644 --- a/dtmsvr/trans_xa.go +++ b/dtmsvr/trans_xa.go @@ -20,11 +20,7 @@ func (t *transXaProcessor) GenBranches() []TransBranch { return []TransBranch{} } func (t *transXaProcessor) ExecBranch(db *common.DB, branch *TransBranch) { - resp, err := dtmcli.RestyClient.R().SetQueryParams(dtmcli.MS{ - "branch_id": branch.BranchID, - "action": dtmcli.If(t.Status == "prepared" || t.Status == "aborting", "rollback", "commit").(string), - "gid": branch.Gid, - }).Post(branch.URL) + resp, err := dtmcli.RestyClient.R().SetQueryParams(t.getBranchParams(branch)).Post(branch.URL) e2p(err) body := resp.String() if strings.Contains(body, "SUCCESS") { diff --git a/examples/main_grpc.go b/examples/main_grpc.go index 8200613..526f931 100644 --- a/examples/main_grpc.go +++ b/examples/main_grpc.go @@ -2,8 +2,8 @@ package examples import ( "context" - "log" + "github.com/yedf/dtm/dtmcli" dtmpb "github.com/yedf/dtm/dtmpb" ) @@ -12,8 +12,21 @@ type busiServer struct { UnimplementedBusiServer } -// SayHello implements helloworld.GreeterServer func (s *busiServer) Call(ctx context.Context, in *dtmpb.BusiRequest) (*dtmpb.BusiReply, error) { - log.Printf("busiServer received: %v", in) + dtmcli.Logf("busiServer %s received: %v", dtmcli.GetFuncName(), in) + return &dtmpb.BusiReply{DtmResult: "SUCCESS", DtmMessage: "ok"}, nil +} + +func (s *busiServer) TransIn(ctx context.Context, in *dtmpb.BusiRequest) (*dtmpb.BusiReply, error) { + req := TransReq{} + dtmcli.MustUnmarshal(in.AppData, &req) + dtmcli.Logf("busiServer %s received: %v %v", dtmcli.GetFuncName(), in.Info, req) + return &dtmpb.BusiReply{DtmResult: "SUCCESS", DtmMessage: "ok"}, nil +} + +func (s *busiServer) TransOut(ctx context.Context, in *dtmpb.BusiRequest) (*dtmpb.BusiReply, error) { + req := TransReq{} + dtmcli.MustUnmarshal(in.AppData, &req) + dtmcli.Logf("busiServer %s received: %v %v", dtmcli.GetFuncName(), in.Info, req) return &dtmpb.BusiReply{DtmResult: "SUCCESS", DtmMessage: "ok"}, nil } diff --git a/examples/main_msg_pb.go b/examples/main_msg_pb.go index f47743c..5318189 100644 --- a/examples/main_msg_pb.go +++ b/examples/main_msg_pb.go @@ -1,8 +1,6 @@ package examples import ( - "context" - "github.com/gin-gonic/gin" "github.com/yedf/dtm/dtmcli" dtmpb "github.com/yedf/dtm/dtmpb" @@ -16,14 +14,20 @@ func MsgPbSetup(app *gin.Engine) { // MsgPbFireRequest 1 func MsgPbFireRequest() string { dtmcli.Logf("MsgPbFireRequest") - reply, err := DtmClient.Call(context.Background(), &dtmpb.DtmRequest{ - Gid: "pb_test", - TransType: "msg", - Method: "submit", - Extra: dtmcli.MS{ - "BusiFunc": BusiPb + "/examples.Busi/Call", - }, - }) - dtmcli.Logf("reply and err is: %v, %v", reply, err) + // reply, err := DtmClient.Call(context.Background(), &dtmpb.DtmRequest{ + // Gid: "pb_test", + // TransType: "msg", + // Method: "submit", + // Extra: dtmcli.MS{ + // "BusiFunc": BusiPb + "/examples.Busi/Call", + // }, + // }) + // dtmcli.Logf("reply and err is: %v, %v", reply, err) + req := dtmcli.MustMarshal(&TransReq{Amount: 30}) + msg := dtmpb.NewMsgGrpc(DtmGrpcServer, dtmcli.MustGenGid(DtmServer)). + Add(BusiPb+"/examples.Busi/TransOut", req). + Add(BusiPb+"/examples.Busi/TransIn", req) + err := msg.Submit() + e2p(err) return "" } diff --git a/examples/main_xa.go b/examples/main_xa.go index ebd2994..0d40f8f 100644 --- a/examples/main_xa.go +++ b/examples/main_xa.go @@ -19,7 +19,7 @@ func XaSetup(app *gin.Engine) { var err error XaClient, err = dtmcli.NewXaClient(DtmServer, config.DB, Busi+"/xa", func(path string, xa *dtmcli.XaClient) { app.POST(path, common.WrapHandler(func(c *gin.Context) (interface{}, error) { - return xa.HandleCallback(c.Query("gid"), c.Query("branch_id"), c.Query("action")) + return xa.HandleCallback(c.Query("gid"), c.Query("branch_id"), c.Query("branch_type")) })) }) e2p(err)