gid generated from dtmsvr
This commit is contained in:
parent
cee73457c4
commit
b34701f851
@ -2,12 +2,10 @@ package common
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
@ -16,7 +14,6 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/bwmarrin/snowflake"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/go-resty/resty/v2"
|
||||
"github.com/sirupsen/logrus"
|
||||
@ -51,30 +48,6 @@ func PanicIf(cond bool, err error) {
|
||||
}
|
||||
}
|
||||
|
||||
func getOneHexIp() string {
|
||||
addrs, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
fmt.Printf("cannot get ip, default to another call")
|
||||
return gNode.Generate().Base58()
|
||||
}
|
||||
for _, address := range addrs {
|
||||
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
|
||||
if ipnet.IP.To4() != nil {
|
||||
ip := ipnet.IP.To4().String()
|
||||
ns := strings.Split(ip, ".")
|
||||
r := []byte{}
|
||||
for _, n := range ns {
|
||||
r = append(r, byte(MustAtoi(n)))
|
||||
}
|
||||
return hex.EncodeToString(r)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
fmt.Printf("none ipv4, default to another call")
|
||||
return gNode.Generate().Base58()
|
||||
}
|
||||
|
||||
// MustAtoi 走must逻辑
|
||||
func MustAtoi(s string) int {
|
||||
r, err := strconv.Atoi(s)
|
||||
@ -84,18 +57,6 @@ func MustAtoi(s string) int {
|
||||
return r
|
||||
}
|
||||
|
||||
var gNode *snowflake.Node = nil
|
||||
|
||||
func init() {
|
||||
node, err := snowflake.NewNode(1)
|
||||
E2P(err)
|
||||
gNode = node
|
||||
}
|
||||
|
||||
func GenGid() string {
|
||||
return getOneHexIp() + "-" + gNode.Generate().Base58()
|
||||
}
|
||||
|
||||
func OrString(ss ...string) string {
|
||||
for _, s := range ss {
|
||||
if s != "" {
|
||||
|
||||
@ -36,12 +36,6 @@ func TestEP(t *testing.T) {
|
||||
}()
|
||||
}
|
||||
|
||||
func TestGid(t *testing.T) {
|
||||
id1 := GenGid()
|
||||
id2 := GenGid()
|
||||
assert.NotEqual(t, id1, id2)
|
||||
}
|
||||
|
||||
func TestTernary(t *testing.T) {
|
||||
assert.Equal(t, "1", OrString("", "", "1"))
|
||||
assert.Equal(t, "", OrString("", "", ""))
|
||||
|
||||
@ -27,7 +27,7 @@ type MsgStep struct {
|
||||
func NewMsg(server string) *Msg {
|
||||
return &Msg{
|
||||
MsgData: MsgData{
|
||||
Gid: common.GenGid(),
|
||||
Gid: GenGid(server),
|
||||
TransType: "msg",
|
||||
},
|
||||
Server: server,
|
||||
|
||||
@ -16,7 +16,7 @@ type Tcc struct {
|
||||
type TccGlobalFunc func(tcc *Tcc) error
|
||||
|
||||
func TccGlobalTransaction(dtm string, tccFunc TccGlobalFunc) (gid string, rerr error) {
|
||||
gid = common.GenGid()
|
||||
gid = GenGid(dtm)
|
||||
data := &M{
|
||||
"gid": gid,
|
||||
"trans_type": "tcc",
|
||||
@ -49,7 +49,7 @@ func TccFromReq(c *gin.Context) (*Tcc, error) {
|
||||
}
|
||||
|
||||
func (t *Tcc) CallBranch(body interface{}, tryUrl string, confirmUrl string, cancelUrl string) (*resty.Response, error) {
|
||||
branchID := common.GenGid()
|
||||
branchID := GenGid(t.Dtm)
|
||||
resp, err := common.RestyClient.R().
|
||||
SetBody(&M{
|
||||
"gid": t.Gid,
|
||||
|
||||
@ -57,7 +57,7 @@ func NewXa(server string, mysqlConf map[string]string, app *gin.Engine, callback
|
||||
|
||||
func (xa *Xa) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error) {
|
||||
defer common.P2E(&rerr)
|
||||
branchID := common.GenGid()
|
||||
branchID := GenGid(xa.Server)
|
||||
tx, my := common.DbAlone(xa.Conf)
|
||||
defer func() { my.Close() }()
|
||||
tx.Must().Exec(fmt.Sprintf("XA start '%s'", branchID))
|
||||
@ -76,7 +76,7 @@ func (xa *Xa) XaLocalTransaction(gid string, transFunc XaLocalFunc) (rerr error)
|
||||
}
|
||||
|
||||
func (xa *Xa) XaGlobalTransaction(transFunc XaGlobalFunc) (gid string, rerr error) {
|
||||
gid = common.GenGid()
|
||||
gid = GenGid(xa.Server)
|
||||
data := &M{
|
||||
"gid": gid,
|
||||
"trans_type": "xa",
|
||||
|
||||
@ -17,6 +17,11 @@ func AddRoute(engine *gin.Engine) {
|
||||
engine.POST("/api/dtmsvr/registerTccBranch", common.WrapHandler(RegisterTccBranch))
|
||||
engine.POST("/api/dtmsvr/abort", common.WrapHandler(Abort))
|
||||
engine.GET("/api/dtmsvr/query", common.WrapHandler(Query))
|
||||
engine.GET("/api/dtmsvr/newGid", common.WrapHandler(NewGid))
|
||||
}
|
||||
|
||||
func NewGid(c *gin.Context) (interface{}, error) {
|
||||
return M{"gid": GenGid()}, nil
|
||||
}
|
||||
|
||||
func Prepare(c *gin.Context) (interface{}, error) {
|
||||
|
||||
@ -7,7 +7,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/yedf/dtm/common"
|
||||
)
|
||||
|
||||
func CronPrepared() {
|
||||
@ -39,7 +38,7 @@ func CronCommitted() {
|
||||
|
||||
func lockOneTrans(expireIn time.Duration, status string) *TransGlobal {
|
||||
trans := TransGlobal{}
|
||||
owner := common.GenGid()
|
||||
owner := GenGid()
|
||||
db := dbGet()
|
||||
dbr := db.Must().Model(&trans).
|
||||
Where("next_cron_time < date_add(now(), interval ? second) and status=?", int(expireIn/time.Second), status).
|
||||
|
||||
@ -135,7 +135,7 @@ func (t *TransGlobal) setNextCron(expireIn int64) []string {
|
||||
|
||||
func (t *TransGlobal) SaveNew(db *common.DB) {
|
||||
if t.Gid == "" {
|
||||
t.Gid = common.GenGid()
|
||||
t.Gid = GenGid()
|
||||
}
|
||||
err := db.Transaction(func(db1 *gorm.DB) error {
|
||||
db := &common.DB{DB: db1}
|
||||
|
||||
@ -22,7 +22,7 @@ func (t *TransMsgProcessor) GenBranches() []TransBranch {
|
||||
for _, step := range steps {
|
||||
branches = append(branches, TransBranch{
|
||||
Gid: t.Gid,
|
||||
BranchID: common.GenGid(),
|
||||
BranchID: GenGid(),
|
||||
Data: step["data"].(string),
|
||||
Url: step["action"].(string),
|
||||
BranchType: "action",
|
||||
|
||||
@ -20,7 +20,7 @@ func (t *TransSagaProcessor) GenBranches() []TransBranch {
|
||||
steps := []M{}
|
||||
common.MustUnmarshalString(t.Data, &steps)
|
||||
for _, step := range steps {
|
||||
branch := common.GenGid()
|
||||
branch := GenGid()
|
||||
for _, branchType := range []string{"compensate", "action"} {
|
||||
branches = append(branches, TransBranch{
|
||||
Gid: t.Gid,
|
||||
|
||||
@ -1,6 +1,12 @@
|
||||
package dtmsvr
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
|
||||
"github.com/bwmarrin/snowflake"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/yedf/dtm/common"
|
||||
)
|
||||
@ -39,3 +45,39 @@ func WaitTransProcessed(gid string) {
|
||||
}
|
||||
logrus.Printf("finish for gid %s", gid)
|
||||
}
|
||||
|
||||
var gNode *snowflake.Node = nil
|
||||
|
||||
func init() {
|
||||
node, err := snowflake.NewNode(1)
|
||||
e2p(err)
|
||||
gNode = node
|
||||
}
|
||||
|
||||
func GenGid() string {
|
||||
return getOneHexIp() + "-" + gNode.Generate().Base58()
|
||||
}
|
||||
|
||||
func getOneHexIp() string {
|
||||
addrs, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
fmt.Printf("cannot get ip, default to another call")
|
||||
return gNode.Generate().Base58()
|
||||
}
|
||||
for _, address := range addrs {
|
||||
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
|
||||
if ipnet.IP.To4() != nil {
|
||||
ip := ipnet.IP.To4().String()
|
||||
ns := strings.Split(ip, ".")
|
||||
r := []byte{}
|
||||
for _, n := range ns {
|
||||
r = append(r, byte(common.MustAtoi(n)))
|
||||
}
|
||||
return hex.EncodeToString(r)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
fmt.Printf("none ipv4, default to another call")
|
||||
return gNode.Generate().Base58()
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user