add barrier

This commit is contained in:
yedongfu 2021-07-03 11:44:01 +08:00
parent 2fbbc5efc1
commit b45838187d
17 changed files with 588 additions and 309 deletions

1
.gitignore vendored
View File

@ -3,4 +3,3 @@ conf.yml*
*.out
*/**/main
main
intra-barrier.md

View File

@ -12,27 +12,38 @@ import (
type M = map[string]interface{}
func main() {
if os.Args[1] == "quick_start" {
dtmsvr.Main()
examples.StartMain()
for {
time.Sleep(1000 * time.Second)
}
}
app := examples.BaseAppNew()
examples.BaseAppSetup(app)
if len(os.Args) == 1 || os.Args[1] == "saga" { // 默认情况下展示saga例子
dtmsvr.PopulateMysql()
go dtmsvr.Main()
go examples.SagaStartSvr()
time.Sleep(100 * time.Millisecond)
dtmsvr.Main()
examples.SagaSetup(app)
examples.BaseAppStart(app)
examples.SagaFireRequest()
} else if os.Args[1] == "xa" { // 启动xa示例
dtmsvr.PopulateMysql()
go dtmsvr.StartSvr()
dtmsvr.Main()
examples.PopulateMysql()
examples.XaMain()
examples.XaSetup(app)
examples.BaseAppStart(app)
examples.XaFireRequest()
} else if os.Args[1] == "dtmsvr" { // 只启动dtmsvr
go dtmsvr.StartSvr()
} else if os.Args[1] == "all" { // 运行所有示例
dtmsvr.PopulateMysql()
examples.PopulateMysql()
go dtmsvr.Main()
go examples.SagaStartSvr()
go examples.TccStartSvr()
go examples.XaStartSvr()
time.Sleep(100 * time.Millisecond)
dtmsvr.Main()
examples.SagaSetup(app)
examples.TccSetup(app)
examples.XaSetup(app)
examples.BaseAppStart(app)
examples.SagaFireRequest()
examples.TccFireRequest()
examples.XaFireRequest()

69
barrier.go Normal file
View File

@ -0,0 +1,69 @@
package dtm
import (
"context"
"database/sql"
"fmt"
"github.com/yedf/dtm/common"
)
type BusiFunc func(db *sql.DB) (interface{}, error)
type TransInfo struct {
TransType string
Gid string
BranchID string
BranchType string
}
func (t *TransInfo) String() string {
return fmt.Sprintf("transInfo: %s %s %s %s", t.TransType, t.Gid, t.BranchID, t.BranchType)
}
type BarrierModel struct {
common.ModelBase
TransInfo
}
func (BarrierModel) TableName() string { return "dtm_barrier.barrier" }
func insertBarrier(tx *sql.Tx, transType string, gid string, branchID string, branchType string) (int64, error) {
if branchType == "" {
return 0, nil
}
res, err := tx.Exec("insert into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type) values(?,?,?,?)", transType, gid, branchID, branchType)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
func ThroughBarrierCall(db *sql.DB, transType string, gid string, branchId string, branchType string, busiCall BusiFunc) (res interface{}, rerr error) {
tx, rerr := db.BeginTx(context.Background(), &sql.TxOptions{})
if rerr != nil {
return
}
defer func() {
if x := recover(); x != nil {
tx.Rollback()
panic(x)
} else if rerr != nil {
tx.Rollback()
} else {
tx.Commit()
}
}()
originType := map[string]string{
"cancel": "action",
"compensate": "action",
}[branchType]
originAffected, _ := insertBarrier(tx, transType, gid, branchId, originType)
currentAffected, rerr := insertBarrier(tx, transType, gid, branchId, branchType)
if currentAffected == 0 || (originType == "cancel" || originType == "compensate") && originAffected > 0 {
return
}
res, rerr = busiCall(db)
return
}

View File

@ -33,6 +33,12 @@ func (m *DB) NoMust() *DB {
return &DB{DB: db}
}
func (m *DB) ToSqlDB() *sql.DB {
d, err := m.DB.DB()
E2P(err)
return d
}
type tracePlugin struct{}
func (op *tracePlugin) Name() string {
@ -100,6 +106,14 @@ func DbGet(conf map[string]string) *DB {
return dbs[dsn]
}
func SqlDB2DB(sdb *sql.DB) *DB {
db, err := gorm.Open(mysql.New(mysql.Config{
Conn: sdb,
}), &gorm.Config{})
E2P(err)
return &DB{DB: db}
}
type MyConn struct {
Conn *sql.DB
Dsn string

View File

@ -266,3 +266,8 @@ func GetProjectDir() string {
}
return file
}
func GetFuncName() string {
pc, _, _, _ := runtime.Caller(1)
return runtime.FuncForPC(pc).Name()
}

View File

@ -1,12 +1,14 @@
package dtmsvr
import (
"database/sql"
"fmt"
"testing"
"time"
"github.com/go-playground/assert/v2"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/examples"
@ -15,6 +17,8 @@ import (
var myinit int = func() int {
common.InitApp(common.GetProjectDir(), &config)
config.Mysql["database"] = dbName
PopulateMysql()
examples.PopulateMysql()
return 0
}()
@ -25,15 +29,15 @@ func TestViper(t *testing.T) {
func TestDtmSvr(t *testing.T) {
TransProcessedTestChan = make(chan string, 1)
PopulateMysql()
examples.PopulateMysql()
// 启动组件
go StartSvr()
go examples.SagaStartSvr()
go examples.XaStartSvr()
go examples.TccStartSvr()
go examples.MsgStartSvr()
time.Sleep(time.Duration(200 * 1000 * 1000))
app := examples.BaseAppNew()
examples.BaseAppSetup(app)
examples.SagaSetup(app)
examples.TccSetup(app)
examples.XaSetup(app)
examples.MsgSetup(app)
examples.BaseAppStart(app)
// 清理数据
e2p(dbGet().Exec("truncate trans_global").Error)
@ -89,12 +93,12 @@ func xaNormal(t *testing.T) {
resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
"gid": gid,
"user_id": "1",
}).Post(examples.XaBusi + "/TransOut")
}).Post(examples.Busi + "/TransOutXa")
common.CheckRestySuccess(resp, err)
resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
"gid": gid,
"user_id": "2",
}).Post(examples.XaBusi + "/TransIn")
}).Post(examples.Busi + "/TransInXa")
common.CheckRestySuccess(resp, err)
return nil
})
@ -111,12 +115,12 @@ func xaRollback(t *testing.T) {
resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
"gid": gid,
"user_id": "1",
}).Post(examples.XaBusi + "/TransOut")
}).Post(examples.Busi + "/TransOutXa")
common.CheckRestySuccess(resp, err)
resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
"gid": gid,
"user_id": "2",
}).Post(examples.XaBusi + "/TransIn")
}).Post(examples.Busi + "/TransInXa")
common.CheckRestySuccess(resp, err)
return nil
})
@ -143,11 +147,10 @@ func tccRollback(t *testing.T) {
}
func tccRollbackPending(t *testing.T) {
tcc := genTcc("gid-tcc-rollback-pending", false, true)
examples.TccTransInCancelResult = "PENDING"
examples.MainSwitch.TransInRevertResult.SetOnce("PENDING")
tcc.Commit()
WaitTransProcessed(tcc.Gid)
assert.Equal(t, "committed", getTransStatus(tcc.Gid))
examples.TccTransInCancelResult = ""
// assert.Equal(t, "committed", getTransStatus(tcc.Gid))
CronTransOnce(60*time.Second, "committed")
assert.Equal(t, []string{"succeed", "prepared", "succeed", "succeed", "prepared", "failed"}, getBranchesStatus(tcc.Gid))
}
@ -165,14 +168,12 @@ func msgPending(t *testing.T) {
msg := genMsg("gid-normal-pending")
msg.Prepare("")
assert.Equal(t, "prepared", getTransStatus(msg.Gid))
examples.MsgTransQueryResult = "PENDING"
examples.MainSwitch.CanSubmitResult.SetOnce("PENDING")
CronTransOnce(60*time.Second, "prepared")
assert.Equal(t, "prepared", getTransStatus(msg.Gid))
examples.MsgTransQueryResult = ""
examples.MsgTransInResult = "PENDING"
examples.MainSwitch.TransInResult.SetOnce("PENDING")
CronTransOnce(60*time.Second, "prepared")
assert.Equal(t, "committed", getTransStatus(msg.Gid))
examples.MsgTransInResult = ""
CronTransOnce(60*time.Second, "committed")
assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid))
assert.Equal(t, "succeed", getTransStatus(msg.Gid))
@ -197,11 +198,10 @@ func sagaRollback(t *testing.T) {
func sagaCommittedPending(t *testing.T) {
saga := genSaga("gid-committedPending", false, false)
examples.SagaTransInResult = "PENDING"
examples.MainSwitch.TransInResult.SetOnce("PENDING")
saga.Commit()
WaitTransProcessed(saga.Gid)
examples.SagaTransInResult = ""
assert.Equal(t, []string{"prepared", "succeed", "prepared", "prepared"}, getBranchesStatus(saga.Gid))
assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid))
CronTransOnce(60*time.Second, "committed")
assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
assert.Equal(t, "succeed", getTransStatus(saga.Gid))
@ -210,10 +210,10 @@ func sagaCommittedPending(t *testing.T) {
func genMsg(gid string) *dtm.Msg {
logrus.Printf("beginning a msg test ---------------- %s", gid)
msg := dtm.MsgNew(examples.DtmServer)
msg.QueryPrepared = examples.MsgBusi + "/TransQuery"
msg.QueryPrepared = examples.Busi + "/CanSubmit"
req := examples.GenTransReq(30, false, false)
msg.Add(examples.MsgBusi+"/TransOut", &req)
msg.Add(examples.MsgBusi+"/TransIn", &req)
msg.Add(examples.Busi+"/TransOut", &req)
msg.Add(examples.Busi+"/TransIn", &req)
msg.Gid = gid
return msg
}
@ -222,8 +222,8 @@ func genSaga(gid string, outFailed bool, inFailed bool) *dtm.Saga {
logrus.Printf("beginning a saga test ---------------- %s", gid)
saga := dtm.SagaNew(examples.DtmServer)
req := examples.GenTransReq(30, outFailed, inFailed)
saga.Add(examples.SagaBusi+"/TransOut", examples.SagaBusi+"/TransOutCompensate", &req)
saga.Add(examples.SagaBusi+"/TransIn", examples.SagaBusi+"/TransInCompensate", &req)
saga.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutRevert", &req)
saga.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInRevert", &req)
saga.Gid = gid
return saga
}
@ -232,8 +232,8 @@ func genTcc(gid string, outFailed bool, inFailed bool) *dtm.Tcc {
logrus.Printf("beginning a tcc test ---------------- %s", gid)
tcc := dtm.TccNew(examples.DtmServer)
req := examples.GenTransReq(30, outFailed, inFailed)
tcc.Add(examples.TccBusi+"/TransOutTry", examples.TccBusi+"/TransOutConfirm", examples.TccBusi+"/TransOutCancel", &req)
tcc.Add(examples.TccBusi+"/TransInTry", examples.TccBusi+"/TransInConfirm", examples.TccBusi+"/TransInCancel", &req)
tcc.Add(examples.Busi+"/TransOut", examples.Busi+"/TransOutConfirm", examples.Busi+"/TransOutRevert", &req)
tcc.Add(examples.Busi+"/TransIn", examples.Busi+"/TransInConfirm", examples.Busi+"/TransInRevert", &req)
tcc.Gid = gid
return tcc
}
@ -258,3 +258,25 @@ func transQuery(t *testing.T, gid string) {
assert.Equal(t, nil, m["transaction"])
assert.Equal(t, 0, len(m["branches"].([]interface{})))
}
func TestSqlDB(t *testing.T) {
asserts := assert.New(t)
db := common.DbGet(config.Mysql)
db.Must().Exec("insert ignore into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type) values('saga', 'gid1', 'branch_id1', 'action')")
_, err := dtm.ThroughBarrierCall(db.ToSqlDB(), "saga", "gid2", "branch_id2", "compensate", func(db *sql.DB) (interface{}, error) {
logrus.Printf("rollback gid2")
return nil, fmt.Errorf("gid2 error")
})
asserts.Error(err, fmt.Errorf("gid2 error"))
dbr := db.Model(&dtm.BarrierModel{}).Where("gid=?", "gid1").Find(&[]dtm.BarrierModel{})
asserts.Equal(dbr.RowsAffected, int64(1))
dbr = db.Model(&dtm.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtm.BarrierModel{})
asserts.Equal(dbr.RowsAffected, int64(0))
_, err = dtm.ThroughBarrierCall(db.ToSqlDB(), "saga", "gid2", "branch_id2", "compensate", func(db *sql.DB) (interface{}, error) {
logrus.Printf("commit gid2")
return nil, nil
})
asserts.Nil(err)
dbr = db.Model(&dtm.BarrierModel{}).Where("gid=?", "gid2").Find(&[]dtm.BarrierModel{})
asserts.Equal(dbr.RowsAffected, int64(2))
}

View File

@ -24,4 +24,21 @@ create table if not exists user_account_trading( -- 表示交易中被冻结的
key(update_time)
);
insert into user_account_trading (user_id, trading_balance) values (1, 0), (2, 0) on DUPLICATE KEY UPDATE trading_balance=values (trading_balance);
insert into user_account_trading (user_id, trading_balance) values (1, 0), (2, 0) on DUPLICATE KEY UPDATE trading_balance=values (trading_balance);
create database if not exists `dtm_barrier` /*!40100 DEFAULT CHARACTER SET utf8mb4 */;
use dtm_barrier;
drop table if exists barrier;
create table if not exists barrier(
id int(11) PRIMARY KEY AUTO_INCREMENT,
trans_type varchar(45) default '' ,
gid varchar(128) default'',
branch_id varchar(128) default '',
branch_type varchar(45) default '',
create_time datetime DEFAULT now(),
update_time datetime DEFAULT now(),
key(create_time),
key(update_time),
UNIQUE key(gid, branch_id, branch_type)
);

88
examples/main_base.go Normal file
View File

@ -0,0 +1,88 @@
package examples
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
)
const (
BusiApi = "/api/busi"
BusiPort = 8081
)
var Busi string = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiApi)
func BaseAppNew() *gin.Engine {
logrus.Printf("examples starting")
app := common.GetGinApp()
return app
}
func BaseAppStart(app *gin.Engine) {
logrus.Printf("Starting busi at: %d", BusiPort)
go app.Run(fmt.Sprintf(":%d", BusiPort))
time.Sleep(100 * time.Millisecond)
}
type AutoEmptyString struct {
value string
}
func (s *AutoEmptyString) SetOnce(v string) {
s.value = v
}
func (s *AutoEmptyString) Fetch() string {
v := s.value
s.value = ""
return v
}
type mainSwitchType struct {
TransInResult AutoEmptyString
TransOutResult AutoEmptyString
TransInConfirmResult AutoEmptyString
TransOutConfirmResult AutoEmptyString
TransInRevertResult AutoEmptyString
TransOutRevertResult AutoEmptyString
CanSubmitResult AutoEmptyString
}
var MainSwitch mainSwitchType
func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi string) (interface{}, error) {
info := infoFromContext(c)
res := common.OrString(MainSwitch.TransInResult.Fetch(), result2, "SUCCESS")
logrus.Printf("%s %s result: %s", info.String(), common.GetFuncName(), res)
return M{"result": res}, nil
}
func BaseAppSetup(app *gin.Engine) {
app.POST(BusiApi+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return handleGeneralBusiness(c, MainSwitch.TransInResult.Fetch(), reqFrom(c).TransInResult, "transIn")
}))
app.POST(BusiApi+"/TransOut", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return handleGeneralBusiness(c, MainSwitch.TransOutResult.Fetch(), reqFrom(c).TransOutResult, "transIn")
}))
app.POST(BusiApi+"/TransInConfirm", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return handleGeneralBusiness(c, MainSwitch.TransInConfirmResult.Fetch(), "", "transIn")
}))
app.POST(BusiApi+"/TransOutConfirm", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return handleGeneralBusiness(c, MainSwitch.TransOutConfirmResult.Fetch(), "", "transIn")
}))
app.POST(BusiApi+"/TransInRevert", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return handleGeneralBusiness(c, MainSwitch.TransInRevertResult.Fetch(), "", "transIn")
}))
app.POST(BusiApi+"/TransOutRevert", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return handleGeneralBusiness(c, MainSwitch.TransOutRevertResult.Fetch(), "", "transIn")
}))
app.GET(BusiApi+"/CanSubmit", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
logrus.Printf("%s CanSubmit", c.Query("gid"))
return common.OrString(MainSwitch.CanSubmitResult.Fetch(), "SUCCESS"), nil
}))
}

