rabbitmq seems ok

This commit is contained in:
yedongfu 2021-05-17 15:09:12 +08:00
parent 9e48e767b3
commit 32be5a90aa
11 changed files with 397 additions and 0 deletions

32
common/utils.go Normal file
View File

@ -0,0 +1,32 @@
package common
import (
"github.com/bwmarrin/snowflake"
)
var gNode *snowflake.Node = nil
func GenGid() string {
return gNode.Generate().Base58()
}
func init() {
node, err := snowflake.NewNode(1)
if err != nil {
panic(err)
}
gNode = node
}
func PanicIfError(err error) {
if err != nil {
panic(err)
}
}
func If(condition bool, trueObj interface{}, falseObj interface{}) interface{} {
if condition {
return trueObj
}
return falseObj
}

59
dtm/saga.go Normal file
View File

@ -0,0 +1,59 @@
package dtm
import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2"
)
var client *resty.Client = resty.New()
type Saga struct {
Server string
Gid string
Steps []SagaStep
TransQuery string
}
type SagaStep struct {
Action string
Compensate string
PostData interface{}
}
func (s *Saga) Add(action string, compensate string, postData interface{}) error {
step := SagaStep{
Action: action,
Compensate: compensate,
}
step.PostData = postData
s.Steps = append(s.Steps, step)
return nil
}
func (s *Saga) Prepare(url string) error {
s.TransQuery = url
resp, err := client.R().SetBody(gin.H{
"Gid": s.Gid,
"TransQuery": s.TransQuery,
"Steps": s.Steps,
}).Post(fmt.Sprintf("%s/prepare", s.Server))
if err != nil {
return err
}
if resp.StatusCode() != 200 {
return fmt.Errorf("prepare failed: %v", resp.Body())
}
return nil
}
func (s *Saga) Commit() error {
resp, err := client.R().SetBody(gin.H{}).Post(fmt.Sprintf("%s/commit", s.Server))
if err != nil {
return err
}
if resp.StatusCode() != 200 {
return fmt.Errorf("commit failed: %v", resp.Body())
}
return nil
}

28
dtmsvr/config.go Normal file
View File

@ -0,0 +1,28 @@
package dtmsvr
import (
"path/filepath"
"runtime"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
type Config struct {
Server string `json:"server"`
Rabbitmq RabbitmqConfig `json:"rabbitmq"`
}
var ServerConfig Config = Config{}
func LoadConfig() {
_, file, _, _ := runtime.Caller(0)
viper.SetConfigFile(filepath.Dir(file) + "/dtmsvr.yml")
if err := viper.ReadInConfig(); err != nil {
panic(err)
}
if err := viper.Unmarshal(&ServerConfig); err != nil {
panic(err)
}
logrus.Printf("config is: %v", ServerConfig)
}

View File

@ -0,0 +1,5 @@
package dtmsvr
func ConsumeHalfMsg() {
}

5
dtmsvr/consumerMsg.go Normal file
View File

@ -0,0 +1,5 @@
package dtmsvr
func ConsumeMsg() {
}

173
dtmsvr/rabbitmq.go Normal file
View File

@ -0,0 +1,173 @@
package dtmsvr
import (
"encoding/json"
"fmt"
"sync"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"github.com/yedf/dtm/common"
)
type Rabbitmq struct {
Config RabbitmqConfig
ChannelPool *sync.Pool
}
type RabbitmqConfig struct {
Host string
Username string
Password string
Vhost string
Exchange string
KeyPrepared string
KeyCommited string
QueuePrepared string
QueueCommited string
}
type RabbitmqChannel struct {
Confirms chan amqp.Confirmation
Channel *amqp.Channel
}
type RabbitmqConst string
const (
RabbitmqConstPrepared RabbitmqConst = "dtm_prepared"
RabbitmqConstCommited RabbitmqConst = "dtm_commited"
)
func RabbitmqNew(conf *RabbitmqConfig) *Rabbitmq {
return &Rabbitmq{
Config: *conf,
ChannelPool: &sync.Pool{
New: func() interface{} {
channel := newChannel(conf)
err := channel.Confirm(false)
common.PanicIfError(err)
confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 2))
return &RabbitmqChannel{
Channel: channel,
Confirms: confirms,
}
},
},
}
}
func newChannel(conf *RabbitmqConfig) *amqp.Channel {
uri := fmt.Sprintf("amqp://%s:%s@%s/%s", conf.Username, conf.Password, conf.Host, conf.Vhost)
logrus.Printf("connecting rabbitmq: %s", uri)
conn, err := amqp.Dial(uri)
common.PanicIfError(err)
channel, err := conn.Channel()
common.PanicIfError(err)
err = channel.ExchangeDeclare(
conf.Exchange, // exchange name
"direct", // exchange type
true, // durable
false, // autoDelete
false, // internal
false, // noWait
nil, // args
)
common.PanicIfError(err)
return channel
}
func (r *Rabbitmq) SendAndConfirm(key RabbitmqConst, data map[string]interface{}) error {
body, err := json.Marshal(data)
common.PanicIfError(err)
channel := r.ChannelPool.Get().(*RabbitmqChannel)
err = channel.Channel.Publish(
r.Config.Exchange,
common.If(key == RabbitmqConstPrepared, r.Config.KeyPrepared, r.Config.KeyCommited).(string),
true,
false,
amqp.Publishing{
ContentType: "application/json",
DeliveryMode: amqp.Persistent,
Body: body,
},
)
common.PanicIfError(err)
confirm := <-channel.Confirms
r.ChannelPool.Put(channel)
logrus.Printf("confirmed %t for %s", confirm.Ack, data["gid"])
if !confirm.Ack {
return fmt.Errorf("confirm not ok for %s", data["gid"])
}
return nil
}
type RabbitmqQueue struct {
Channel *amqp.Channel
Conn *amqp.Connection
Deliveries <-chan amqp.Delivery
}
func (q *RabbitmqQueue) Close() {
q.Channel.Close()
// q.Conn.Close()
}
func (q *RabbitmqQueue) WaitAndHandle(handler func(data map[string]interface{})) {
for {
q.WaitAndHandleOne(handler)
}
}
func (q *RabbitmqQueue) WaitAndHandleOne(handler func(data map[string]interface{})) {
logrus.Printf("reading message")
msg := <-q.Deliveries
data := map[string]interface{}{}
err := json.Unmarshal(msg.Body, &data)
logrus.Printf("handling one message: %v", data)
common.PanicIfError(err)
handler(data)
err = msg.Ack(false)
common.PanicIfError(err)
logrus.Printf("acked msg: %d", msg.DeliveryTag)
}
func (r *Rabbitmq) QueueNew(queueType RabbitmqConst) *RabbitmqQueue {
channel := newChannel(&r.Config)
queue, err := channel.QueueDeclare(
common.If(queueType == RabbitmqConstPrepared, r.Config.QueuePrepared, r.Config.QueueCommited).(string), // name of the queue
true, // durable
false, // delete when unused
false, // exclusive
false, // noWait
nil, // arguments
)
common.PanicIfError(err)
logrus.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange",
queue.Name, queue.Messages, queue.Consumers)
err = channel.QueueBind(
queue.Name, // name of the queue
common.If(queueType == RabbitmqConstPrepared, r.Config.KeyPrepared, r.Config.KeyCommited).(string), // bindingKey
r.Config.Exchange, // sourceExchange
false, // noWait
nil, // arguments
)
common.PanicIfError(err)
deliveries, err := channel.Consume(
queue.Name, // name
"simple-consumer", // consumerTag,
false, // noAck
false, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
)
common.PanicIfError(err)
return &RabbitmqQueue{
Channel: channel,
Deliveries: deliveries,
}
}
func (r *Rabbitmq) HandleMsg(data interface{}) {
}

