fix saga aborting error
This commit is contained in:
parent
70ca1e30eb
commit
9e95e90b6c
@ -138,7 +138,7 @@ func (t *TransGlobal) getBranchParams(branch *TransBranch) common.MS {
|
|||||||
|
|
||||||
func (t *TransGlobal) setNextCron(expireIn int64) []string {
|
func (t *TransGlobal) setNextCron(expireIn int64) []string {
|
||||||
t.NextCronInterval = expireIn
|
t.NextCronInterval = expireIn
|
||||||
next := time.Now().Add(time.Duration(config.TransCronInterval) * time.Second)
|
next := time.Now().Add(time.Duration(t.NextCronInterval) * time.Second)
|
||||||
t.NextCronTime = &next
|
t.NextCronTime = &next
|
||||||
return []string{"next_cron_interval", "next_cron_time"}
|
return []string{"next_cron_interval", "next_cron_time"}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -51,16 +51,19 @@ func (t *transSagaProcessor) ExecBranch(db *common.DB, branch *TransBranch) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {
|
func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {
|
||||||
if t.Status != "submitted" {
|
if t.Status == "failed" || t.Status == "succeed" {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
current := 0 // 当前正在处理的步骤
|
current := 0 // 当前正在处理的步骤
|
||||||
for ; current < len(branches); current++ {
|
for ; current < len(branches); current++ {
|
||||||
branch := &branches[current]
|
branch := &branches[current]
|
||||||
if branch.BranchType != "action" || branch.Status != "prepared" {
|
if branch.BranchType != "action" || branch.Status == "succeed" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
t.ExecBranch(db, branch)
|
// 找到了一个非succeed的action
|
||||||
|
if branch.Status == "prepared" {
|
||||||
|
t.ExecBranch(db, branch)
|
||||||
|
}
|
||||||
if branch.Status != "succeed" {
|
if branch.Status != "succeed" {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -79,8 +82,5 @@ func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch)
|
|||||||
}
|
}
|
||||||
t.ExecBranch(db, branch)
|
t.ExecBranch(db, branch)
|
||||||
}
|
}
|
||||||
if current != -1 {
|
t.changeStatus(db, "failed")
|
||||||
panic(fmt.Errorf("saga current not -1"))
|
|
||||||
}
|
|
||||||
t.changeStatus(db.Must(), "failed")
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -39,11 +39,10 @@ func (t *transTccProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
branchType := common.If(t.Status == "submitted", "confirm", "cancel").(string)
|
branchType := common.If(t.Status == "submitted", "confirm", "cancel").(string)
|
||||||
for current := len(branches) - 1; current >= -1; current-- {
|
for current := len(branches) - 1; current >= 0; current-- {
|
||||||
if current == -1 { // 已全部处理完
|
if branches[current].BranchType == branchType && branches[current].Status == "prepared" {
|
||||||
t.changeStatus(db, common.If(t.Status == "submitted", "succeed", "failed").(string))
|
|
||||||
} else if branches[current].BranchType == branchType {
|
|
||||||
t.ExecBranch(db, &branches[current])
|
t.ExecBranch(db, &branches[current])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
t.changeStatus(db, common.If(t.Status == "submitted", "succeed", "failed").(string))
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user