dtm/examples/main_saga_barrier.go
2021-07-08 22:21:34 +08:00

93 lines
3.0 KiB
Go

package examples
import (
"database/sql"
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"gorm.io/gorm"
)
// 事务参与者的服务地址
const SagaBarrierBusiApi = "/api/busi_saga_barrier"
var SagaBarrierBusi = fmt.Sprintf("http://localhost:%d%s", SagaBarrierBusiPort, SagaBarrierBusiApi)
func SagaBarrierMainStart() {
SagaBarrierStartSvr()
SagaBarrierFireRequest()
}
func SagaBarrierStartSvr() {
logrus.Printf("saga barrier examples starting")
app := common.GetGinApp()
SagaBarrierAddRoute(app)
go app.Run(fmt.Sprintf(":%d", SagaBarrierBusiPort))
time.Sleep(100 * time.Millisecond)
}
func SagaBarrierFireRequest() {
logrus.Printf("a busi transaction begin")
req := &TransReq{Amount: 30}
saga := dtmcli.NewSaga(DtmServer).
Add(SagaBarrierBusi+"/TransOut", SagaBarrierBusi+"/TransOutCompensate", req).
Add(SagaBarrierBusi+"/TransIn", SagaBarrierBusi+"/TransInCompensate", req)
logrus.Printf("busi trans submit")
err := saga.Submit()
e2p(err)
}
// api
func SagaBarrierAddRoute(app *gin.Engine) {
app.POST(SagaBarrierBusiApi+"/TransIn", common.WrapHandler(sagaBarrierTransIn))
app.POST(SagaBarrierBusiApi+"/TransInCompensate", common.WrapHandler(sagaBarrierTransInCompensate))
app.POST(SagaBarrierBusiApi+"/TransOut", common.WrapHandler(sagaBarrierTransOut))
app.POST(SagaBarrierBusiApi+"/TransOutCompensate", common.WrapHandler(sagaBarrierTransOutCompensate))
logrus.Printf("examples listening at %d", SagaBarrierBusiPort)
}
func sagaBarrierTransIn(c *gin.Context) (interface{}, error) {
req := reqFrom(c)
return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) {
db := common.SqlDB2DB(sdb)
dbr := db.Model(&UserAccount{}).Where("user_id = ?", 1).
Update("balance", gorm.Expr("balance + ?", req.Amount))
return "SUCCESS", dbr.Error
})
}
func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) {
req := reqFrom(c)
return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) {
db := common.SqlDB2DB(sdb)
dbr := db.Model(&UserAccount{}).Where("user_id = ?", 1).
Update("balance", gorm.Expr("balance - ?", req.Amount))
return "SUCCESS", dbr.Error
})
}
func sagaBarrierTransOut(c *gin.Context) (interface{}, error) {
req := reqFrom(c)
return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) {
db := common.SqlDB2DB(sdb)
dbr := db.Model(&UserAccount{}).Where("user_id = ?", 2).
Update("balance", gorm.Expr("balance - ?", req.Amount))
return "SUCCESS", dbr.Error
})
}
func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) {
req := reqFrom(c)
return dtmcli.ThroughBarrierCall(dbGet().ToSqlDB(), dtmcli.TransInfoFromReq(c), func(sdb *sql.DB) (interface{}, error) {
db := common.SqlDB2DB(sdb)
dbr := db.Model(&UserAccount{}).Where("user_id = ?", 2).
Update("balance", gorm.Expr("balance + ?", req.Amount))
return "SUCCESS", dbr.Error
})
}