31
dtmsvr/rabbitmq_test.go Normal file
View File

@ -0,0 +1,31 @@
package dtmsvr
import (
"testing"
"github.com/gin-gonic/gin"
"github.com/magiconair/properties/assert"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
)
func init() {
LoadConfig()
}
func TestRabbitConfig(t *testing.T) {
assert.Matches(t, ServerConfig.Rabbitmq.KeyCommited, "key_committed")
}
func TestRabbitmq1Msg(t *testing.T) {
rb := RabbitmqNew(&ServerConfig.Rabbitmq)
err := rb.SendAndConfirm(RabbitmqConstPrepared, gin.H{
"gid": common.GenGid(),
})
assert.Equal(t, nil, err)
queue := rb.QueueNew(RabbitmqConstPrepared)
queue.WaitAndHandle(func(data map[string]interface{}) {
logrus.Printf("processed msg: %v in queue1", data)
})
assert.Equal(t, 0, 1)
}

22
dtmsvr/service.go Normal file
View File

@ -0,0 +1,22 @@
package dtmsvr
import "github.com/gin-gonic/gin"
func AddRoute(engine *gin.Engine) {
route := engine.Group("/api/dmtsvr")
route.POST("/prepare", Prepare)
route.POST("/commit", Commit)
}
func Prepare(c *gin.Context) {
data := gin.H{}
err := c.BindJSON(&data)
if err == nil {
return
}
}
func Commit(c *gin.Context) {
}

16
dtmsvr/svr.go Normal file
View File

@ -0,0 +1,16 @@
package dtmsvr
import (
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
)
func main() {
logrus.Printf("start tc")
gin.SetMode(gin.ReleaseMode)
app := gin.Default()
AddRoute(app)
go ConsumeHalfMsg()
go ConsumeMsg()
app.Run()
}

4
examples/config.go Normal file
View File

@ -0,0 +1,4 @@
package examples
const TcServer = "http://localhost:8080/api/dtmsvr"
const Busi = "http://localhost:8081/api/busi"

22
main.go Normal file
View File

@ -0,0 +1,22 @@
package main
import (
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmsvr"
)
func main() {
logrus.SetFormatter(&logrus.JSONFormatter{})
dtmsvr.LoadConfig()
rb := dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq)
err := rb.SendAndConfirm(dtmsvr.RabbitmqConstPrepared, gin.H{
"gid": common.GenGid(),
})
common.PanicIfError(err)
queue := rb.QueueNew(dtmsvr.RabbitmqConstPrepared)
queue.WaitAndHandle(func(data map[string]interface{}) {
logrus.Printf("processed msg: %v in queue1", data)
})
}