View File

@ -1,31 +1,22 @@
package examples
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
)
// 事务参与者的服务地址
const MsgBusiApi = "/api/busi_msg"
var MsgBusi = fmt.Sprintf("http://localhost:%d%s", MsgBusiPort, MsgBusiApi)
func MsgMain() {
go MsgStartSvr()
app := BaseAppNew()
BaseAppSetup(app)
BaseAppStart(app)
MsgFireRequest()
time.Sleep(1000 * time.Second)
}
func MsgStartSvr() {
logrus.Printf("msg examples starting")
app := common.GetGinApp()
MsgAddRoute(app)
app.Run(fmt.Sprintf(":%d", MsgBusiPort))
func MsgSetup(app *gin.Engine) {
}
func MsgFireRequest() {
@ -36,47 +27,11 @@ func MsgFireRequest() {
TransOutResult: "SUCCESS",
}
msg := dtm.MsgNew(DtmServer).
Add(MsgBusi+"/TransOut", req).
Add(MsgBusi+"/TransIn", req)
err := msg.Prepare(MsgBusi + "/TransQuery")
Add(Busi+"/TransOut", req).
Add(Busi+"/TransIn", req)
err := msg.Prepare(Busi + "/TransQuery")
e2p(err)
logrus.Printf("busi trans commit")
err = msg.Commit()
e2p(err)
}
// api
func MsgAddRoute(app *gin.Engine) {
app.POST(MsgBusiApi+"/TransIn", common.WrapHandler(msgTransIn))
app.POST(MsgBusiApi+"/TransOut", common.WrapHandler(MsgTransOut))
app.GET(MsgBusiApi+"/TransQuery", common.WrapHandler(msgTransQuery))
logrus.Printf("examples msg listening at %d", MsgBusiPort)
}
var MsgTransInResult = ""
var MsgTransOutResult = ""
var MsgTransQueryResult = ""
func msgTransIn(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(MsgTransInResult, req.TransInResult, "SUCCESS")
logrus.Printf("%s TransIn: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func MsgTransOut(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(MsgTransOutResult, req.TransOutResult, "SUCCESS")
logrus.Printf("%s TransOut: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func msgTransQuery(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
logrus.Printf("%s TransQuery", gid)
res := common.OrString(MsgTransQueryResult, "SUCCESS")
return M{"result": res}, nil
}

View File

@ -1,92 +1,38 @@
package examples
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
)
// 事务参与者的服务地址
const SagaBusiApi = "/api/busi_saga"
var SagaBusi = fmt.Sprintf("http://localhost:%d%s", SagaBusiPort, SagaBusiApi)
func SagaMain() {
go SagaStartSvr()
SagaFireRequest()
app := BaseAppNew()
BaseAppSetup(app)
TccSetup(app)
go BaseAppStart(app)
time.Sleep(100 * time.Millisecond)
TccFireRequest()
time.Sleep(1000 * time.Second)
}
func SagaStartSvr() {
logrus.Printf("saga examples starting")
app := common.GetGinApp()
SagaAddRoute(app)
app.Run(fmt.Sprintf(":%d", SagaBusiPort))
func SagaSetup(app *gin.Engine) {
}
func SagaFireRequest() {
logrus.Printf("a busi transaction begin")
logrus.Printf("a saga busi transaction begin")
req := &TransReq{
Amount: 30,
TransInResult: "SUCCESS",
TransOutResult: "SUCCESS",
}
saga := dtm.SagaNew(DtmServer).
Add(SagaBusi+"/TransOut", SagaBusi+"/TransOutCompensate", req).
Add(SagaBusi+"/TransIn", SagaBusi+"/TransInCompensate", req)
logrus.Printf("busi trans commit")
Add(Busi+"/TransOut", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInRevert", req)
logrus.Printf("saga busi trans commit")
err := saga.Commit()
logrus.Printf("result gid is: %s", saga.Gid)
e2p(err)
}
// api
func SagaAddRoute(app *gin.Engine) {
app.POST(SagaBusiApi+"/TransIn", common.WrapHandler(sagaTransIn))
app.POST(SagaBusiApi+"/TransInCompensate", common.WrapHandler(sagaTransInCompensate))
app.POST(SagaBusiApi+"/TransOut", common.WrapHandler(SagaTransOut))
app.POST(SagaBusiApi+"/TransOutCompensate", common.WrapHandler(sagaTransOutCompensate))
logrus.Printf("examples listening at %d", SagaBusiPort)
}
var SagaTransInResult = ""
var SagaTransOutResult = ""
var SagaTransInCompensateResult = ""
var SagaTransOutCompensateResult = ""
func sagaTransIn(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(SagaTransInResult, req.TransInResult, "SUCCESS")
logrus.Printf("%s TransIn: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func sagaTransInCompensate(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(SagaTransInCompensateResult, "SUCCESS")
logrus.Printf("%s TransInCompensate: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func SagaTransOut(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(SagaTransOutResult, req.TransOutResult, "SUCCESS")
logrus.Printf("%s TransOut: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func sagaTransOutCompensate(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(SagaTransOutCompensateResult, "SUCCESS")
logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res)
return M{"result": res}, nil
}

View File

@ -0,0 +1,101 @@
package examples
import (
"database/sql"
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
"gorm.io/gorm"
)
// 事务参与者的服务地址
const SagaBarrierBusiApi = "/api/busi_saga_barrier"
var SagaBarrierBusi = fmt.Sprintf("http://localhost:%d%s", SagaBarrierBusiPort, SagaBarrierBusiApi)
func SagaBarrierMain() {
go SagaBarrierStartSvr()
SagaBarrierFireRequest()
time.Sleep(1000 * time.Second)
}
func SagaBarrierStartSvr() {
logrus.Printf("saga barrier examples starting")
app := common.GetGinApp()
SagaBarrierAddRoute(app)
app.Run(fmt.Sprintf(":%d", SagaBarrierBusiPort))
}
func SagaBarrierFireRequest() {
logrus.Printf("a busi transaction begin")
req := &TransReq{
Amount: 30,
TransInResult: "SUCCESS",
TransOutResult: "SUCCESS",
}
saga := dtm.SagaNew(DtmServer).
Add(SagaBarrierBusi+"/TransOut", SagaBarrierBusi+"/TransOutCompensate", req).
Add(SagaBarrierBusi+"/TransIn", SagaBarrierBusi+"/TransInCompensate", req)
logrus.Printf("busi trans commit")
err := saga.Commit()
e2p(err)
}
// api
func SagaBarrierAddRoute(app *gin.Engine) {
app.POST(SagaBarrierBusiApi+"/TransIn", common.WrapHandler(sagaBarrierTransIn))
app.POST(SagaBarrierBusiApi+"/TransInCompensate", common.WrapHandler(sagaBarrierTransInCompensate))
app.POST(SagaBarrierBusiApi+"/TransOut", common.WrapHandler(sagaBarrierTransOut))
app.POST(SagaBarrierBusiApi+"/TransOutCompensate", common.WrapHandler(sagaBarrierTransOutCompensate))
logrus.Printf("examples listening at %d", SagaBarrierBusiPort)
}
var SagaBarrierTransInResult = ""
var SagaBarrierTransOutResult = ""
var SagaBarrierTransInCompensateResult = ""
var SagaBarrierTransOutCompensateResult = ""
func sagaBarrierTransIn(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := reqFrom(c)
res := common.OrString(SagaBarrierTransInResult, req.TransInResult, "SUCCESS")
logrus.Printf("%s TransIn: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := reqFrom(c)
res := common.OrString(SagaBarrierTransInCompensateResult, "SUCCESS")
logrus.Printf("%s TransInCompensate: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func sagaBarrierTransOut(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
lid := c.Query("lid")
req := reqFrom(c)
return dtm.ThroughBarrierCall(dbGet().ToSqlDB(), "saga", gid, lid, "action", func(sdb *sql.DB) (interface{}, error) {
db := common.SqlDB2DB(sdb)
dbr := db.Model(&UserAccount{}).Where("user_id = ?", c.Query("user_id")).
Update("balance", gorm.Expr("balance - ?", req.Amount))
return nil, dbr.Error
})
// res := common.OrString(SagaBarrierTransOutResult, req.TransOutResult, "SUCCESS")
// logrus.Printf("%s TransOut: %v result: %s", gid, req, res)
// return M{"result": res}, nil
}
func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := reqFrom(c)
res := common.OrString(SagaBarrierTransOutCompensateResult, "SUCCESS")
logrus.Printf("%s TransOutCompensate: %v result: %s", gid, req, res)
return M{"result": res}, nil
}

View File

@ -1,111 +1,37 @@
package examples
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
)
// 事务参与者的服务地址
const TccBusiApi = "/api/busi_tcc"
var TccBusi = fmt.Sprintf("http://localhost:%d%s", TccBusiPort, TccBusiApi)
func TccMain() {
go TccStartSvr()
app := BaseAppNew()
BaseAppSetup(app)
TccSetup(app)
go BaseAppStart(app)
time.Sleep(100 * time.Millisecond)
TccFireRequest()
time.Sleep(1000 * time.Second)
}
func TccStartSvr() {
logrus.Printf("tcc examples starting")
app := common.GetGinApp()
TccAddRoute(app)
app.Run(fmt.Sprintf(":%d", TccBusiPort))
func TccSetup(app *gin.Engine) {
}
func TccFireRequest() {
logrus.Printf("a busi transaction begin")
logrus.Printf("tcc transaction begin")
req := &TransReq{
Amount: 30,
TransInResult: "SUCCESS",
TransOutResult: "SUCCESS",
}
tcc := dtm.TccNew(DtmServer).
Add(TccBusi+"/TransOutTry", TccBusi+"/TransOutConfirm", TccBusi+"/TransOutCancel", req).
Add(TccBusi+"/TransInTry", TccBusi+"/TransInConfirm", TccBusi+"/TransOutCancel", req)
logrus.Printf("busi trans commit")
Add(Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert", req).
Add(Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransOutRevert", req)
logrus.Printf("tcc trans commit")
err := tcc.Commit()
e2p(err)
}
// api
func TccAddRoute(app *gin.Engine) {
app.POST(TccBusiApi+"/TransInTry", common.WrapHandler(tccTransInTry))
app.POST(TccBusiApi+"/TransInConfirm", common.WrapHandler(tccTransInConfirm))
app.POST(TccBusiApi+"/TransInCancel", common.WrapHandler(tccTransCancel))
app.POST(TccBusiApi+"/TransOutTry", common.WrapHandler(tccTransOutTry))
app.POST(TccBusiApi+"/TransOutConfirm", common.WrapHandler(tccTransOutConfirm))
app.POST(TccBusiApi+"/TransOutCancel", common.WrapHandler(tccTransOutCancel))
logrus.Printf("examples listening at %d", TccBusiPort)
}
var TccTransInTryResult = ""
var TccTransOutTryResult = ""
var TccTransInCancelResult = ""
var TccTransOutCancelResult = ""
var TccTransInConfirmResult = ""
var TccTransOutConfirmResult = ""
func tccTransInTry(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TccTransInTryResult, req.TransInResult, "SUCCESS")
logrus.Printf("%s TransInTry: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func tccTransInConfirm(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TccTransInConfirmResult, "SUCCESS")
logrus.Printf("%s tccTransInConfirm: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func tccTransCancel(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TccTransInCancelResult, "SUCCESS")
logrus.Printf("%s tccTransCancel: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func tccTransOutTry(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TccTransOutTryResult, req.TransOutResult, "SUCCESS")
logrus.Printf("%s TransOut: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func tccTransOutConfirm(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TccTransOutConfirmResult, "SUCCESS")
logrus.Printf("%s TransOutConfirm: %v result: %s", gid, req, res)
return M{"result": res}, nil
}
func tccTransOutCancel(c *gin.Context) (interface{}, error) {
gid := c.Query("gid")
req := transReqFromContext(c)
res := common.OrString(TccTransOutCancelResult, "SUCCESS")
logrus.Printf("%s tccTransOutCancel: %v result: %s", gid, req, res)
return M{"result": res}, nil
}

View File

@ -5,17 +5,11 @@ import (
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
"gorm.io/gorm"
)
// 事务参与者的服务地址
const XaBusiApi = "/api/busi_xa"
var XaBusi = fmt.Sprintf("http://localhost:%d%s", XaBusiPort, XaBusiApi)
var XaClient *dtm.XaClient = nil
type UserAccount struct {
@ -31,22 +25,14 @@ func dbGet() *common.DB {
}
func XaMain() {
go XaStartSvr()
app := BaseAppNew()
XaSetup(app)
go BaseAppStart(app)
time.Sleep(100 * time.Millisecond)
XaFireRequest()
time.Sleep(1000 * time.Second)
}
func XaStartSvr() {
common.InitApp(common.GetProjectDir(), &Config)
Config.Mysql["database"] = "dtm_busi"
logrus.Printf("xa examples starting")
app := common.GetGinApp()
XaClient = dtm.XaClientNew(DtmServer, Config.Mysql, app, XaBusi+"/xa")
XaAddRoute(app)
app.Run(fmt.Sprintf(":%d", XaBusiPort))
}
func XaFireRequest() {
gid := common.GenGid()
err := XaClient.XaGlobalTransaction(gid, func() (rerr error) {
@ -55,12 +41,12 @@ func XaFireRequest() {
resp, err := common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
"gid": gid,
"user_id": "1",
}).Post(XaBusi + "/TransOut")
}).Post(Busi + "/TransOutXa")
common.CheckRestySuccess(resp, err)
resp, err = common.RestyClient.R().SetBody(req).SetQueryParams(map[string]string{
"gid": gid,
"user_id": "2",
}).Post(XaBusi + "/TransIn")
}).Post(Busi + "/TransInXa")
common.CheckRestySuccess(resp, err)
return nil
})
@ -68,14 +54,16 @@ func XaFireRequest() {
}
// api
func XaAddRoute(app *gin.Engine) {
app.POST(XaBusiApi+"/TransIn", common.WrapHandler(xaTransIn))
app.POST(XaBusiApi+"/TransOut", common.WrapHandler(xaTransOut))
func XaSetup(app *gin.Engine) {
app.POST(BusiApi+"/TransInXa", common.WrapHandler(xaTransIn))
app.POST(BusiApi+"/TransOutXa", common.WrapHandler(xaTransOut))
Config.Mysql["database"] = "dtm_busi"
XaClient = dtm.XaClientNew(DtmServer, Config.Mysql, app, Busi+"/xa")
}
func xaTransIn(c *gin.Context) (interface{}, error) {
err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.DB) (rerr error) {
req := transReqFromContext(c)
req := reqFrom(c)
if req.TransInResult != "SUCCESS" {
return fmt.Errorf("tranIn failed")
}
@ -89,7 +77,7 @@ func xaTransIn(c *gin.Context) (interface{}, error) {
func xaTransOut(c *gin.Context) (interface{}, error) {
err := XaClient.XaLocalTransaction(c.Query("gid"), func(db *common.DB) (rerr error) {
req := transReqFromContext(c)
req := reqFrom(c)
if req.TransOutResult != "SUCCESS" {
return fmt.Errorf("tranOut failed")
}

View File

@ -11,52 +11,45 @@ import (
)
// 事务参与者的服务地址
const startBusiApi = "/api/busi_start"
const qsBusiApi = "/api/busi_start"
const qsBusiPort = 8082
var startBusi = fmt.Sprintf("http://localhost:%d%s", startBusiPort, startBusiApi)
var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiApi)
func startMain() {
go startStartSvr()
startFireRequest()
func StartMain() {
go qsStartSvr()
qsFireRequest()
time.Sleep(1000 * time.Second)
}
func startStartSvr() {
logrus.Printf("saga examples starting")
func qsStartSvr() {
logrus.Printf("quick start examples starting")
app := common.GetGinApp()
startAddRoute(app)
app.Run(fmt.Sprintf(":%d", SagaBusiPort))
qsAddRoute(app)
app.Run(fmt.Sprintf(":%d", qsBusiPort))
}
func startFireRequest() {
func qsFireRequest() {
req := &gin.H{"amount": 30}
saga := dtm.SagaNew(DtmServer).
Add(startBusi+"/TransOut", startBusi+"/TransOutCompensate", req).
Add(startBusi+"/TransIn", startBusi+"/TransInCompensate", req)
Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req).
Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req)
err := saga.Commit()
e2p(err)
}
func startAddRoute(app *gin.Engine) {
app.POST(SagaBusiApi+"/TransIn", common.WrapHandler(startTransIn))
app.POST(SagaBusiApi+"/TransInCompensate", common.WrapHandler(startTransInCompensate))
app.POST(SagaBusiApi+"/TransOut", common.WrapHandler(startTransOut))
app.POST(SagaBusiApi+"/TransOutCompensate", common.WrapHandler(startTransOutCompensate))
logrus.Printf("examples listening at %d", startBusiPort)
}
func startTransIn(c *gin.Context) (interface{}, error) {
return M{"result": "SUCCESS"}, nil
}
func startTransInCompensate(c *gin.Context) (interface{}, error) {
return M{"result": "SUCCESS"}, nil
}
func startTransOut(c *gin.Context) (interface{}, error) {
return M{"result": "SUCCESS"}, nil
}
func startTransOutCompensate(c *gin.Context) (interface{}, error) {
return M{"result": "SUCCESS"}, nil
func qsAddRoute(app *gin.Engine) {
app.POST(qsBusiApi+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return M{"result": "SUCCESS"}, nil
}))
app.POST(qsBusiApi+"/TransInCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return M{"result": "SUCCESS"}, nil
}))
app.POST(qsBusiApi+"/TransOut", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return M{"result": "SUCCESS"}, nil
}))
app.POST(qsBusiApi+"/TransOutCompensate", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return M{"result": "SUCCESS"}, nil
}))
logrus.Printf("quick qs examples listening at %d", qsBusiPort)
}

View File

@ -1,7 +1,10 @@
package examples
import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/yedf/dtm"
"github.com/yedf/dtm/common"
)
@ -13,13 +16,7 @@ type M = map[string]interface{}
const DtmServer = "http://localhost:8080/api/dtmsvr"
const (
MsgBusiPort = iota + 8081
SagaBusiPort
SagaBarrierBusiPort
TccBusiPort
TccBarrierBusiPort
XaBusiPort
startBusiPort
SagaBarrierBusiPort = iota + 8090
)
type TransReq struct {
@ -28,6 +25,10 @@ type TransReq struct {
TransOutResult string `json:"transOutResult"`
}
func (t *TransReq) String() string {
return fmt.Sprintf("amount: %d transIn: %s transOut: %s", t.Amount, t.TransInResult, t.TransOutResult)
}
func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq {
return &TransReq{
Amount: amount,
@ -36,9 +37,19 @@ func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq {
}
}
func transReqFromContext(c *gin.Context) *TransReq {
func reqFrom(c *gin.Context) *TransReq {
req := TransReq{}
err := c.BindJSON(&req)
e2p(err)
return &req
}
func infoFromContext(c *gin.Context) *dtm.TransInfo {
info := dtm.TransInfo{
TransType: c.Query("trans_type"),
Gid: c.Query("gid"),
BranchID: c.Query("branch_id"),
BranchType: c.Query("branch_type"),
}
return &info
}

2
go.mod
View File

@ -13,7 +13,7 @@ require (
github.com/json-iterator/go v1.1.10
github.com/sirupsen/logrus v1.7.0
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.7.0 // indirect
github.com/stretchr/testify v1.7.0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
gorm.io/driver/mysql v1.0.3

134
intra-barrier.md Normal file
View File

@ -0,0 +1,134 @@
# 子事务屏障--彻底解决子事务乱序执行问题
### 子事务乱序问题
我们拿分布式事务中的TCC作为例子看看如何子事务乱序是什么样的如何解决。
假设一个分布式TCC事务G包含子事务A和B全部成功为
A-Try ok -> B-Try ok -> G完成
当网络出现问题时,下面是某一种常见情形
A-Try 网络丢包导致超时 -> A Cancel ok丢包 -> A Try 请求到达子事务进行处理此时因为Cancel已成功需要忽略立即返回)->A Cancel ok
这种情况下:
A Cancel在Try之前执行称为空补偿问题此时Cancel的处理需要进行空补偿称为空补偿控制
A 第二次Cancel称为幂等问题此时Cancel的处理需要判断此前已回滚此次忽略成为幂等控制
A 第二次Try时之前已进行了Cancel此次之行应当忽略我们称之为防悬挂控制
这几种情况的正确处理,通常需要子事务做精细处理,例如记录业务处理的主键,并保证上述描述的逻辑。
子事务屏障提供了一套全新方法,让业务编写者完全不用被上述问题困扰,他的工作方式如下:
子事务进入屏障在屏障中编写自己的业务逻辑由屏障保证内部Try-Confirm-Cancel逻辑只被提交一次
子事务屏障的原理如下:
它使用表sub_trans_barrier主键为全局事务id-子事务id-子事务分支名称try|confirm|cancel
。开启事务
。如果是Try分支则那么insert ignore插入gid-branchid-try如果成功插入则调用屏障内逻辑
. 如果是Confirm分支那么insert ignore插入gid-branchid-confirm如果成功插入则调用屏障内逻辑
. 如果是Cancel分支那么insert ignore插入gid-branchid-try再插入gid-branchid-cancel如果try未插入并且cancel插入成功则调用屏障内逻辑
屏障内如果调用成功,提交事务,返回成功
如果调用异常,回滚事务,返回异常
在此机制下,解决了乱序问题
空补偿控制--如果Try没有执行直接执行了Cancel那么Cancel插入gid-branchid-try会成功不走屏障内的逻辑保证了空补偿控制
幂等控制--任何一个分支都无法重复插入唯一键,保证了不会重复执行
防悬挂控制--Try在Cancel之后执行那么插入的gid-branchid-try不成功就不执行保证了防悬挂控制
通过子事务屏障,完全解决了子事务乱序问题,业务人员可以只关心自己的业务逻辑
某些业务要求,一系列操作必须全部执行,而不能仅执行一部分。例如,一个转账操作:
```
-- 从id=1的账户给id=2的账户转账100元
-- 第一步将id=1的A账户余额减去100
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
-- 第二步将id=2的B账户余额加上100
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
```
这两条SQL语句必须全部执行或者由于某些原因如果第一条语句成功第二条语句失败就必须全部撤销。
这种把多条语句作为一个整体进行操作的功能被称为数据库事务。数据库事务可以确保该事务范围内的所有操作都可以全部成功或者全部失败。如果事务失败那么效果就和没有执行这些SQL一样不会对数据库数据有任何改动。
[更多事务介绍](https://www.liaoxuefeng.com/wiki/1177760294764384/1179611198786848)
### 微服务
如果一个事务涉及的所有操作能够放在一个服务内部,那么使用各门语言里事务相关的库,可以轻松的实现多个操作作为整体的事务操作。
但是有些服务,例如生成订单涉及做很多操作,包括库存、优惠券、赠送、账户余额等。当系统复杂程度增加时,想要把所有这些操作放到一个服务内实现,会导致耦合度太高,维护成本非常高。
针对复杂的系统,当前流行的微服务架构是非常好的解决方案,该架构能够把复杂系统进行拆分,拆分后形成了大量微服务,独立开发,独立维护。
[更多微服务介绍](https://www.zhihu.com/question/65502802)
虽然服务拆分了,但是订单本身的逻辑需要多个操作作为一个整体,要么全部成功,要么全部失败,这就带来了新的挑战。如何把散落在各个微服务中的本地事务,组成一个大的事务,保证他们作为一个整体,这就是分布式事务需要解决的问题。
### 分布式事务
分布式事务简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。本质上来说,分布式事务就是为了保证不同数据库的数据一致性。
[更多分布式事务介绍](https://juejin.cn/post/6844903647197806605)
分布式事务方案包括:
* xa
* tcc
* saga
* 可靠消息
下面我们看看最简单的xa
### XA
XA一共分为两阶段
第一阶段prepare事务管理器向所有本地资源管理器发起请求询问是否是 ready 状态,所有参与者都将本事务能否成功的信息反馈发给协调者;
第二阶段 (commit/rollback):事务管理器根据所有本地资源管理器的反馈,通知所有本地资源管理器,步调一致地在所有分支上提交或者回滚。
目前主流的数据库基本都支持XA事务包括mysql、oracle、sqlserver、postgre
### xa实践
介绍了这么多我们来实践完成一个微服务上的xa事务加深分布式事务的理解这里将采用[dtm](https://github.com/yedf/dtm.git)作为示例
[安装go](https://golang.org/doc/install)
[安装mysql](https://www.mysql.com/cn/)
获取dtm
```
git clone https://github.com/yedf/dtm.git
cd dtm
```
配置mysql
```
cp conf.sample.yml conf.yml
vi conf.yml
```
运行示例
```
go run app/main.go xa
```
从日志里,能够找到以下输出
```
# 服务1输出
XA start '4fPqCNTYeSG'
UPDATE `user_account` SET `balance`=balance + 30,`update_time`='2021-06-09 11:50:42.438' WHERE user_id = '1'
XA end '4fPqCNTYeSG'
XA prepare '4fPqCNTYeSG'
# 服务2输出
XA start '4fPqCPijxyC'
UPDATE `user_account` SET `balance`=balance - 30,`update_time`='2021-06-09 11:50:42.493' WHERE user_id = '2'
XA end '4fPqCPijxyC'
XA prepare '4fPqCPijxyC'
# 服务1输出
xa commit '4fPqCNTYeSG'
#服务2输出
xa commit '4fPqCPijxyC'
```
### 总结
在这篇简短的文章里,我们大致介绍了 事务->分布式事务->微服务处理XA事务。有兴趣的同学可以通过[dtm](https://github.com/yedf/dtm)继续研究分布式事务