dtm/dtmsvr/trans_msg.go
2021-06-30 19:17:32 +08:00

92 lines
2.4 KiB
Go

package dtmsvr
import (
"fmt"
"strings"
"time"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
)
// TransMsgProcessor is the processor that init registers under the "msg"
// transaction type. It wraps the global transaction it operates on.
type TransMsgProcessor struct {
// Embedded so that the global transaction's fields and methods are promoted
// onto the processor.
*TransGlobal
}
func init() {
registorProcessorCreator("msg", func(trans *TransGlobal) TransProcessor { return &TransMsgProcessor{TransGlobal: trans} })
}
// GenBranches decodes t.Data as a list of steps and returns one "action"
// branch in "prepared" status per step, in the same order.
//
// Each step must carry string values under the "data" and "action" keys; the
// unchecked type assertions panic otherwise. Every branch gets a freshly
// generated branch id and shares the transaction's Gid. The returned slice is
// never nil.
func (t *TransMsgProcessor) GenBranches() []TransBranch {
	steps := make([]M, 0)
	common.MustUnmarshalString(t.Data, &steps)

	// Non-nil even when there are no steps; capacity is known up front.
	result := make([]TransBranch, 0, len(steps))
	for i := range steps {
		step := steps[i]
		branch := TransBranch{
			Gid:        t.Gid,
			Branch:     common.GenGid(),
			Data:       step["data"].(string),
			Url:        step["action"].(string),
			BranchType: "action",
			Status:     "prepared",
		}
		result = append(result, branch)
	}
	return result
}
// ExecBranch POSTs branch.Data to branch.Url, passing the branch's gid as the
// "gid" query parameter.
//
// When the response body contains "SUCCESS" the branch is moved to "succeed"
// and the transaction is touched with config.TransCronInterval. Any other
// body, or a transport error (via e2p), results in a panic; the panic message
// states that the call will be retried.
func (t *TransMsgProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
	req := common.RestyClient.R().
		SetBody(branch.Data).
		SetQueryParam("gid", branch.Gid)
	resp, err := req.Post(branch.Url)
	e2p(err)

	body := resp.String()
	if !strings.Contains(body, "SUCCESS") {
		panic(fmt.Errorf("unknown response: %s, will be retried", body))
	}

	branch.changeStatus(db, "succeed")
	t.touch(db, config.TransCronInterval)
}
// mayQueryPrepared asks the t.QueryPrepared endpoint (GET, with the "gid"
// query parameter) about a transaction that is still "prepared"; it does
// nothing for any other status.
//
// Response handling, checked in this order:
//   - body contains "FAIL": if the transaction was created more than
//     config.PreparedExpire seconds ago it becomes "canceled"; otherwise it
//     stays "prepared" and is touched with twice its NextCronInterval.
//   - body contains "SUCCESS": the transaction becomes "committed".
//   - anything else: no change.
//
// A transport error panics through e2p.
func (t *TransGlobal) mayQueryPrepared(db *common.DB) {
	if t.Status != "prepared" {
		return
	}

	resp, err := common.RestyClient.R().SetQueryParam("gid", t.Gid).Get(t.QueryPrepared)
	e2p(err)
	body := resp.String()

	switch {
	case strings.Contains(body, "FAIL"):
		// Transactions created before this instant are treated as expired.
		deadline := time.Now().Add(time.Duration(-config.PreparedExpire) * time.Second)
		logrus.Printf("create time: %s prepared expire: %s ", t.CreateTime.Local(), deadline.Local())

		next := common.If(t.CreateTime.Before(deadline), "canceled", "prepared").(string)
		if next == t.Status {
			// Still within the window: keep the status and touch with a doubled interval.
			t.touch(db, t.NextCronInterval*2)
			return
		}
		t.changeStatus(db, next)
	case strings.Contains(body, "SUCCESS"):
		t.changeStatus(db, "committed")
	}
}
// ProcessOnce makes one pass over a msg transaction.
//
// It first runs mayQueryPrepared, then returns immediately unless the
// transaction is "committed". For a committed transaction it executes, in
// order, every "action" branch that is still "prepared". Once all branches
// have been walked the transaction is marked "succeed". If a branch is not
// "succeed" after ExecBranch returns, the method panics.
func (t *TransMsgProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {
	t.mayQueryPrepared(db)
	if t.Status != "committed" {
		return
	}

	for i := range branches {
		// The step currently being processed.
		step := &branches[i]
		if step.BranchType == "action" && step.Status == "prepared" {
			t.ExecBranch(db, step)
			if step.Status != "succeed" {
				// Stopped before reaching the end of the branch list.
				panic("msg go pass all branch")
			}
		}
	}

	// msg transaction completed.
	t.changeStatus(db, "succeed")
}