diff --git a/app/main.go b/app/main.go index 1e28dd3..8a5ec67 100644 --- a/app/main.go +++ b/app/main.go @@ -24,7 +24,7 @@ func main() { wait() } // 下面都是运行示例,因此首先把服务器的数据重新准备好 - dtmsvr.PopulateMysql() + dtmsvr.PopulateMysql(true) dtmsvr.MainStart() if len(os.Args) == 1 { // 默认没有参数的情况下,准备好数据并启动dtmsvr即可 wait() @@ -37,7 +37,7 @@ func main() { } // 下面是各类的例子 - examples.PopulateMysql() + examples.PopulateMysql(true) app := examples.BaseAppStartup() if os.Args[1] == "xa" { // 启动xa示例 examples.XaSetup(app) @@ -47,7 +47,7 @@ func main() { examples.SagaFireRequest() } else if os.Args[1] == "tcc" { // 启动tcc示例 examples.TccSetup(app) - examples.TccFireRequest() + examples.TccFireRequestNested() } else if os.Args[1] == "msg" { // 启动msg示例 examples.MsgSetup(app) examples.MsgFireRequest() @@ -57,7 +57,7 @@ func main() { examples.XaSetup(app) examples.MsgSetup(app) examples.SagaFireRequest() - examples.TccFireRequest() + examples.TccFireRequestNested() examples.XaFireRequest() examples.MsgFireRequest() } else if os.Args[1] == "saga_barrier" { diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index 7f89d93..c7e1f83 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -21,6 +21,9 @@ type Tcc struct { type TccGlobalFunc func(tcc *Tcc) error // TccGlobalTransaction begin a tcc global transaction +// dtm dtm服务器地址 +// gid 全局事务id +// tccFunc tcc事务函数,里面会定义全局事务的分支 func TccGlobalTransaction(dtm string, gid string, tccFunc TccGlobalFunc) (rerr error) { data := &M{ "gid": gid, @@ -67,6 +70,7 @@ func TccFromReq(c *gin.Context) (*Tcc, error) { } // CallBranch call a tcc branch +// 函数首先注册子事务的所有分支,成功后调用try分支,返回try分支的调用结果 func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) { branchID := t.NewBranchID() resp, err := common.RestyClient.R(). diff --git a/dtmsvr/dtmsvr_test.go b/dtmsvr/dtmsvr_test.go index 284c44b..e1886b8 100644 --- a/dtmsvr/dtmsvr_test.go +++ b/dtmsvr/dtmsvr_test.go @@ -21,8 +21,8 @@ func TestMain(m *testing.M) { TransProcessedTestChan = make(chan string, 1) common.InitApp(common.GetProjectDir(), &config) config.Mysql["database"] = dbName - PopulateMysql() - examples.PopulateMysql() + PopulateMysql(false) + examples.PopulateMysql(false) // 启动组件 go StartSvr() app = examples.BaseAppStartup() @@ -33,10 +33,6 @@ func TestMain(m *testing.M) { examples.TccBarrierAddRoute(app) examples.SagaBarrierAddRoute(app) - // 清理数据 - e2p(dbGet().Exec("truncate trans_global").Error) - e2p(dbGet().Exec("truncate trans_branch").Error) - e2p(dbGet().Exec("truncate trans_log").Error) examples.ResetXaData() m.Run() } diff --git a/dtmsvr/examples_test.go b/dtmsvr/examples_test.go index 53990a7..2c2cc0f 100644 --- a/dtmsvr/examples_test.go +++ b/dtmsvr/examples_test.go @@ -15,5 +15,6 @@ func TestExamples(t *testing.T) { assertSucceed(t, examples.SagaFireRequest()) assertSucceed(t, examples.TccBarrierFireRequest()) assertSucceed(t, examples.TccFireRequest()) + assertSucceed(t, examples.TccFireRequestNested()) assertSucceed(t, examples.XaFireRequest()) } diff --git a/dtmsvr/main.go b/dtmsvr/main.go index a19a61b..8bdd966 100644 --- a/dtmsvr/main.go +++ b/dtmsvr/main.go @@ -30,8 +30,8 @@ func StartSvr() { } // PopulateMysql setup mysql data -func PopulateMysql() { +func PopulateMysql(skipDrop bool) { common.InitApp(common.GetProjectDir(), &config) config.Mysql["database"] = "" - examples.RunSQLScript(config.Mysql, common.GetCurrentCodeDir()+"/dtmsvr.sql") + examples.RunSQLScript(config.Mysql, common.GetCurrentCodeDir()+"/dtmsvr.sql", skipDrop) } diff --git a/examples/data.go b/examples/data.go index 738467e..72da650 100644 --- a/examples/data.go +++ b/examples/data.go @@ -9,7 +9,7 @@ import ( ) // RunSQLScript 1 -func RunSQLScript(mysql map[string]string, script string) { +func RunSQLScript(mysql map[string]string, script string, skipDrop bool) { conf := map[string]string{} common.MustRemarshal(mysql, &conf) conf["database"] = "" @@ -22,7 +22,7 @@ func RunSQLScript(mysql map[string]string, script string) { sqls := strings.Split(string(content), ";") for _, sql := range sqls { s := strings.TrimSpace(sql) - if s == "" { + if s == "" || skipDrop && strings.Contains(s, "drop") { continue } logrus.Printf("executing: '%s'", s) @@ -31,8 +31,8 @@ func RunSQLScript(mysql map[string]string, script string) { } // PopulateMysql populate example mysql data -func PopulateMysql() { +func PopulateMysql(skipDrop bool) { common.InitApp(common.GetProjectDir(), &config) config.Mysql["database"] = dbName - RunSQLScript(config.Mysql, common.GetCurrentCodeDir()+"/examples.sql") + RunSQLScript(config.Mysql, common.GetCurrentCodeDir()+"/examples.sql", skipDrop) } diff --git a/examples/main_tcc.go b/examples/main_tcc.go index fa2e310..c2c79be 100644 --- a/examples/main_tcc.go +++ b/examples/main_tcc.go @@ -21,8 +21,8 @@ func TccSetup(app *gin.Engine) { })) } -// TccFireRequest 1 -func TccFireRequest() string { +// TccFireRequestNested 1 +func TccFireRequestNested() string { logrus.Printf("tcc transaction begin") gid := dtmcli.MustGenGid(DtmServer) err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { @@ -36,3 +36,18 @@ func TccFireRequest() string { e2p(err) return gid } + +// TccFireRequest 1 +func TccFireRequest() string { + logrus.Printf("tcc simple transaction begin") + gid := dtmcli.MustGenGid(DtmServer) + err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (rerr error) { + res1, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") + e2p(rerr) + res2, rerr := tcc.CallBranch(&TransReq{Amount: 30}, Busi+"/TransIn", Busi+"/TransInConfirm", Busi+"/TransInRevert") + logrus.Printf("tcc returns: %s, %s", res1.String(), res2.String()) + return + }) + e2p(err) + return gid +}