diff --git a/common/utils.go b/common/utils.go new file mode 100644 index 0000000..178114c --- /dev/null +++ b/common/utils.go @@ -0,0 +1,32 @@ +package common + +import ( + "github.com/bwmarrin/snowflake" +) + +var gNode *snowflake.Node = nil + +func GenGid() string { + return gNode.Generate().Base58() +} + +func init() { + node, err := snowflake.NewNode(1) + if err != nil { + panic(err) + } + gNode = node +} + +func PanicIfError(err error) { + if err != nil { + panic(err) + } +} + +func If(condition bool, trueObj interface{}, falseObj interface{}) interface{} { + if condition { + return trueObj + } + return falseObj +} diff --git a/dtm/saga.go b/dtm/saga.go new file mode 100644 index 0000000..5b9707a --- /dev/null +++ b/dtm/saga.go @@ -0,0 +1,59 @@ +package dtm + +import ( + "fmt" + + "github.com/gin-gonic/gin" + "github.com/go-resty/resty/v2" +) + +var client *resty.Client = resty.New() + +type Saga struct { + Server string + Gid string + Steps []SagaStep + TransQuery string +} +type SagaStep struct { + Action string + Compensate string + PostData interface{} +} + +func (s *Saga) Add(action string, compensate string, postData interface{}) error { + step := SagaStep{ + Action: action, + Compensate: compensate, + } + step.PostData = postData + s.Steps = append(s.Steps, step) + return nil +} + +func (s *Saga) Prepare(url string) error { + s.TransQuery = url + resp, err := client.R().SetBody(gin.H{ + "Gid": s.Gid, + "TransQuery": s.TransQuery, + "Steps": s.Steps, + }).Post(fmt.Sprintf("%s/prepare", s.Server)) + if err != nil { + return err + } + if resp.StatusCode() != 200 { + return fmt.Errorf("prepare failed: %v", resp.Body()) + } + return nil +} + +func (s *Saga) Commit() error { + resp, err := client.R().SetBody(gin.H{}).Post(fmt.Sprintf("%s/commit", s.Server)) + if err != nil { + return err + } + if resp.StatusCode() != 200 { + return fmt.Errorf("commit failed: %v", resp.Body()) + } + return nil +} diff --git a/dtmsvr/config.go b/dtmsvr/config.go new file mode 100644 index 0000000..78ff086 --- /dev/null +++ b/dtmsvr/config.go @@ -0,0 +1,28 @@ +package dtmsvr + +import ( + "path/filepath" + "runtime" + + "github.com/sirupsen/logrus" + "github.com/spf13/viper" +) + +type Config struct { + Server string `json:"server"` + Rabbitmq RabbitmqConfig `json:"rabbitmq"` +} + +var ServerConfig Config = Config{} + +func LoadConfig() { + _, file, _, _ := runtime.Caller(0) + viper.SetConfigFile(filepath.Dir(file) + "/dtmsvr.yml") + if err := viper.ReadInConfig(); err != nil { + panic(err) + } + if err := viper.Unmarshal(&ServerConfig); err != nil { + panic(err) + } + logrus.Printf("config is: %v", ServerConfig) +} diff --git a/dtmsvr/consumerHalfMsg.go b/dtmsvr/consumerHalfMsg.go new file mode 100644 index 0000000..2effae0 --- /dev/null +++ b/dtmsvr/consumerHalfMsg.go @@ -0,0 +1,5 @@ +package dtmsvr + +func ConsumeHalfMsg() { + +} diff --git a/dtmsvr/consumerMsg.go b/dtmsvr/consumerMsg.go new file mode 100644 index 0000000..63edc67 --- /dev/null +++ b/dtmsvr/consumerMsg.go @@ -0,0 +1,5 @@ +package dtmsvr + +func ConsumeMsg() { + +} diff --git a/dtmsvr/rabbitmq.go b/dtmsvr/rabbitmq.go new file mode 100644 index 0000000..71102b8 --- /dev/null +++ b/dtmsvr/rabbitmq.go @@ -0,0 +1,173 @@ +package dtmsvr + +import ( + "encoding/json" + "fmt" + "sync" + + "github.com/sirupsen/logrus" + "github.com/streadway/amqp" + "github.com/yedf/dtm/common" +) + +type Rabbitmq struct { + Config RabbitmqConfig + ChannelPool *sync.Pool +} + +type RabbitmqConfig struct { + Host string + Username string + Password string + Vhost string + Exchange string + KeyPrepared string + KeyCommited string + QueuePrepared string + QueueCommited string +} + +type RabbitmqChannel struct { + Confirms chan amqp.Confirmation + Channel *amqp.Channel +} + +type RabbitmqConst string + +const ( + RabbitmqConstPrepared RabbitmqConst = "dtm_prepared" + RabbitmqConstCommited RabbitmqConst = "dtm_commited" +) + +func RabbitmqNew(conf *RabbitmqConfig) *Rabbitmq { + return &Rabbitmq{ + Config: *conf, + ChannelPool: &sync.Pool{ + New: func() interface{} { + channel := newChannel(conf) + err := channel.Confirm(false) + common.PanicIfError(err) + confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 2)) + return &RabbitmqChannel{ + Channel: channel, + Confirms: confirms, + } + }, + }, + } +} + +func newChannel(conf *RabbitmqConfig) *amqp.Channel { + uri := fmt.Sprintf("amqp://%s:%s@%s/%s", conf.Username, conf.Password, conf.Host, conf.Vhost) + logrus.Printf("connecting rabbitmq: %s", uri) + conn, err := amqp.Dial(uri) + common.PanicIfError(err) + channel, err := conn.Channel() + common.PanicIfError(err) + err = channel.ExchangeDeclare( + conf.Exchange, // exchange name + "direct", // exchange type + true, // durable + false, // autoDelete + false, // internal + false, // noWait + nil, // args + ) + common.PanicIfError(err) + return channel +} + +func (r *Rabbitmq) SendAndConfirm(key RabbitmqConst, data map[string]interface{}) error { + body, err := json.Marshal(data) + common.PanicIfError(err) + channel := r.ChannelPool.Get().(*RabbitmqChannel) + err = channel.Channel.Publish( + r.Config.Exchange, + common.If(key == RabbitmqConstPrepared, r.Config.KeyPrepared, r.Config.KeyCommited).(string), + true, + false, + amqp.Publishing{ + ContentType: "application/json", + DeliveryMode: amqp.Persistent, + Body: body, + }, + ) + common.PanicIfError(err) + confirm := <-channel.Confirms + r.ChannelPool.Put(channel) + logrus.Printf("confirmed %t for %s", confirm.Ack, data["gid"]) + if !confirm.Ack { + return fmt.Errorf("confirm not ok for %s", data["gid"]) + } + return nil +} + +type RabbitmqQueue struct { + Channel *amqp.Channel + Conn *amqp.Connection + Deliveries <-chan amqp.Delivery +} + +func (q *RabbitmqQueue) Close() { + q.Channel.Close() + // q.Conn.Close() +} + +func (q *RabbitmqQueue) WaitAndHandle(handler func(data map[string]interface{})) { + for { + q.WaitAndHandleOne(handler) + } +} +func (q *RabbitmqQueue) WaitAndHandleOne(handler func(data map[string]interface{})) { + logrus.Printf("reading message") + msg := <-q.Deliveries + data := map[string]interface{}{} + err := json.Unmarshal(msg.Body, &data) + logrus.Printf("handling one message: %v", data) + common.PanicIfError(err) + handler(data) + err = msg.Ack(false) + common.PanicIfError(err) + logrus.Printf("acked msg: %d", msg.DeliveryTag) +} + +func (r *Rabbitmq) QueueNew(queueType RabbitmqConst) *RabbitmqQueue { + channel := newChannel(&r.Config) + queue, err := channel.QueueDeclare( + common.If(queueType == RabbitmqConstPrepared, r.Config.QueuePrepared, r.Config.QueueCommited).(string), // name of the queue + true, // durable + false, // delete when unused + false, // exclusive + false, // noWait + nil, // arguments + ) + common.PanicIfError(err) + logrus.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange", + queue.Name, queue.Messages, queue.Consumers) + err = channel.QueueBind( + queue.Name, // name of the queue + common.If(queueType == RabbitmqConstPrepared, r.Config.KeyPrepared, r.Config.KeyCommited).(string), // bindingKey + r.Config.Exchange, // sourceExchange + false, // noWait + nil, // arguments + ) + common.PanicIfError(err) + deliveries, err := channel.Consume( + queue.Name, // name + "simple-consumer", // consumerTag, + false, // noAck + false, // exclusive + false, // noLocal + false, // noWait + nil, // arguments + ) + common.PanicIfError(err) + return &RabbitmqQueue{ + Channel: channel, + Deliveries: deliveries, + } +} + +func (r *Rabbitmq) HandleMsg(data interface{}) { + +} diff --git a/dtmsvr/rabbitmq_test.go b/dtmsvr/rabbitmq_test.go new file mode 100644 index 0000000..305ef8f --- /dev/null +++ b/dtmsvr/rabbitmq_test.go @@ -0,0 +1,31 @@ +package dtmsvr + +import ( + "testing" + + "github.com/gin-gonic/gin" + "github.com/magiconair/properties/assert" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" +) + +func init() { + LoadConfig() +} + +func TestRabbitConfig(t *testing.T) { + assert.Matches(t, ServerConfig.Rabbitmq.KeyCommited, "key_committed") +} + +func TestRabbitmq1Msg(t *testing.T) { + rb := RabbitmqNew(&ServerConfig.Rabbitmq) + err := rb.SendAndConfirm(RabbitmqConstPrepared, gin.H{ + "gid": common.GenGid(), + }) + assert.Equal(t, nil, err) + queue := rb.QueueNew(RabbitmqConstPrepared) + queue.WaitAndHandle(func(data map[string]interface{}) { + logrus.Printf("processed msg: %v in queue1", data) + }) + assert.Equal(t, 0, 1) +} diff --git a/dtmsvr/service.go b/dtmsvr/service.go new file mode 100644 index 0000000..1f8fe31 --- /dev/null +++ b/dtmsvr/service.go @@ -0,0 +1,22 @@ +package dtmsvr + +import "github.com/gin-gonic/gin" + +func AddRoute(engine *gin.Engine) { + route := engine.Group("/api/dmtsvr") + route.POST("/prepare", Prepare) + route.POST("/commit", Commit) +} + +func Prepare(c *gin.Context) { + data := gin.H{} + err := c.BindJSON(&data) + if err == nil { + return + } + +} + +func Commit(c *gin.Context) { + +} diff --git a/dtmsvr/svr.go b/dtmsvr/svr.go new file mode 100644 index 0000000..37cab2f --- /dev/null +++ b/dtmsvr/svr.go @@ -0,0 +1,16 @@ +package dtmsvr + +import ( + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" +) + +func main() { + logrus.Printf("start tc") + gin.SetMode(gin.ReleaseMode) + app := gin.Default() + AddRoute(app) + go ConsumeHalfMsg() + go ConsumeMsg() + app.Run() +} diff --git a/examples/config.go b/examples/config.go new file mode 100644 index 0000000..546cb56 --- /dev/null +++ b/examples/config.go @@ -0,0 +1,4 @@ +package examples + +const TcServer = "http://localhost:8080/api/dtmsvr" +const Busi = "http://localhost:8081/api/busi" diff --git a/main.go b/main.go new file mode 100644 index 0000000..04e8632 --- /dev/null +++ b/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" + "github.com/yedf/dtm/common" + "github.com/yedf/dtm/dtmsvr" +) + +func main() { + logrus.SetFormatter(&logrus.JSONFormatter{}) + dtmsvr.LoadConfig() + rb := dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq) + err := rb.SendAndConfirm(dtmsvr.RabbitmqConstPrepared, gin.H{ + "gid": common.GenGid(), + }) + common.PanicIfError(err) + queue := rb.QueueNew(dtmsvr.RabbitmqConstPrepared) + queue.WaitAndHandle(func(data map[string]interface{}) { + logrus.Printf("processed msg: %v in queue1", data) + }) +}