remove queue
This commit is contained in:
parent
c2b2b3b26c
commit
104ff472dc
@ -58,10 +58,13 @@ func MustMarshalString(v interface{}) string {
|
|||||||
return string(MustMarshal(v))
|
return string(MustMarshal(v))
|
||||||
}
|
}
|
||||||
|
|
||||||
func MustUnmarshalString(s string, obj interface{}) {
|
func MustUnmarshal(b []byte, obj interface{}) {
|
||||||
err := json.Unmarshal([]byte(s), obj)
|
err := json.Unmarshal(b, obj)
|
||||||
PanicIfError(err)
|
PanicIfError(err)
|
||||||
}
|
}
|
||||||
|
func MustUnmarshalString(s string, obj interface{}) {
|
||||||
|
MustUnmarshal([]byte(s), obj)
|
||||||
|
}
|
||||||
|
|
||||||
func MustRemarshal(from interface{}, to interface{}) {
|
func MustRemarshal(from interface{}, to interface{}) {
|
||||||
b, err := json.Marshal(from)
|
b, err := json.Marshal(from)
|
||||||
|
|||||||
20
dtm_test.go
20
dtm_test.go
@ -27,9 +27,6 @@ var myinit int = func() int {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// 测试使用的全局对象
|
// 测试使用的全局对象
|
||||||
var rabbit = dtmsvr.RabbitmqNew(&dtmsvr.ServerConfig.Rabbitmq)
|
|
||||||
var queprepared = rabbit.QueueNew(dtmsvr.RabbitmqConstPrepared)
|
|
||||||
var quecommited = rabbit.QueueNew(dtmsvr.RabbitmqConstCommited)
|
|
||||||
var db = dtmsvr.DbGet()
|
var db = dtmsvr.DbGet()
|
||||||
|
|
||||||
func getSagaModel(gid string) *dtmsvr.SagaModel {
|
func getSagaModel(gid string) *dtmsvr.SagaModel {
|
||||||
@ -53,20 +50,18 @@ func getSagaStepStatus(gid string) []string {
|
|||||||
func noramlSaga(t *testing.T) {
|
func noramlSaga(t *testing.T) {
|
||||||
saga := genSaga("gid-noramlSaga", false, false)
|
saga := genSaga("gid-noramlSaga", false, false)
|
||||||
saga.Prepare()
|
saga.Prepare()
|
||||||
queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg)
|
|
||||||
assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status)
|
assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status)
|
||||||
saga.Commit()
|
saga.Commit()
|
||||||
quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg)
|
assert.Equal(t, "commited", getSagaModel(saga.Gid).Status)
|
||||||
assert.Equal(t, "finished", getSagaModel(saga.Gid).Status)
|
dtmsvr.WaitCommitedSaga(saga.Gid)
|
||||||
assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid))
|
assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid))
|
||||||
}
|
}
|
||||||
|
|
||||||
func rollbackSaga2(t *testing.T) {
|
func rollbackSaga2(t *testing.T) {
|
||||||
saga := genSaga("gid-rollbackSaga2", false, true)
|
saga := genSaga("gid-rollbackSaga2", false, true)
|
||||||
saga.Commit()
|
saga.Commit()
|
||||||
quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg)
|
dtmsvr.WaitCommitedSaga(saga.Gid)
|
||||||
saga.Prepare()
|
saga.Prepare()
|
||||||
queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg)
|
|
||||||
assert.Equal(t, "rollbacked", getSagaModel(saga.Gid).Status)
|
assert.Equal(t, "rollbacked", getSagaModel(saga.Gid).Status)
|
||||||
assert.Equal(t, []string{"rollbacked", "finished", "rollbacked", "rollbacked"}, getSagaStepStatus(saga.Gid))
|
assert.Equal(t, []string{"rollbacked", "finished", "rollbacked", "rollbacked"}, getSagaStepStatus(saga.Gid))
|
||||||
}
|
}
|
||||||
@ -74,7 +69,6 @@ func rollbackSaga2(t *testing.T) {
|
|||||||
func prepareCancel(t *testing.T) {
|
func prepareCancel(t *testing.T) {
|
||||||
saga := genSaga("gid1-prepareCancel", false, true)
|
saga := genSaga("gid1-prepareCancel", false, true)
|
||||||
saga.Prepare()
|
saga.Prepare()
|
||||||
queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg)
|
|
||||||
examples.TransQueryResult = "FAIL"
|
examples.TransQueryResult = "FAIL"
|
||||||
dtmsvr.CronPreparedOnce(-10 * time.Second)
|
dtmsvr.CronPreparedOnce(-10 * time.Second)
|
||||||
examples.TransQueryResult = ""
|
examples.TransQueryResult = ""
|
||||||
@ -84,31 +78,31 @@ func prepareCancel(t *testing.T) {
|
|||||||
func preparePending(t *testing.T) {
|
func preparePending(t *testing.T) {
|
||||||
saga := genSaga("gid1-preparePending", false, false)
|
saga := genSaga("gid1-preparePending", false, false)
|
||||||
saga.Prepare()
|
saga.Prepare()
|
||||||
queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg)
|
|
||||||
examples.TransQueryResult = "PENDING"
|
examples.TransQueryResult = "PENDING"
|
||||||
dtmsvr.CronPreparedOnce(-10 * time.Second)
|
dtmsvr.CronPreparedOnce(-10 * time.Second)
|
||||||
examples.TransQueryResult = ""
|
examples.TransQueryResult = ""
|
||||||
assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status)
|
assert.Equal(t, "prepared", getSagaModel(saga.Gid).Status)
|
||||||
dtmsvr.CronPreparedOnce(-10 * time.Second)
|
dtmsvr.CronPreparedOnce(-10 * time.Second)
|
||||||
quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg)
|
dtmsvr.WaitCommitedSaga(saga.Gid)
|
||||||
assert.Equal(t, "finished", getSagaModel(saga.Gid).Status)
|
assert.Equal(t, "finished", getSagaModel(saga.Gid).Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
func commitedPending(t *testing.T) {
|
func commitedPending(t *testing.T) {
|
||||||
saga := genSaga("gid-commitedPending", false, false)
|
saga := genSaga("gid-commitedPending", false, false)
|
||||||
saga.Prepare()
|
saga.Prepare()
|
||||||
queprepared.WaitAndHandleOne(dtmsvr.HandlePreparedMsg)
|
|
||||||
saga.Commit()
|
saga.Commit()
|
||||||
examples.TransOutResult = "PENDING"
|
examples.TransOutResult = "PENDING"
|
||||||
quecommited.WaitAndHandleOne(dtmsvr.HandleCommitedMsg)
|
dtmsvr.WaitCommitedSaga(saga.Gid)
|
||||||
assert.Equal(t, []string{"pending", "finished", "pending", "pending"}, getSagaStepStatus(saga.Gid))
|
assert.Equal(t, []string{"pending", "finished", "pending", "pending"}, getSagaStepStatus(saga.Gid))
|
||||||
examples.TransOutResult = ""
|
examples.TransOutResult = ""
|
||||||
dtmsvr.CronCommitedOnce(-10 * time.Second)
|
dtmsvr.CronCommitedOnce(-10 * time.Second)
|
||||||
|
dtmsvr.WaitCommitedSaga(saga.Gid)
|
||||||
assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid))
|
assert.Equal(t, []string{"pending", "finished", "pending", "finished"}, getSagaStepStatus(saga.Gid))
|
||||||
assert.Equal(t, "finished", getSagaModel(saga.Gid).Status)
|
assert.Equal(t, "finished", getSagaModel(saga.Gid).Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDtmSvr(t *testing.T) {
|
func TestDtmSvr(t *testing.T) {
|
||||||
|
dtmsvr.SagaProcessedTestChan = make(chan string, 1)
|
||||||
// 清理数据
|
// 清理数据
|
||||||
common.PanicIfError(db.Exec("truncate test1.a_saga").Error)
|
common.PanicIfError(db.Exec("truncate test1.a_saga").Error)
|
||||||
common.PanicIfError(db.Exec("truncate test1.a_saga_step").Error)
|
common.PanicIfError(db.Exec("truncate test1.a_saga_step").Error)
|
||||||
|
|||||||
@ -14,8 +14,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Server string `json:"server"`
|
Server string `json:"server"`
|
||||||
Rabbitmq RabbitmqConfig `json:"rabbitmq"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var ServerConfig Config = Config{}
|
var ServerConfig Config = Config{}
|
||||||
|
|||||||
@ -1,15 +1,7 @@
|
|||||||
package dtmsvr
|
package dtmsvr
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
"github.com/yedf/dtm/common"
|
|
||||||
"gorm.io/gorm"
|
|
||||||
"gorm.io/gorm/clause"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ModelBase struct {
|
type ModelBase struct {
|
||||||
@ -46,193 +38,3 @@ type SagaStepModel struct {
|
|||||||
func (*SagaStepModel) TableName() string {
|
func (*SagaStepModel) TableName() string {
|
||||||
return "test1.a_saga_step"
|
return "test1.a_saga_step"
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandlePreparedMsg(data M) {
|
|
||||||
db := DbGet()
|
|
||||||
logrus.Printf("creating saga model in prepare")
|
|
||||||
data["steps"] = common.MustMarshalString(data["steps"])
|
|
||||||
m := SagaModel{}
|
|
||||||
common.MustRemarshal(data, &m)
|
|
||||||
m.Status = "prepared"
|
|
||||||
writeTransLog(m.Gid, "save prepared", m.Status, -1, m.Steps)
|
|
||||||
db1 := db.Clauses(clause.OnConflict{
|
|
||||||
DoNothing: true,
|
|
||||||
}).Create(&m)
|
|
||||||
common.PanicIfError(db1.Error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func HandleCommitedMsg(data M) {
|
|
||||||
logrus.Printf("creating saga model in commited")
|
|
||||||
data["steps"] = common.MustMarshalString(data["steps"])
|
|
||||||
m := SagaModel{}
|
|
||||||
common.MustRemarshal(data, &m)
|
|
||||||
saveCommitedSagaModel(&m)
|
|
||||||
err := ProcessCommitedSaga(m.Gid)
|
|
||||||
if err != nil {
|
|
||||||
logrus.Printf("---------------handle commited msmg error: %s", err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func saveCommitedSagaModel(m *SagaModel) {
|
|
||||||
db := DbGet()
|
|
||||||
m.Status = "commited"
|
|
||||||
stepInserted := false
|
|
||||||
err := db.Transaction(func(db *gorm.DB) error {
|
|
||||||
writeTransLog(m.Gid, "save commited", m.Status, -1, m.Steps)
|
|
||||||
dbr := db.Clauses(clause.OnConflict{
|
|
||||||
DoNothing: true,
|
|
||||||
}).Create(&m)
|
|
||||||
if dbr.Error == nil && dbr.RowsAffected == 0 {
|
|
||||||
writeTransLog(m.Gid, "change status", m.Status, -1, "")
|
|
||||||
dbr = db.Model(&m).Where("status=?", "prepared").Update("status", "commited")
|
|
||||||
}
|
|
||||||
common.PanicIfError(dbr.Error)
|
|
||||||
nsteps := []SagaStepModel{}
|
|
||||||
steps := []M{}
|
|
||||||
err := json.Unmarshal([]byte(m.Steps), &steps)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
for _, step := range steps {
|
|
||||||
nsteps = append(nsteps, SagaStepModel{
|
|
||||||
Gid: m.Gid,
|
|
||||||
Step: len(nsteps) + 1,
|
|
||||||
Data: step["post_data"].(string),
|
|
||||||
Url: step["compensate"].(string),
|
|
||||||
Type: "compensate",
|
|
||||||
Status: "pending",
|
|
||||||
})
|
|
||||||
nsteps = append(nsteps, SagaStepModel{
|
|
||||||
Gid: m.Gid,
|
|
||||||
Step: len(nsteps) + 1,
|
|
||||||
Data: step["post_data"].(string),
|
|
||||||
Url: step["action"].(string),
|
|
||||||
Type: "action",
|
|
||||||
Status: "pending",
|
|
||||||
})
|
|
||||||
}
|
|
||||||
writeTransLog(m.Gid, "save steps", m.Status, -1, common.MustMarshalString(nsteps))
|
|
||||||
r := db.Clauses(clause.OnConflict{
|
|
||||||
DoNothing: true,
|
|
||||||
}).Create(&nsteps)
|
|
||||||
if db.Error != nil {
|
|
||||||
return db.Error
|
|
||||||
}
|
|
||||||
if r.RowsAffected == int64(len(nsteps)) {
|
|
||||||
stepInserted = true
|
|
||||||
}
|
|
||||||
logrus.Printf("rows affected: %d nsteps length: %d, stepInersted: %t", r.RowsAffected, int64(len(nsteps)), stepInserted)
|
|
||||||
return db.Error
|
|
||||||
})
|
|
||||||
common.PanicIfError(err)
|
|
||||||
if !stepInserted {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func ProcessCommitedSaga(gid string) (rerr error) {
|
|
||||||
steps := []SagaStepModel{}
|
|
||||||
db := DbGet()
|
|
||||||
db1 := db.Order("id asc").Find(&steps)
|
|
||||||
if db1.Error != nil {
|
|
||||||
return db1.Error
|
|
||||||
}
|
|
||||||
checkAffected := func(db1 *gorm.DB) {
|
|
||||||
common.PanicIfError(db1.Error)
|
|
||||||
if db1.RowsAffected == 0 {
|
|
||||||
panic(fmt.Errorf("duplicate updating"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
current := 0 // 当前正在处理的步骤
|
|
||||||
for ; current < len(steps); current++ {
|
|
||||||
step := steps[current]
|
|
||||||
if step.Type == "compensate" && step.Status == "pending" || step.Type == "action" && step.Status == "finished" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if step.Type == "action" && step.Status == "pending" {
|
|
||||||
resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
body := resp.String()
|
|
||||||
if strings.Contains(body, "SUCCESS") {
|
|
||||||
writeTransLog(gid, "step finished", "finished", step.Step, "")
|
|
||||||
dbr := db.Model(&step).Where("status=?", "pending").Updates(M{
|
|
||||||
"status": "finished",
|
|
||||||
"finish_time": time.Now(),
|
|
||||||
})
|
|
||||||
checkAffected(dbr)
|
|
||||||
} else if strings.Contains(body, "FAIL") {
|
|
||||||
writeTransLog(gid, "step rollbacked", "rollbacked", step.Step, "")
|
|
||||||
dbr := db.Model(&step).Where("status=?", "pending").Updates(M{
|
|
||||||
"status": "rollbacked",
|
|
||||||
"rollback_time": time.Now(),
|
|
||||||
})
|
|
||||||
checkAffected(dbr)
|
|
||||||
break
|
|
||||||
} else {
|
|
||||||
return fmt.Errorf("unknown response: %s, will be retried", body)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if current == len(steps) { // saga 事务完成
|
|
||||||
writeTransLog(gid, "saga finished", "finished", -1, "")
|
|
||||||
dbr := db.Model(&SagaModel{}).Where("gid=? and status=?", gid, "commited").Updates(M{
|
|
||||||
"status": "finished",
|
|
||||||
"finish_time": time.Now(),
|
|
||||||
})
|
|
||||||
checkAffected(dbr)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
for current = current - 1; current >= 0; current-- {
|
|
||||||
step := steps[current]
|
|
||||||
if step.Type != "compensate" || step.Status != "pending" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
body := resp.String()
|
|
||||||
if strings.Contains(body, "SUCCESS") {
|
|
||||||
writeTransLog(gid, "step rollbacked", "rollbacked", step.Step, "")
|
|
||||||
dbr := db.Model(&step).Where("status=?", step.Status).Updates(M{
|
|
||||||
"status": "rollbacked",
|
|
||||||
"rollback_time": time.Now(),
|
|
||||||
})
|
|
||||||
checkAffected(dbr)
|
|
||||||
} else {
|
|
||||||
return fmt.Errorf("expect compensate return SUCCESS")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if current != -1 {
|
|
||||||
return fmt.Errorf("saga current not -1")
|
|
||||||
}
|
|
||||||
writeTransLog(gid, "saga rollbacked", "rollbacked", -1, "")
|
|
||||||
dbr := db.Model(&SagaModel{}).Where("status=? and gid=?", "commited", gid).Updates(M{
|
|
||||||
"status": "rollbacked",
|
|
||||||
"rollback_time": time.Now(),
|
|
||||||
})
|
|
||||||
checkAffected(dbr)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func StartConsumePreparedMsg(consumers int) {
|
|
||||||
logrus.Printf("start to consume prepared msg")
|
|
||||||
r := RabbitmqGet()
|
|
||||||
for i := 0; i < consumers; i++ {
|
|
||||||
go func() {
|
|
||||||
que := r.QueueNew(RabbitmqConstPrepared)
|
|
||||||
que.WaitAndHandle(HandlePreparedMsg)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func StartConsumeCommitedMsg(consumers int) {
|
|
||||||
logrus.Printf("start to consume commited msg")
|
|
||||||
r := RabbitmqGet()
|
|
||||||
for i := 0; i < consumers; i++ {
|
|
||||||
go func() {
|
|
||||||
que := r.QueueNew(RabbitmqConstCommited)
|
|
||||||
que.WaitAndHandle(HandleCommitedMsg)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@ -29,14 +29,8 @@ func CronPreparedOnce(expire time.Duration) {
|
|||||||
dbr = db.Model(&sm).Where("status = ?", "prepared").Update("status", "canceled")
|
dbr = db.Model(&sm).Where("status = ?", "prepared").Update("status", "canceled")
|
||||||
common.PanicIfError(dbr.Error)
|
common.PanicIfError(dbr.Error)
|
||||||
} else if strings.Contains(body, "SUCCESS") {
|
} else if strings.Contains(body, "SUCCESS") {
|
||||||
m := M{}
|
saveCommitedSagaModel(&sm)
|
||||||
steps := []M{}
|
go ProcessCommitedSaga(sm.Gid)
|
||||||
common.MustRemarshal(sm, &m)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
common.MustUnmarshalString(m["steps"].(string), &steps)
|
|
||||||
m["steps"] = steps
|
|
||||||
err = rabbit.SendAndConfirm(RabbitmqConstCommited, m)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -60,7 +54,7 @@ func CronCommitedOnce(expire time.Duration) {
|
|||||||
writeTransLog(sm.Gid, "saga touch commited", "", -1, "")
|
writeTransLog(sm.Gid, "saga touch commited", "", -1, "")
|
||||||
dbr = db.Model(&sm).Update("id", sm.ID)
|
dbr = db.Model(&sm).Update("id", sm.ID)
|
||||||
common.PanicIfError(dbr.Error)
|
common.PanicIfError(dbr.Error)
|
||||||
ProcessCommitedSaga(sm.Gid)
|
go ProcessCommitedSaga(sm.Gid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -13,16 +13,6 @@ import (
|
|||||||
|
|
||||||
type M = map[string]interface{}
|
type M = map[string]interface{}
|
||||||
|
|
||||||
var rabbit *Rabbitmq = nil
|
|
||||||
|
|
||||||
func RabbitmqGet() *Rabbitmq {
|
|
||||||
LoadConfig()
|
|
||||||
if rabbit == nil {
|
|
||||||
rabbit = RabbitmqNew(&ServerConfig.Rabbitmq)
|
|
||||||
}
|
|
||||||
return rabbit
|
|
||||||
}
|
|
||||||
|
|
||||||
var db *gorm.DB = nil
|
var db *gorm.DB = nil
|
||||||
|
|
||||||
func DbGet() *gorm.DB {
|
func DbGet() *gorm.DB {
|
||||||
|
|||||||
@ -1,189 +0,0 @@
|
|||||||
package dtmsvr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
"github.com/streadway/amqp"
|
|
||||||
"github.com/yedf/dtm/common"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Rabbitmq struct {
|
|
||||||
Config RabbitmqConfig
|
|
||||||
ChannelPool *sync.Pool
|
|
||||||
}
|
|
||||||
|
|
||||||
type RabbitmqConfig struct {
|
|
||||||
Host string
|
|
||||||
Username string
|
|
||||||
Password string
|
|
||||||
Vhost string
|
|
||||||
Exchange string
|
|
||||||
KeyPrepared string
|
|
||||||
KeyCommited string
|
|
||||||
QueuePrepared string
|
|
||||||
QueueCommited string
|
|
||||||
}
|
|
||||||
|
|
||||||
type RabbitmqChannel struct {
|
|
||||||
Confirms chan amqp.Confirmation
|
|
||||||
Channel *amqp.Channel
|
|
||||||
}
|
|
||||||
|
|
||||||
type RabbitmqConst string
|
|
||||||
|
|
||||||
const (
|
|
||||||
RabbitmqConstPrepared RabbitmqConst = "dtm_prepared"
|
|
||||||
RabbitmqConstCommited RabbitmqConst = "dtm_commited"
|
|
||||||
)
|
|
||||||
|
|
||||||
var IgnoreMsgBefore = time.Now().Add(-3 * time.Second) // 忽略3秒前的消息
|
|
||||||
|
|
||||||
func RabbitmqNew(conf *RabbitmqConfig) *Rabbitmq {
|
|
||||||
return &Rabbitmq{
|
|
||||||
Config: *conf,
|
|
||||||
ChannelPool: &sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
channel := newChannel(conf)
|
|
||||||
err := channel.Confirm(false)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 2))
|
|
||||||
return &RabbitmqChannel{
|
|
||||||
Channel: channel,
|
|
||||||
Confirms: confirms,
|
|
||||||
}
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newChannel(conf *RabbitmqConfig) *amqp.Channel {
|
|
||||||
uri := fmt.Sprintf("amqp://%s:%s@%s/%s", conf.Username, conf.Password, conf.Host, conf.Vhost)
|
|
||||||
logrus.Printf("connecting rabbitmq: %s", uri)
|
|
||||||
conn, err := amqp.Dial(uri)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
channel, err := conn.Channel()
|
|
||||||
common.PanicIfError(err)
|
|
||||||
err = channel.ExchangeDeclare(
|
|
||||||
conf.Exchange, // exchange name
|
|
||||||
"direct", // exchange type
|
|
||||||
true, // durable
|
|
||||||
false, // autoDelete
|
|
||||||
false, // internal
|
|
||||||
false, // noWait
|
|
||||||
nil, // args
|
|
||||||
)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
return channel
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Rabbitmq) SendAndConfirm(key RabbitmqConst, data map[string]interface{}) error {
|
|
||||||
body, err := json.Marshal(data)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
channel := r.ChannelPool.Get().(*RabbitmqChannel)
|
|
||||||
|
|
||||||
logrus.Printf("publishing %s %v", key, data)
|
|
||||||
err = channel.Channel.Publish(
|
|
||||||
r.Config.Exchange,
|
|
||||||
common.If(key == RabbitmqConstPrepared, r.Config.KeyPrepared, r.Config.KeyCommited).(string),
|
|
||||||
true,
|
|
||||||
false,
|
|
||||||
amqp.Publishing{
|
|
||||||
ContentType: "application/json",
|
|
||||||
DeliveryMode: amqp.Persistent,
|
|
||||||
Body: body,
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
confirm := <-channel.Confirms
|
|
||||||
r.ChannelPool.Put(channel)
|
|
||||||
logrus.Printf("confirmed %t for %s", confirm.Ack, data["gid"])
|
|
||||||
if !confirm.Ack {
|
|
||||||
return fmt.Errorf("confirm not ok for %s", data["gid"])
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type RabbitmqQueue struct {
|
|
||||||
Name string
|
|
||||||
Queue *amqp.Queue
|
|
||||||
Channel *amqp.Channel
|
|
||||||
Conn *amqp.Connection
|
|
||||||
Deliveries <-chan amqp.Delivery
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *RabbitmqQueue) Close() {
|
|
||||||
q.Channel.Close()
|
|
||||||
// q.Conn.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *RabbitmqQueue) WaitAndHandle(handler func(data M)) {
|
|
||||||
for {
|
|
||||||
q.WaitAndHandleOne(handler)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
func (q *RabbitmqQueue) WaitAndHandleOne(handler func(data M)) {
|
|
||||||
logrus.Printf("%s reading message", q.Name)
|
|
||||||
msg := <-q.Deliveries
|
|
||||||
for msg.Timestamp.Before(IgnoreMsgBefore) {
|
|
||||||
logrus.Printf("%s discarding a message %v before %v", q.Name, msg.Timestamp, IgnoreMsgBefore)
|
|
||||||
msg.Ack(false)
|
|
||||||
msg = <-q.Deliveries
|
|
||||||
}
|
|
||||||
data := map[string]interface{}{}
|
|
||||||
err := json.Unmarshal(msg.Body, &data)
|
|
||||||
logrus.Printf("%s handling one message: %v", q.Name, data)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
handler(data)
|
|
||||||
err = msg.Ack(false)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
logrus.Printf("%s acked msg: %d", q.Name, msg.DeliveryTag)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Rabbitmq) QueueNew(queueType RabbitmqConst) *RabbitmqQueue {
|
|
||||||
channel := newChannel(&r.Config)
|
|
||||||
queueName := common.If(queueType == RabbitmqConstPrepared, r.Config.QueuePrepared, r.Config.QueueCommited).(string)
|
|
||||||
queue, err := channel.QueueDeclare(
|
|
||||||
queueName, // name of the queue
|
|
||||||
true, // durable
|
|
||||||
false, // delete when unused
|
|
||||||
false, // exclusive
|
|
||||||
false, // noWait
|
|
||||||
nil, // arguments
|
|
||||||
)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
logrus.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange",
|
|
||||||
queue.Name, queue.Messages, queue.Consumers)
|
|
||||||
err = channel.QueueBind(
|
|
||||||
queue.Name, // name of the queue
|
|
||||||
common.If(queueType == RabbitmqConstPrepared, r.Config.KeyPrepared, r.Config.KeyCommited).(string), // bindingKey
|
|
||||||
r.Config.Exchange, // sourceExchange
|
|
||||||
false, // noWait
|
|
||||||
nil, // arguments
|
|
||||||
)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
deliveries, err := channel.Consume(
|
|
||||||
queue.Name, // name
|
|
||||||
"simple-consumer", // consumerTag,
|
|
||||||
false, // noAck
|
|
||||||
false, // exclusive
|
|
||||||
false, // noLocal
|
|
||||||
false, // noWait
|
|
||||||
nil, // arguments
|
|
||||||
)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
return &RabbitmqQueue{
|
|
||||||
Queue: &queue,
|
|
||||||
Name: queueName,
|
|
||||||
Channel: channel,
|
|
||||||
Deliveries: deliveries,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Rabbitmq) HandleMsg(data interface{}) {
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -1,30 +0,0 @@
|
|||||||
package dtmsvr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/magiconair/properties/assert"
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
"github.com/yedf/dtm/common"
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
LoadConfig()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRabbitConfig(t *testing.T) {
|
|
||||||
assert.Matches(t, ServerConfig.Rabbitmq.KeyCommited, "key_committed")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRabbitmq1Msg(t *testing.T) {
|
|
||||||
rb := RabbitmqNew(&ServerConfig.Rabbitmq)
|
|
||||||
err := rb.SendAndConfirm(RabbitmqConstPrepared, M{
|
|
||||||
"gid": common.GenGid(),
|
|
||||||
})
|
|
||||||
assert.Equal(t, nil, err)
|
|
||||||
queue := rb.QueueNew(RabbitmqConstPrepared)
|
|
||||||
queue.WaitAndHandle(func(data M) {
|
|
||||||
logrus.Printf("processed msg: %v in queue1", data)
|
|
||||||
})
|
|
||||||
assert.Equal(t, 0, 1)
|
|
||||||
}
|
|
||||||
@ -1,5 +0,0 @@
|
|||||||
package dtmsvr
|
|
||||||
|
|
||||||
func startScanPrepared() {
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -1,8 +1,16 @@
|
|||||||
package dtmsvr
|
package dtmsvr
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yedf/dtm/common"
|
"github.com/yedf/dtm/common"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
"gorm.io/gorm/clause"
|
||||||
)
|
)
|
||||||
|
|
||||||
func AddRoute(engine *gin.Engine) {
|
func AddRoute(engine *gin.Engine) {
|
||||||
@ -10,26 +18,195 @@ func AddRoute(engine *gin.Engine) {
|
|||||||
engine.POST("/api/dtmsvr/commit", Commit)
|
engine.POST("/api/dtmsvr/commit", Commit)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Prepare(c *gin.Context) {
|
func getSagaModelFromContext(c *gin.Context) *SagaModel {
|
||||||
data := M{}
|
data := M{}
|
||||||
err := c.BindJSON(&data)
|
b, err := c.GetRawData()
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
rabbit := RabbitmqGet()
|
|
||||||
err = rabbit.SendAndConfirm(RabbitmqConstPrepared, data)
|
|
||||||
common.PanicIfError(err)
|
common.PanicIfError(err)
|
||||||
|
common.MustUnmarshal(b, &data)
|
||||||
|
logrus.Printf("creating saga model in prepare")
|
||||||
|
data["steps"] = common.MustMarshalString(data["steps"])
|
||||||
|
m := SagaModel{}
|
||||||
|
common.MustRemarshal(data, &m)
|
||||||
|
return &m
|
||||||
|
}
|
||||||
|
|
||||||
|
func Prepare(c *gin.Context) {
|
||||||
|
db := DbGet()
|
||||||
|
m := getSagaModelFromContext(c)
|
||||||
|
m.Status = "prepared"
|
||||||
|
writeTransLog(m.Gid, "save prepared", m.Status, -1, m.Steps)
|
||||||
|
db1 := db.Clauses(clause.OnConflict{
|
||||||
|
DoNothing: true,
|
||||||
|
}).Create(&m)
|
||||||
|
common.PanicIfError(db1.Error)
|
||||||
c.JSON(200, M{"message": "SUCCESS"})
|
c.JSON(200, M{"message": "SUCCESS"})
|
||||||
}
|
}
|
||||||
|
|
||||||
func Commit(c *gin.Context) {
|
func Commit(c *gin.Context) {
|
||||||
data := M{}
|
m := getSagaModelFromContext(c)
|
||||||
err := c.BindJSON(&data)
|
saveCommitedSagaModel(m)
|
||||||
if err != nil {
|
go ProcessCommitedSaga(m.Gid)
|
||||||
return
|
|
||||||
}
|
|
||||||
rabbit := RabbitmqGet()
|
|
||||||
err = rabbit.SendAndConfirm(RabbitmqConstCommited, data)
|
|
||||||
common.PanicIfError(err)
|
|
||||||
c.JSON(200, M{"message": "SUCCESS"})
|
c.JSON(200, M{"message": "SUCCESS"})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func saveCommitedSagaModel(m *SagaModel) {
|
||||||
|
db := DbGet()
|
||||||
|
m.Status = "commited"
|
||||||
|
stepInserted := false
|
||||||
|
err := db.Transaction(func(db *gorm.DB) error {
|
||||||
|
writeTransLog(m.Gid, "save commited", m.Status, -1, m.Steps)
|
||||||
|
dbr := db.Clauses(clause.OnConflict{
|
||||||
|
DoNothing: true,
|
||||||
|
}).Create(&m)
|
||||||
|
if dbr.Error == nil && dbr.RowsAffected == 0 {
|
||||||
|
writeTransLog(m.Gid, "change status", m.Status, -1, "")
|
||||||
|
dbr = db.Model(&m).Where("status=?", "prepared").Update("status", "commited")
|
||||||
|
}
|
||||||
|
common.PanicIfError(dbr.Error)
|
||||||
|
nsteps := []SagaStepModel{}
|
||||||
|
steps := []M{}
|
||||||
|
err := json.Unmarshal([]byte(m.Steps), &steps)
|
||||||
|
common.PanicIfError(err)
|
||||||
|
for _, step := range steps {
|
||||||
|
nsteps = append(nsteps, SagaStepModel{
|
||||||
|
Gid: m.Gid,
|
||||||
|
Step: len(nsteps) + 1,
|
||||||
|
Data: step["post_data"].(string),
|
||||||
|
Url: step["compensate"].(string),
|
||||||
|
Type: "compensate",
|
||||||
|
Status: "pending",
|
||||||
|
})
|
||||||
|
nsteps = append(nsteps, SagaStepModel{
|
||||||
|
Gid: m.Gid,
|
||||||
|
Step: len(nsteps) + 1,
|
||||||
|
Data: step["post_data"].(string),
|
||||||
|
Url: step["action"].(string),
|
||||||
|
Type: "action",
|
||||||
|
Status: "pending",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
writeTransLog(m.Gid, "save steps", m.Status, -1, common.MustMarshalString(nsteps))
|
||||||
|
r := db.Clauses(clause.OnConflict{
|
||||||
|
DoNothing: true,
|
||||||
|
}).Create(&nsteps)
|
||||||
|
if db.Error != nil {
|
||||||
|
return db.Error
|
||||||
|
}
|
||||||
|
if r.RowsAffected == int64(len(nsteps)) {
|
||||||
|
stepInserted = true
|
||||||
|
}
|
||||||
|
logrus.Printf("rows affected: %d nsteps length: %d, stepInersted: %t", r.RowsAffected, int64(len(nsteps)), stepInserted)
|
||||||
|
return db.Error
|
||||||
|
})
|
||||||
|
common.PanicIfError(err)
|
||||||
|
if !stepInserted {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var SagaProcessedTestChan chan string = nil // 用于测试时,通知处理结束
|
||||||
|
|
||||||
|
func WaitCommitedSaga(gid string) {
|
||||||
|
id := <-SagaProcessedTestChan
|
||||||
|
for id != gid {
|
||||||
|
logrus.Errorf("-------id %s not match gid %s", id, gid)
|
||||||
|
id = <-SagaProcessedTestChan
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ProcessCommitedSaga(gid string) {
|
||||||
|
err := innerProcessCommitedSaga(gid)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("process commited saga error: %s", err.Error())
|
||||||
|
}
|
||||||
|
if SagaProcessedTestChan != nil {
|
||||||
|
SagaProcessedTestChan <- gid
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func innerProcessCommitedSaga(gid string) (rerr error) {
|
||||||
|
steps := []SagaStepModel{}
|
||||||
|
db := DbGet()
|
||||||
|
db1 := db.Order("id asc").Find(&steps)
|
||||||
|
if db1.Error != nil {
|
||||||
|
return db1.Error
|
||||||
|
}
|
||||||
|
checkAffected := func(db1 *gorm.DB) {
|
||||||
|
common.PanicIfError(db1.Error)
|
||||||
|
if db1.RowsAffected == 0 {
|
||||||
|
panic(fmt.Errorf("duplicate updating"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
current := 0 // 当前正在处理的步骤
|
||||||
|
for ; current < len(steps); current++ {
|
||||||
|
step := steps[current]
|
||||||
|
if step.Type == "compensate" && step.Status == "pending" || step.Type == "action" && step.Status == "finished" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if step.Type == "action" && step.Status == "pending" {
|
||||||
|
resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
body := resp.String()
|
||||||
|
if strings.Contains(body, "SUCCESS") {
|
||||||
|
writeTransLog(gid, "step finished", "finished", step.Step, "")
|
||||||
|
dbr := db.Model(&step).Where("status=?", "pending").Updates(M{
|
||||||
|
"status": "finished",
|
||||||
|
"finish_time": time.Now(),
|
||||||
|
})
|
||||||
|
checkAffected(dbr)
|
||||||
|
} else if strings.Contains(body, "FAIL") {
|
||||||
|
writeTransLog(gid, "step rollbacked", "rollbacked", step.Step, "")
|
||||||
|
dbr := db.Model(&step).Where("status=?", "pending").Updates(M{
|
||||||
|
"status": "rollbacked",
|
||||||
|
"rollback_time": time.Now(),
|
||||||
|
})
|
||||||
|
checkAffected(dbr)
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("unknown response: %s, will be retried", body)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if current == len(steps) { // saga 事务完成
|
||||||
|
writeTransLog(gid, "saga finished", "finished", -1, "")
|
||||||
|
dbr := db.Model(&SagaModel{}).Where("gid=? and status=?", gid, "commited").Updates(M{
|
||||||
|
"status": "finished",
|
||||||
|
"finish_time": time.Now(),
|
||||||
|
})
|
||||||
|
checkAffected(dbr)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for current = current - 1; current >= 0; current-- {
|
||||||
|
step := steps[current]
|
||||||
|
if step.Type != "compensate" || step.Status != "pending" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
resp, err := common.RestyClient.R().SetBody(step.Data).SetQueryParam("gid", step.Gid).Post(step.Url)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
body := resp.String()
|
||||||
|
if strings.Contains(body, "SUCCESS") {
|
||||||
|
writeTransLog(gid, "step rollbacked", "rollbacked", step.Step, "")
|
||||||
|
dbr := db.Model(&step).Where("status=?", step.Status).Updates(M{
|
||||||
|
"status": "rollbacked",
|
||||||
|
"rollback_time": time.Now(),
|
||||||
|
})
|
||||||
|
checkAffected(dbr)
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("expect compensate return SUCCESS")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if current != -1 {
|
||||||
|
return fmt.Errorf("saga current not -1")
|
||||||
|
}
|
||||||
|
writeTransLog(gid, "saga rollbacked", "rollbacked", -1, "")
|
||||||
|
dbr := db.Model(&SagaModel{}).Where("status=? and gid=?", "commited", gid).Updates(M{
|
||||||
|
"status": "rollbacked",
|
||||||
|
"rollback_time": time.Now(),
|
||||||
|
})
|
||||||
|
checkAffected(dbr)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@ -6,8 +6,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func Main() {
|
func Main() {
|
||||||
StartConsumePreparedMsg(1)
|
|
||||||
StartConsumeCommitedMsg(1)
|
|
||||||
logrus.Printf("dtmsvr listen at: 8080")
|
logrus.Printf("dtmsvr listen at: 8080")
|
||||||
go StartSvr()
|
go StartSvr()
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user