diff --git a/README.md b/README.md
index 55f5efe..b9f7467 100644
--- a/README.md
+++ b/README.md
@@ -57,7 +57,7 @@ DTM是首款golang的开源分布式事务管理器,优雅的解决了幂等
## [教程与文档](https://dtm.pub)
-## [各语言客户端及示例](./doc/sdk.md)
+## [各语言客户端及示例](https://dtm.pub/summary/code.html#go)
## 快速开始
diff --git a/doc/barrier-en.md b/doc/barrier-en.md
deleted file mode 100644
index 1dcfe6a..0000000
--- a/doc/barrier-en.md
+++ /dev/null
@@ -1,92 +0,0 @@
-## abnormal situations
-For a distributed system, because the state of each node is uncontrollable, downtime may occur; the network is uncontrollable, and network failure may occur. Therefore, all distributed systems need to deal with the above-mentioned failure conditions. For example, a failed operation needs to be retried to ensure that the final operation is completed.
-
-When a service receives a retry request, the request may be the first normal request, or it may be a repeated request (after the previous request is processed, the returned response has a network problem). In a distributed system, the business needs to successfully process repeated requests, which is called idempotent.
-
-Distributed transactions not only involve the above-mentioned distributed problems, but also involve transactions, and the situation is more complicated. Let's take a look at the three common problems.
-
-## dangling compensation, idempotent, dangling action
-
-The following describes these abnormal situations with TCC transactions:
-
-- Dangling compensation: The second-stage Cancel method is called without calling the Try method of the TCC resource. The Cancel method needs to recognize that this is an empty rollback, and then directly returns success.
-- Idempotence: Since any request may have network abnormalities and repeated requests, all distributed transaction branches need to be idempotent.
-- Dangling action: For a distributed transaction, when the Try method is executed, the second phase Cancel interface has been executed before. The Try method needs to recognize that this is a dangling action and return directly to failure.
-
-Let’s take a look at a sequence diagram of network abnormalities to better understand the above-mentioned problems.
-
-
-
-- When business processing step 4, Cancel is executed before Try, and empty rollback needs to be processed.
-- When business processing step 6, Cancel is executed repeatedly and needs to be idempotent.
-- When business processing step 8, Try is executed after Cancel and needs to be processed.
-
-For the above-mentioned complex network abnormalities, all distributed transaction systems currently recommend that business developers use unique keys to query whether the associated operations have been completed. If it is completed, it returns directly to success. The related logic is complex, error-prone, and the business burden is heavy.
-
-## Sub-transaction barrier technology
-
-We propose a seed transaction barrier technology, using this technology, can achieve this effect, see the schematic diagram:
-
-
-
-All these requests, after reaching the sub-transaction barrier: abnormal requests will be filtered; normal requests will pass the barrier. After the developer uses the sub-transaction barrier, the various exceptions mentioned above are all properly handled, and the business developer only needs to pay attention to the actual business logic, and the burden is greatly reduced.
-
-The external interface of the sub-transaction barrier is very simple. It only provides a method ThroughBarrierCall. The prototype of the method (the Go language is currently completed, and other languages are under development):
-
-`func ThroughBarrierCall(db *sql.DB, transInfo *TransInfo, busiCall BusiFunc)`
-
-Business developers write their own related logic in busiCall and call this function. ThroughBarrierCall guarantees that busiCall will not be called in scenarios such as dangling compensation, repeated request, dangling action, ensuring that the actual business processing logic is only submitted once.
-
-Sub-transaction barrier will manage TCC, SAGA, transaction messages, etc., and can also be extended to other areas
-
-
-## Principle of Subtransaction Barrier
-
-The principle of the sub-transaction barrier technology is to create a branch transaction status table sub_trans_barrier in the local database. The unique key is the global transaction id-sub-transaction id-sub-transaction branch type (try|confirm|cancel). The table creation statement is as follows:
-
-``` SQL
-CREATE TABLE `barrier` (
- `id` int(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
- `trans_type` varchar(45) DEFAULT'',
- `gid` varchar(128) DEFAULT'',
- `branch_id` varchar(128) DEFAULT'',
- `branch_type` varchar(45) DEFAULT'',
- `create_time` datetime DEFAULT now(),
- `update_time` datetime DEFAULT now(),
- UNIQUE KEY `gid` (`gid`,`branch_id`,`branch_type`)
-);
-```
-
-The internal steps of ThroughBarrierCall are as follows:
-
-- Open transaction
-- If it is a Try branch, then insert ignore is inserted into gid-branchid-try, if it is successfully inserted, then the logic in the barrier is called
-- If it is a Confirm branch, then insert ignore inserts gid-branchid-confirm, if it is successfully inserted, call the logic in the barrier
-- If it is a Cancel branch, insert ignore into gid-branchid-try, and then insert gid-branchid-cancel, if the try is not inserted and cancel is inserted successfully, the logic in the barrier is called
-- The logic within the barrier returns success, commits the transaction, and returns success
-- The logic inside the barrier returns an error, rolls back the transaction, and returns an error
-
-Under this mechanism, problems related to network abnormalities are solved
-
-- Dangling compensation control: If Try is not executed and Cancel is executed directly, then Cancel will be inserted into gid-branchid-try successfully, and the logic inside the barrier will not be followed, ensuring empty compensation control
-- Idempotent control: No single key can be inserted repeatedly in any branch, which ensures that it will not be executed repeatedly
-- Dangling action control: Try is to be executed after Cancel, then the inserted gid-branchid-try is not successful, it will not be executed
-
-Let's take a look at the example in http_tcc_barrier.go in dtm:
-
-``` GO
-func tccBarrierTransInTry(c *gin.Context) (interface{}, error) {
- req := reqFrom(c) // 去重构一下,改成可以重复使用的输入
- barrier := MustBarrierFromGin(c)
- return dtmcli.ResultSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
- return adjustTrading(db, transInUID, req.Amount)
- })
-}
-```
-
-The Try in the TransIn business only needs one barrier.Call call to handle the above abnormal situation, which greatly simplifies the work of business developers. For SAGA transactions, reliable messages, etc., a similar mechanism can also be used.
-
-## summary
-The sub-transaction barrier technology proposed in this project systematically solves the problem of network disorder in distributed transactions and greatly reduces the difficulty of sub-transaction disorder processing.
-
-Other development languages can also quickly access the technology
\ No newline at end of file
diff --git a/doc/protocol-en.md b/doc/protocol-en.md
deleted file mode 100644
index e2d2fc6..0000000
--- a/doc/protocol-en.md
+++ /dev/null
@@ -1,38 +0,0 @@
-## DTM Communication Protocol
-
-### Role
-A dtm transaction has three roles:
-
-- RM-Resource Manager: Manage system resources. The database is a resource manager that handles management resources and should also have the ability to manage transaction commit and rollback.
- * RM manages sub-transactions in distributed transactions, and is responsible for operations such as modification, submission, rollback, and compensation of related data. Usually corresponds to a microservice.
-- TM-Transaction Manager: The transaction manager is the core manager of distributed transactions. The transaction manager communicates with each RM (Resource Manager) to coordinate and complete transaction processing.
- * Every global transaction is registered in TM, and every sub-transaction is also registered in TM. TM will coordinate all RMs, commit all different sub-transactions of the same global transaction, or roll back all of them.
-- AP-Application Program: The application program calls the RM interface according to the business rules to complete the changes to the business model data.
- * The AP will register the global transaction, register the sub-transactions according to the business rules, and call the RM interface. Usually corresponds to a microservice.
-
-In the case of nested sub-transactions, a microservice will play the roles of RM and AP at the same time, as shown in the figure
-
-
-
-### Protocol
-
-Currently dtm only supports the http protocol. Since distributed transactions involve the collaboration of multiple roles, some participants may be temporarily unavailable and need to be retried; some participants clearly inform that they have failed and need to be rolled back.
-
-The following is a classification description of each situation, and the return value of each situation is defined. The design mainly draws on the interface of WeChat/Alipay order success callback. They also return SUCCESS to indicate success and do not retry.
-
-In the above figure, there are mainly the following types of interfaces:
-
-AP calls the TM interface, mainly for global transaction registration, submission, and sub-transaction registration, etc.:
- - Success: { dtm_result: "SUCCESS" }
- - Failure: { dtm_result: "FAILURE" }, indicates that the status of this request is incorrect, for example, a failed global transaction is not allowed to register branches.
- - Other errors need to be tried again.
-
-TM calls the RM interface, mainly for the two-phase commit, rollback, and each branch of saga
- - Success: { dtm_result: "SUCCESS" }, indicates that this interface is successfully called, and proceed to the next step normally.
- - Failure: { dtm_result: "FAILURE" }, indicates that this interface call failed and the business needs to be rolled back. For example, if the action in saga returns FAILURE, the entire saga transaction fails and rolls back.
- - Others need to retry (The result is uncertain, need to retry).
-
-AP calls the RM interface, which is related to the business, and the recommended interface form (not required):
- - Success: { dtm_result: "SUCCESS" }, indicates that this interface is successfully called, and proceed to the next step normally. The returned result can also contain other business data.
- - Failure: { dtm_result: "FAILURE" }, indicates that this interface call failed and the business needs to be rolled back. For example, if the Try action in tcc returns FAILURE, the entire tcc transaction fails and rolls back.
- - Others need to retry (The result is uncertain, need to retry).
diff --git a/doc/protocol.md b/doc/protocol.md
deleted file mode 100644
index 58bb5d3..0000000
--- a/doc/protocol.md
+++ /dev/null
@@ -1,37 +0,0 @@
-## dtm通信协议
-
-### 角色
-一个dtm事务,有三个角色参与:
-
-- RM-资源管理器:管理系统资源。数据库就是一种资源管理器,资源管理还应该具有管理事务提交和回滚的能力。
- * RM管理分布式事务中的子事务,负责相关数据的修改、提交、回滚、补偿等操作。通常对应一个微服务。
-- TM-事务管理器:事务管理器是分布式事务的核心管理者。事务管理器与每个资源管理器RM进行通信,协调并完成事务的处理。
- * 每个全局事务在TM注册,每个子事务也注册到TM。TM会协调所有的RM,将同一个全局事务的不同子事务,全部提交或全部回滚。
-- AP-应用程序:应用程序,按照业务规则调用RM接口来完成对业务模型数据的变更。
- * AP会注册全局事务,按照业务规则,注册子事务,调用RM接口。通常对应一个微服务。
-
-在子事务嵌套的情况下,一个微服务,同时会扮演RM和AP的角色,如图
-
-
-
-### 协议
-
-目前dtm只支持了http协议。由于分布式事务涉及多个角色协作,某些参与者可能出现暂时不可用,需要重试;某些参与者明确告知失败,需要进行回滚。
-下面对各种情况进行分类说明,定义各类情况的返回值。设计主要借鉴了微信/支付宝订单成功回调的接口,他们也是通过返回SUCCESS来表示成功,不再进行重试。
-
-上面的图中,主要有以下几类接口:
-
-AP调用TM的接口,主要为全局事务注册、提交,子事务注册等:
- - 成功: { dtm_result: "SUCCESS" }
- - 失败: { dtm_result: "FAILURE" },表示这个请求状态不对,例如已经走fail的全局事务不允许再注册分支
- - 其他错误则需要重试
-
-TM调用RM的接口,主要为二阶段的提交、回滚,以及saga的各分支
- - 成功: { dtm_result: "SUCCESS" },表示这个接口调用成功,正常进行下一步操作
- - 失败: { dtm_result: "FAILURE" },表示这个接口调用失败,业务需要进行回滚。例如saga中的动作如果返回FAILURE,则整个saga事务失败回滚
- - 其他则需要重试(结果不确定,需要重试)
-
-AP调用RM的接口,跟业务相关,建议的接口形式(非必须):
- - 成功: { dtm_result: "SUCCESS" },表示这个接口调用成功,正常进行下一步操作。返回的结果还可以包含其他业务数据。
- - 失败: { dtm_result: "FAILURE" },表示这个接口调用失败,业务需要进行回滚。例如tcc中的Try动作如果返回FAILURE,则整个tcc事务失败回滚
- - 其他则需要重试(结果不确定,需要重试)
diff --git a/doc/sdk.md b/doc/sdk.md
deleted file mode 100644
index d39cc76..0000000
--- a/doc/sdk.md
+++ /dev/null
@@ -1,27 +0,0 @@
-## 各语言客户端及示例
-
-### go
-客户端sdk: [https://github.com/yedf/dtm](https://github.com/yedf/dtm)
-
-示例: [https://github.com/yedf/dtmcli-go-sample](https://github.com/yedf/dtmcli-go-sample)
-
-### python
-
-客户端sdk(当前只支持TCC): [https://github.com/yedf/dtmcli-py](https://github.com/yedf/dtmcli-py)
-
-示例: [https://github.com/yedf/dtmcli-py-sample](https://github.com/yedf/dtmcli-py-sample)
-
-### php
-
-客户端sdk(当前只支持TCC): [https://github.com/yedf/dtmcli-php](https://github.com/yedf/dtmcli-php)
-
-示例: [https://github.com/yedf/dtmcli-php-sample](https://github.com/yedf/dtmcli-php-sample)
-
-感谢 [onlyshow](https://github.com/onlyshow) 的帮助,php的sdk和示例,全部由[onlyshow](https://github.com/onlyshow)独立完成
-
-### node
-
-客户端sdk(当前只支持TCC): [https://github.com/yedf/dtmcli-node](https://github.com/yedf/dtmcli-node)
-
-示例: [https://github.com/yedf/dtmcli-node-sample](https://github.com/yedf/dtmcli-node-sample)
-
diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go
index fd35fa5..f04caa6 100644
--- a/dtmcli/barrier.go
+++ b/dtmcli/barrier.go
@@ -70,13 +70,13 @@ func (bb *BranchBarrier) Call(tx Tx, busiCall BusiFunc) (rerr error) {
}()
ti := bb
originType := map[string]string{
- "cancel": "try",
- "compensate": "action",
+ BranchCancel: BranchTry,
+ BranchCompensate: BranchAction,
}[ti.BranchType]
originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.BranchType)
currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.BranchType, bid, ti.BranchType)
Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
- if (ti.BranchType == "cancel" || ti.BranchType == "compensate") && originAffected > 0 { // 这个是空补偿,返回成功
+ if (ti.BranchType == BranchCancel || ti.BranchType == BranchCompensate) && originAffected > 0 { // 这个是空补偿,返回成功
return
} else if currentAffected == 0 { // 插入不成功
var result sql.NullString
diff --git a/dtmcli/consts.go b/dtmcli/consts.go
new file mode 100644
index 0000000..bbb3455
--- /dev/null
+++ b/dtmcli/consts.go
@@ -0,0 +1,32 @@
+package dtmcli
+
+const (
+ // StatusPrepared status for global trans status. exists only in tran message
+ StatusPrepared = "prepared"
+ // StatusSubmitted StatusSubmitted status for global trans status.
+ StatusSubmitted = "submitted"
+ // StatusSucceed status for global trans status.
+ StatusSucceed = "succeed"
+ // StatusFailed status for global trans status.
+ StatusFailed = "failed"
+
+ // BranchTry branch type for TCC
+ BranchTry = "try"
+ // BranchConfirm branch type for TCC
+ BranchConfirm = "confirm"
+ // BranchCancel branch type for TCC
+ BranchCancel = "cancel"
+ // BranchAction branch type for message, SAGA, XA
+ BranchAction = "action"
+ // BranchCompensate branch type for SAGA
+ BranchCompensate = "compensate"
+ // BranchCommit branch type for XA
+ BranchCommit = "commit"
+ // BranchRollback branch type for XA
+ BranchRollback = "rollback"
+
+ // ResultSuccess for result of a trans/trans branch
+ ResultSuccess = "SUCCESS"
+ // ResultFailure for result of a trans/trans branch
+ ResultFailure = "FAILURE"
+)
diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go
index d1a2009..013d3c0 100644
--- a/dtmcli/tcc.go
+++ b/dtmcli/tcc.go
@@ -56,13 +56,13 @@ func TccFromQuery(qs url.Values) (*Tcc, error) {
func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, cancelURL string) (*resty.Response, error) {
branchID := t.NewBranchID()
err := t.callDtm(&M{
- "gid": t.Gid,
- "branch_id": branchID,
- "trans_type": "tcc",
- "data": string(MustMarshal(body)),
- "try": tryURL,
- "confirm": confirmURL,
- "cancel": cancelURL,
+ "gid": t.Gid,
+ "branch_id": branchID,
+ "trans_type": "tcc",
+ "data": string(MustMarshal(body)),
+ BranchTry: tryURL,
+ BranchConfirm: confirmURL,
+ "cancel": cancelURL,
}, "registerTccBranch")
if err != nil {
return nil, err
@@ -74,7 +74,7 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
"gid": t.Gid,
"branch_id": branchID,
"trans_type": "tcc",
- "branch_type": "try",
+ "branch_type": BranchTry,
}).
Post(tryURL)
return resp, CheckResponse(resp, err)
diff --git a/dtmcli/types.go b/dtmcli/types.go
index b9be198..e21994b 100644
--- a/dtmcli/types.go
+++ b/dtmcli/types.go
@@ -88,7 +88,7 @@ func (tb *TransBase) callDtm(body interface{}, operation string) error {
return err
}
tr := resp.Result().(*TransResult)
- if tr.DtmResult == "FAILURE" {
+ if tr.DtmResult == ResultFailure {
return errors.New("FAILURE: " + tr.Message)
}
return nil
@@ -100,8 +100,8 @@ var ErrFailure = errors.New("transaction FAILURE")
// ErrPending 表示暂时失败,要求重试
var ErrPending = errors.New("transaction PENDING")
-// ResultSuccess 表示返回成功,可以进行下一步
-var ResultSuccess = M{"dtm_result": "SUCCESS"}
+// MapSuccess 表示返回成功,可以进行下一步
+var MapSuccess = M{"dtm_result": ResultSuccess}
-// ResultFailure 表示返回失败,要求回滚
-var ResultFailure = M{"dtm_result": "FAILURE"}
+// MapFailure 表示返回失败,要求回滚
+var MapFailure = M{"dtm_result": ResultFailure}
diff --git a/dtmcli/utils.go b/dtmcli/utils.go
index ffd3d48..38f9ebc 100644
--- a/dtmcli/utils.go
+++ b/dtmcli/utils.go
@@ -239,7 +239,7 @@ func CheckResponse(resp *resty.Response, err error) error {
if err == nil && resp != nil {
if resp.IsError() {
return errors.New(resp.String())
- } else if strings.Contains(resp.String(), "FAILURE") {
+ } else if strings.Contains(resp.String(), ResultFailure) {
return ErrFailure
}
}
@@ -254,7 +254,7 @@ func CheckResult(res interface{}, err error) error {
}
if res != nil {
str := MustMarshalString(res)
- if strings.Contains(str, "FAILURE") {
+ if strings.Contains(str, ResultFailure) {
return ErrFailure
} else if strings.Contains(str, "PENDING") {
return ErrPending
diff --git a/dtmcli/xa.go b/dtmcli/xa.go
index d4fc837..ebdbabd 100644
--- a/dtmcli/xa.go
+++ b/dtmcli/xa.go
@@ -53,7 +53,7 @@ func NewXaClient(server string, mysqlConf map[string]string, notifyURL string, r
// HandleCallback 处理commit/rollback的回调
func (xc *XaClient) HandleCallback(gid string, branchID string, action string) (interface{}, error) {
- return ResultSuccess, xc.XaClientBase.HandleCallback(gid, branchID, action)
+ return MapSuccess, xc.XaClientBase.HandleCallback(gid, branchID, action)
}
// XaLocalTransaction start a xa local transaction
@@ -95,7 +95,7 @@ func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
"gid": x.Gid,
"branch_id": branchID,
"trans_type": "xa",
- "branch_type": "action",
+ "branch_type": BranchAction,
}).
Post(url)
return resp, CheckResponse(resp, err)
diff --git a/dtmsvr/api.go b/dtmsvr/api.go
index fd36ba8..7d22b07 100644
--- a/dtmsvr/api.go
+++ b/dtmsvr/api.go
@@ -10,26 +10,26 @@ import (
func svcSubmit(t *TransGlobal, waitResult bool) (interface{}, error) {
db := dbGet()
dbt := TransFromDb(db, t.Gid)
- if dbt != nil && dbt.Status != "prepared" && dbt.Status != "submitted" {
- return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status %s, cannot sumbmit", dbt.Status)}, nil
+ if dbt != nil && dbt.Status != dtmcli.StatusPrepared && dbt.Status != dtmcli.StatusSubmitted {
+ return M{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status %s, cannot sumbmit", dbt.Status)}, nil
}
- t.Status = "submitted"
+ t.Status = dtmcli.StatusSubmitted
t.saveNew(db)
return t.Process(db, waitResult), nil
}
func svcPrepare(t *TransGlobal) (interface{}, error) {
- t.Status = "prepared"
+ t.Status = dtmcli.StatusPrepared
t.saveNew(dbGet())
- return dtmcli.ResultSuccess, nil
+ return dtmcli.MapSuccess, nil
}
func svcAbort(t *TransGlobal, waitResult bool) (interface{}, error) {
db := dbGet()
dbt := TransFromDb(db, t.Gid)
- if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != "prepared" && dbt.Status != "aborting" {
- return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil
+ if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != dtmcli.StatusPrepared && dbt.Status != "aborting" {
+ return M{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("trans type: %s current status %s, cannot abort", dbt.TransType, dbt.Status)}, nil
}
return dbt.Process(db, waitResult), nil
}
@@ -37,12 +37,12 @@ func svcAbort(t *TransGlobal, waitResult bool) (interface{}, error) {
func svcRegisterTccBranch(branch *TransBranch, data dtmcli.MS) (interface{}, error) {
db := dbGet()
dbt := TransFromDb(db, branch.Gid)
- if dbt.Status != "prepared" {
- return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil
+ if dbt.Status != dtmcli.StatusPrepared {
+ return M{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil
}
branches := []TransBranch{*branch, *branch, *branch}
- for i, b := range []string{"cancel", "confirm", "try"} {
+ for i, b := range []string{dtmcli.BranchCancel, dtmcli.BranchConfirm, dtmcli.BranchTry} {
branches[i].BranchType = b
branches[i].URL = data[b]
}
@@ -52,23 +52,23 @@ func svcRegisterTccBranch(branch *TransBranch, data dtmcli.MS) (interface{}, err
}).Create(branches)
global := TransGlobal{Gid: branch.Gid}
global.touch(dbGet(), config.TransCronInterval)
- return dtmcli.ResultSuccess, nil
+ return dtmcli.MapSuccess, nil
}
func svcRegisterXaBranch(branch *TransBranch) (interface{}, error) {
- branch.Status = "prepared"
+ branch.Status = dtmcli.StatusPrepared
db := dbGet()
dbt := TransFromDb(db, branch.Gid)
- if dbt.Status != "prepared" {
- return M{"dtm_result": "FAILURE", "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil
+ if dbt.Status != dtmcli.StatusPrepared {
+ return M{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil
}
branches := []TransBranch{*branch, *branch}
- branches[0].BranchType = "rollback"
- branches[1].BranchType = "commit"
+ branches[0].BranchType = dtmcli.BranchRollback
+ branches[1].BranchType = dtmcli.BranchCommit
db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(branches)
global := TransGlobal{Gid: branch.Gid}
global.touch(db, config.TransCronInterval)
- return dtmcli.ResultSuccess, nil
+ return dtmcli.MapSuccess, nil
}
diff --git a/dtmsvr/api_grpc.go b/dtmsvr/api_grpc.go
index 2d749f9..3f392a2 100644
--- a/dtmsvr/api_grpc.go
+++ b/dtmsvr/api_grpc.go
@@ -37,12 +37,12 @@ func (s *dtmServer) RegisterTccBranch(ctx context.Context, in *pb.DtmTccBranchRe
r, err := svcRegisterTccBranch(&TransBranch{
Gid: in.Info.Gid,
BranchID: in.Info.BranchID,
- Status: "prepared",
+ Status: dtmcli.StatusPrepared,
Data: in.BusiData,
}, dtmcli.MS{
- "cancel": in.Cancel,
- "confirm": in.Confirm,
- "try": in.Try,
+ dtmcli.BranchCancel: in.Cancel,
+ dtmcli.BranchConfirm: in.Confirm,
+ dtmcli.BranchTry: in.Try,
})
return &emptypb.Empty{}, dtmgrpc.Result2Error(r, err)
}
@@ -51,7 +51,7 @@ func (s *dtmServer) RegisterXaBranch(ctx context.Context, in *pb.DtmXaBranchRequ
r, err := svcRegisterXaBranch(&TransBranch{
Gid: in.Info.Gid,
BranchID: in.Info.BranchID,
- Status: "prepared",
+ Status: dtmcli.StatusPrepared,
Data: in.BusiData,
URL: in.Notify,
})
diff --git a/dtmsvr/api_http.go b/dtmsvr/api_http.go
index ff3dea5..81f207d 100644
--- a/dtmsvr/api_http.go
+++ b/dtmsvr/api_http.go
@@ -20,14 +20,14 @@ func addRoute(engine *gin.Engine) {
}
func newGid(c *gin.Context) (interface{}, error) {
- return M{"gid": GenGid(), "dtm_result": "SUCCESS"}, nil
+ return M{"gid": GenGid(), "dtm_result": dtmcli.ResultSuccess}, nil
}
func prepare(c *gin.Context) (interface{}, error) {
t := TransFromContext(c)
- t.Status = "prepared"
+ t.Status = dtmcli.StatusPrepared
t.saveNew(dbGet())
- return dtmcli.ResultSuccess, nil
+ return dtmcli.MapSuccess, nil
}
func submit(c *gin.Context) (interface{}, error) {
@@ -52,7 +52,7 @@ func registerTccBranch(c *gin.Context) (interface{}, error) {
branch := TransBranch{
Gid: data["gid"],
BranchID: data["branch_id"],
- Status: "prepared",
+ Status: dtmcli.StatusPrepared,
Data: data["data"],
}
return svcRegisterTccBranch(&branch, data)
diff --git a/dtmsvr/trans.go b/dtmsvr/trans.go
index 0f48021..29bed06 100644
--- a/dtmsvr/trans.go
+++ b/dtmsvr/trans.go
@@ -56,10 +56,10 @@ func (t *TransGlobal) changeStatus(db *common.DB, status string) *gorm.DB {
updates := t.setNextCron(config.TransCronInterval)
updates = append(updates, "status")
now := time.Now()
- if status == "succeed" {
+ if status == dtmcli.StatusSucceed {
t.FinishTime = &now
updates = append(updates, "finish_time")
- } else if status == "failed" {
+ } else if status == dtmcli.StatusFailed {
t.RollbackTime = &now
updates = append(updates, "rollback_time")
}
@@ -119,17 +119,17 @@ func (t *TransGlobal) getProcessor() transProcessor {
func (t *TransGlobal) Process(db *common.DB, waitResult bool) dtmcli.M {
if !waitResult {
go t.processInner(db)
- return dtmcli.ResultSuccess
+ return dtmcli.MapSuccess
}
- submitting := t.Status == "submitted"
+ submitting := t.Status == dtmcli.StatusSubmitted
err := t.processInner(db)
if err != nil {
- return dtmcli.M{"dtm_result": "FAILURE", "message": err.Error()}
+ return dtmcli.M{"dtm_result": dtmcli.ResultFailure, "message": err.Error()}
}
- if submitting && t.Status != "succeed" {
- return dtmcli.M{"dtm_result": "FAILURE", "message": "trans failed by user"}
+ if submitting && t.Status != dtmcli.StatusSucceed {
+ return dtmcli.M{"dtm_result": dtmcli.ResultFailure, "message": "trans failed by user"}
}
- return dtmcli.ResultSuccess
+ return dtmcli.MapSuccess
}
func (t *TransGlobal) processInner(db *common.DB) (rerr error) {
@@ -142,7 +142,7 @@ func (t *TransGlobal) processInner(db *common.DB) (rerr error) {
}
}()
dtmcli.Logf("processing: %s status: %s", t.Gid, t.Status)
- if t.Status == "prepared" && t.TransType != "msg" {
+ if t.Status == dtmcli.StatusPrepared && t.TransType != "msg" {
t.changeStatus(db, "aborting")
}
branches := []TransBranch{}
@@ -173,9 +173,9 @@ func (t *TransGlobal) getURLResult(url string, branchID, branchType string, bran
BusiData: branchData,
}, &emptypb.Empty{})
if err == nil {
- return "SUCCESS"
+ return dtmcli.ResultSuccess
} else if status.Code(err) == codes.Aborted {
- return "FAILURE"
+ return dtmcli.ResultFailure
}
return err.Error()
}
@@ -199,12 +199,12 @@ func (t *TransGlobal) getBranchResult(branch *TransBranch) string {
func (t *TransGlobal) execBranch(db *common.DB, branch *TransBranch) {
body := t.getBranchResult(branch)
- if strings.Contains(body, "SUCCESS") {
+ if strings.Contains(body, dtmcli.ResultSuccess) {
t.touch(db, config.TransCronInterval)
- branch.changeStatus(db, "succeed")
- } else if t.TransType == "saga" && branch.BranchType == "action" && strings.Contains(body, "FAILURE") {
+ branch.changeStatus(db, dtmcli.StatusSucceed)
+ } else if t.TransType == "saga" && branch.BranchType == dtmcli.BranchAction && strings.Contains(body, dtmcli.ResultFailure) {
t.touch(db, config.TransCronInterval)
- branch.changeStatus(db, "failed")
+ branch.changeStatus(db, dtmcli.StatusFailed)
} else {
panic(fmt.Errorf("http result should contains SUCCESS|FAILURE. grpc error should return nil|Aborted. \nrefer to: https://dtm.pub/summary/arch.html#http\nunkown result will be retried: %s", body))
}
@@ -226,8 +226,8 @@ func (t *TransGlobal) saveNew(db *common.DB) {
DoNothing: true,
}).Create(&branches)
}
- } else if dbr.RowsAffected == 0 && t.Status == "submitted" { // 如果数据库已经存放了prepared的事务,则修改状态
- dbr = db.Must().Model(t).Where("gid=? and status=?", t.Gid, "prepared").Select(append(updates, "status")).Updates(t)
+ } else if dbr.RowsAffected == 0 && t.Status == dtmcli.StatusSubmitted { // 如果数据库已经存放了prepared的事务,则修改状态
+ dbr = db.Must().Model(t).Where("gid=? and status=?", t.Gid, dtmcli.StatusPrepared).Select(append(updates, "status")).Updates(t)
}
return nil
})
diff --git a/dtmsvr/trans_msg.go b/dtmsvr/trans_msg.go
index b020d1a..a1f7d4c 100644
--- a/dtmsvr/trans_msg.go
+++ b/dtmsvr/trans_msg.go
@@ -24,21 +24,21 @@ func (t *transMsgProcessor) GenBranches() []TransBranch {
Gid: t.Gid,
BranchID: GenGid(),
Data: step["data"].(string),
- URL: step["action"].(string),
- BranchType: "action",
- Status: "prepared",
+ URL: step[dtmcli.BranchAction].(string),
+ BranchType: dtmcli.BranchAction,
+ Status: dtmcli.StatusPrepared,
})
}
return branches
}
func (t *TransGlobal) mayQueryPrepared(db *common.DB) {
- if t.Status != "prepared" {
+ if t.Status != dtmcli.StatusPrepared {
return
}
body := t.getURLResult(t.QueryPrepared, "", "", nil)
- if strings.Contains(body, "SUCCESS") {
- t.changeStatus(db, "submitted")
+ if strings.Contains(body, dtmcli.ResultSuccess) {
+ t.changeStatus(db, dtmcli.StatusSubmitted)
} else {
t.touch(db, t.NextCronInterval*2)
}
@@ -46,22 +46,22 @@ func (t *TransGlobal) mayQueryPrepared(db *common.DB) {
func (t *transMsgProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {
t.mayQueryPrepared(db)
- if t.Status != "submitted" {
+ if t.Status != dtmcli.StatusSubmitted {
return
}
current := 0 // 当前正在处理的步骤
for ; current < len(branches); current++ {
branch := &branches[current]
- if branch.BranchType != "action" || branch.Status != "prepared" {
+ if branch.BranchType != dtmcli.BranchAction || branch.Status != dtmcli.StatusPrepared {
continue
}
t.execBranch(db, branch)
- if branch.Status != "succeed" {
+ if branch.Status != dtmcli.StatusSucceed {
break
}
}
if current == len(branches) { // msg 事务完成
- t.changeStatus(db, "succeed")
+ t.changeStatus(db, dtmcli.StatusSucceed)
return
}
panic("msg go pass all branch")
diff --git a/dtmsvr/trans_saga.go b/dtmsvr/trans_saga.go
index 27260ce..a8c27a5 100644
--- a/dtmsvr/trans_saga.go
+++ b/dtmsvr/trans_saga.go
@@ -21,14 +21,14 @@ func (t *transSagaProcessor) GenBranches() []TransBranch {
dtmcli.MustUnmarshalString(t.Data, &steps)
for i, step := range steps {
branch := fmt.Sprintf("%02d", i+1)
- for _, branchType := range []string{"compensate", "action"} {
+ for _, branchType := range []string{dtmcli.BranchCompensate, dtmcli.BranchAction} {
branches = append(branches, TransBranch{
Gid: t.Gid,
BranchID: branch,
Data: step["data"].(string),
URL: step[branchType].(string),
BranchType: branchType,
- Status: "prepared",
+ Status: dtmcli.StatusPrepared,
})
}
}
@@ -36,36 +36,36 @@ func (t *transSagaProcessor) GenBranches() []TransBranch {
}
func (t *transSagaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {
- if t.Status == "failed" || t.Status == "succeed" {
+ if t.Status == dtmcli.StatusFailed || t.Status == dtmcli.StatusSucceed {
return
}
current := 0 // 当前正在处理的步骤
for ; current < len(branches); current++ {
branch := &branches[current]
- if branch.BranchType != "action" || branch.Status == "succeed" {
+ if branch.BranchType != dtmcli.BranchAction || branch.Status == dtmcli.StatusSucceed {
continue
}
// 找到了一个非succeed的action
- if branch.Status == "prepared" {
+ if branch.Status == dtmcli.StatusPrepared {
t.execBranch(db, branch)
}
- if branch.Status != "succeed" {
+ if branch.Status != dtmcli.StatusSucceed {
break
}
}
if current == len(branches) { // saga 事务完成
- t.changeStatus(db, "succeed")
+ t.changeStatus(db, dtmcli.StatusSucceed)
return
}
- if t.Status != "aborting" && t.Status != "failed" {
+ if t.Status != "aborting" && t.Status != dtmcli.StatusFailed {
t.changeStatus(db, "aborting")
}
for current = current - 1; current >= 0; current-- {
branch := &branches[current]
- if branch.BranchType != "compensate" || branch.Status != "prepared" {
+ if branch.BranchType != dtmcli.BranchCompensate || branch.Status != dtmcli.StatusPrepared {
continue
}
t.execBranch(db, branch)
}
- t.changeStatus(db, "failed")
+ t.changeStatus(db, dtmcli.StatusFailed)
}
diff --git a/dtmsvr/trans_tcc.go b/dtmsvr/trans_tcc.go
index 024247d..43f564a 100644
--- a/dtmsvr/trans_tcc.go
+++ b/dtmsvr/trans_tcc.go
@@ -18,14 +18,14 @@ func (t *transTccProcessor) GenBranches() []TransBranch {
}
func (t *transTccProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {
- if t.Status == "succeed" || t.Status == "failed" {
+ if t.Status == dtmcli.StatusSucceed || t.Status == dtmcli.StatusFailed {
return
}
- branchType := dtmcli.If(t.Status == "submitted", "confirm", "cancel").(string)
+ branchType := dtmcli.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchConfirm, dtmcli.BranchCancel).(string)
for current := len(branches) - 1; current >= 0; current-- {
- if branches[current].BranchType == branchType && branches[current].Status == "prepared" {
+ if branches[current].BranchType == branchType && branches[current].Status == dtmcli.StatusPrepared {
t.execBranch(db, &branches[current])
}
}
- t.changeStatus(db, dtmcli.If(t.Status == "submitted", "succeed", "failed").(string))
+ t.changeStatus(db, dtmcli.If(t.Status == dtmcli.StatusSubmitted, dtmcli.StatusSucceed, dtmcli.StatusFailed).(string))
}
diff --git a/dtmsvr/trans_xa.go b/dtmsvr/trans_xa.go
index 543c39c..e4a79a4 100644
--- a/dtmsvr/trans_xa.go
+++ b/dtmsvr/trans_xa.go
@@ -18,14 +18,14 @@ func (t *transXaProcessor) GenBranches() []TransBranch {
}
func (t *transXaProcessor) ProcessOnce(db *common.DB, branches []TransBranch) {
- if t.Status == "succeed" {
+ if t.Status == dtmcli.StatusSucceed {
return
}
- currentType := dtmcli.If(t.Status == "submitted", "commit", "rollback").(string)
+ currentType := dtmcli.If(t.Status == dtmcli.StatusSubmitted, dtmcli.BranchCommit, dtmcli.BranchRollback).(string)
for _, branch := range branches {
- if branch.BranchType == currentType && branch.Status != "succeed" {
+ if branch.BranchType == currentType && branch.Status != dtmcli.StatusSucceed {
t.execBranch(db, &branch)
}
}
- t.changeStatus(db, dtmcli.If(t.Status == "submitted", "succeed", "failed").(string))
+ t.changeStatus(db, dtmcli.If(t.Status == dtmcli.StatusSubmitted, dtmcli.StatusSucceed, dtmcli.StatusFailed).(string))
}
diff --git a/dtmsvr/utils.go b/dtmsvr/utils.go
index c56915e..e59c59d 100644
--- a/dtmsvr/utils.go
+++ b/dtmsvr/utils.go
@@ -29,7 +29,7 @@ func writeTransLog(gid string, action string, status string, branch string, deta
// }
// dbGet().Must().Table("trans_log").Create(M{
// "gid": gid,
- // "action": action,
+ // dtmcli.BranchAction: action,
// "new_status": status,
// "branch": branch,
// "detail": detail,
diff --git a/examples/base_grpc.go b/examples/base_grpc.go
index 9d991f9..39f1e4c 100644
--- a/examples/base_grpc.go
+++ b/examples/base_grpc.go
@@ -40,11 +40,11 @@ func GrpcStartup() {
}
func handleGrpcBusiness(in *dtmgrpc.BusiRequest, result1 string, result2 string, busi string) error {
- res := dtmcli.OrString(result1, result2, "SUCCESS")
+ res := dtmcli.OrString(result1, result2, dtmcli.ResultSuccess)
dtmcli.Logf("grpc busi %s %s result: %s", busi, in.Info, res)
- if res == "SUCCESS" {
+ if res == dtmcli.ResultSuccess {
return nil
- } else if res == "FAILURE" {
+ } else if res == dtmcli.ResultFailure {
return status.New(codes.Aborted, "user want to rollback").Err()
}
return status.New(codes.Internal, fmt.Sprintf("unknow result %s", res)).Err()
@@ -112,7 +112,7 @@ func (s *busiServer) TransInXa(ctx context.Context, in *dtmgrpc.BusiRequest) (*d
req := TransReq{}
dtmcli.MustUnmarshal(in.BusiData, &req)
return &dtmgrpc.BusiReply{BusiData: []byte("reply")}, XaGrpcClient.XaLocalTransaction(in, func(db *sql.DB, xa *dtmgrpc.XaGrpc) error {
- if req.TransInResult == "FAILURE" {
+ if req.TransInResult == dtmcli.ResultFailure {
return status.New(codes.Aborted, "user return failure").Err()
}
_, err := dtmcli.DBExec(db, "update dtm_busi.user_account set balance=balance+? where user_id=?", req.Amount, 2)
@@ -124,7 +124,7 @@ func (s *busiServer) TransOutXa(ctx context.Context, in *dtmgrpc.BusiRequest) (*
req := TransReq{}
dtmcli.MustUnmarshal(in.BusiData, &req)
return &dtmgrpc.BusiReply{BusiData: []byte("reply")}, XaGrpcClient.XaLocalTransaction(in, func(db *sql.DB, xa *dtmgrpc.XaGrpc) error {
- if req.TransOutResult == "FAILURE" {
+ if req.TransOutResult == dtmcli.ResultFailure {
return status.New(codes.Aborted, "user return failure").Err()
}
_, err := dtmcli.DBExec(db, "update dtm_busi.user_account set balance=balance-? where user_id=?", req.Amount, 1)
diff --git a/examples/base_http.go b/examples/base_http.go
index 28154b0..3e50017 100644
--- a/examples/base_http.go
+++ b/examples/base_http.go
@@ -76,7 +76,7 @@ var MainSwitch mainSwitchType
func handleGeneralBusiness(c *gin.Context, result1 string, result2 string, busi string) (interface{}, error) {
info := infoFromContext(c)
- res := dtmcli.OrString(result1, result2, "SUCCESS")
+ res := dtmcli.OrString(result1, result2, dtmcli.ResultSuccess)
dtmcli.Logf("%s %s result: %s", busi, info.String(), res)
return M{"dtm_result": res}, nil
}
@@ -103,31 +103,31 @@ func BaseAddRoute(app *gin.Engine) {
}))
app.GET(BusiAPI+"/CanSubmit", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
dtmcli.Logf("%s CanSubmit", c.Query("gid"))
- return dtmcli.OrString(MainSwitch.CanSubmitResult.Fetch(), "SUCCESS"), nil
+ return dtmcli.OrString(MainSwitch.CanSubmitResult.Fetch(), dtmcli.ResultSuccess), nil
}))
app.POST(BusiAPI+"/TransInXa", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) (interface{}, error) {
- if reqFrom(c).TransInResult == "FAILURE" {
- return dtmcli.ResultFailure, nil
+ if reqFrom(c).TransInResult == dtmcli.ResultFailure {
+ return dtmcli.MapFailure, nil
}
_, err := dtmcli.DBExec(db, "update dtm_busi.user_account set balance=balance+? where user_id=?", reqFrom(c).Amount, 2)
- return dtmcli.ResultSuccess, err
+ return dtmcli.MapSuccess, err
})
}))
app.POST(BusiAPI+"/TransOutXa", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) (interface{}, error) {
- if reqFrom(c).TransOutResult == "FAILURE" {
- return dtmcli.ResultFailure, nil
+ if reqFrom(c).TransOutResult == dtmcli.ResultFailure {
+ return dtmcli.MapFailure, nil
}
_, err := dtmcli.DBExec(db, "update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, 1)
- return dtmcli.ResultSuccess, err
+ return dtmcli.MapSuccess, err
})
}))
app.POST(BusiAPI+"/TransOutXaGorm", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) (interface{}, error) {
- if reqFrom(c).TransOutResult == "FAILURE" {
- return dtmcli.ResultFailure, nil
+ if reqFrom(c).TransOutResult == dtmcli.ResultFailure {
+ return dtmcli.MapFailure, nil
}
gdb, err := gorm.Open(mysql.New(mysql.Config{
Conn: db,
@@ -136,7 +136,7 @@ func BaseAddRoute(app *gin.Engine) {
return nil, err
}
dbr := gdb.Exec("update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, 1)
- return dtmcli.ResultSuccess, dbr.Error
+ return dtmcli.MapSuccess, dbr.Error
})
}))
diff --git a/examples/base_types.go b/examples/base_types.go
index cefc688..21afe4d 100644
--- a/examples/base_types.go
+++ b/examples/base_types.go
@@ -34,8 +34,8 @@ func (t *TransReq) String() string {
func GenTransReq(amount int, outFailed bool, inFailed bool) *TransReq {
return &TransReq{
Amount: amount,
- TransOutResult: dtmcli.If(outFailed, "FAILURE", "SUCCESS").(string),
- TransInResult: dtmcli.If(inFailed, "FAILURE", "SUCCESS").(string),
+ TransOutResult: dtmcli.If(outFailed, dtmcli.ResultFailure, dtmcli.ResultSuccess).(string),
+ TransInResult: dtmcli.If(inFailed, dtmcli.ResultFailure, dtmcli.ResultSuccess).(string),
}
}
diff --git a/examples/grpc_saga_barrier.go b/examples/grpc_saga_barrier.go
index 4a4a57e..f48bfce 100644
--- a/examples/grpc_saga_barrier.go
+++ b/examples/grpc_saga_barrier.go
@@ -23,7 +23,7 @@ func init() {
}
func sagaGrpcBarrierAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error {
- if result == "FAILURE" {
+ if result == dtmcli.ResultFailure {
return status.New(codes.Aborted, "user rollback").Err()
}
_, err := dtmcli.DBExec(db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
diff --git a/examples/http_saga_barrier.go b/examples/http_saga_barrier.go
index a6c865a..0b0291b 100644
--- a/examples/http_saga_barrier.go
+++ b/examples/http_saga_barrier.go
@@ -38,14 +38,14 @@ func sagaBarrierTransIn(c *gin.Context) (interface{}, error) {
return req.TransInResult, nil
}
barrier := MustBarrierFromGin(c)
- return dtmcli.ResultSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
+ return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
return sagaBarrierAdjustBalance(db, 1, req.Amount)
})
}
func sagaBarrierTransInCompensate(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c)
- return dtmcli.ResultSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
+ return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
return sagaBarrierAdjustBalance(db, 1, -reqFrom(c).Amount)
})
}
@@ -56,14 +56,14 @@ func sagaBarrierTransOut(c *gin.Context) (interface{}, error) {
return req.TransInResult, nil
}
barrier := MustBarrierFromGin(c)
- return dtmcli.ResultSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
+ return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
return sagaBarrierAdjustBalance(db, 2, -req.Amount)
})
}
func sagaBarrierTransOutCompensate(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c)
- return dtmcli.ResultSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
+ return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
return sagaBarrierAdjustBalance(db, 2, reqFrom(c).Amount)
})
}
diff --git a/examples/http_saga_gorm_barrier.go b/examples/http_saga_gorm_barrier.go
index 001ba4e..ae3605e 100644
--- a/examples/http_saga_gorm_barrier.go
+++ b/examples/http_saga_gorm_barrier.go
@@ -30,7 +30,7 @@ func sagaGormBarrierTransOut(c *gin.Context) (interface{}, error) {
req := reqFrom(c)
barrier := MustBarrierFromGin(c)
tx := dbGet().DB.Begin()
- return dtmcli.ResultSuccess, barrier.Call(tx.Statement.ConnPool.(*sql.Tx), func(db dtmcli.DB) error {
+ return dtmcli.MapSuccess, barrier.Call(tx.Statement.ConnPool.(*sql.Tx), func(db dtmcli.DB) error {
return tx.Exec("update dtm_busi.user_account set balance = balance + ? where user_id = ?", -req.Amount, 2).Error
})
}
diff --git a/examples/http_tcc_barrier.go b/examples/http_tcc_barrier.go
index 8026f5b..818a04e 100644
--- a/examples/http_tcc_barrier.go
+++ b/examples/http_tcc_barrier.go
@@ -62,21 +62,21 @@ func tccBarrierTransInTry(c *gin.Context) (interface{}, error) {
return req.TransInResult, nil
}
barrier := MustBarrierFromGin(c)
- return dtmcli.ResultSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
+ return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
return adjustTrading(db, transInUID, req.Amount)
})
}
func tccBarrierTransInConfirm(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c)
- return dtmcli.ResultSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
+ return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
return adjustBalance(db, transInUID, reqFrom(c).Amount)
})
}
func tccBarrierTransInCancel(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c)
- return dtmcli.ResultSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
+ return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
return adjustTrading(db, transInUID, -reqFrom(c).Amount)
})
}
@@ -87,14 +87,14 @@ func tccBarrierTransOutTry(c *gin.Context) (interface{}, error) {
return req.TransInResult, nil
}
barrier := MustBarrierFromGin(c)
- return dtmcli.ResultSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
+ return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
return adjustTrading(db, transOutUID, -req.Amount)
})
}
func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c)
- return dtmcli.ResultSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
+ return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
return adjustBalance(db, transOutUID, -reqFrom(c).Amount)
})
}
@@ -102,7 +102,7 @@ func tccBarrierTransOutConfirm(c *gin.Context) (interface{}, error) {
// TccBarrierTransOutCancel will be use in test
func TccBarrierTransOutCancel(c *gin.Context) (interface{}, error) {
barrier := MustBarrierFromGin(c)
- return dtmcli.ResultSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
+ return dtmcli.MapSuccess, barrier.Call(txGet(), func(db dtmcli.DB) error {
return adjustTrading(db, transOutUID, reqFrom(c).Amount)
})
}
diff --git a/examples/quick_start.go b/examples/quick_start.go
index 5a9a76a..5610e2d 100644
--- a/examples/quick_start.go
+++ b/examples/quick_start.go
@@ -43,7 +43,7 @@ func QsFireRequest() string {
func qsAdjustBalance(uid int, amount int) (interface{}, error) {
_, err := dtmcli.DBExec(sdbGet(), "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
- return dtmcli.ResultSuccess, err
+ return dtmcli.MapSuccess, err
}
func qsAddRoute(app *gin.Engine) {
diff --git a/go.mod b/go.mod
index 6b851d2..b001692 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/stretchr/testify v1.7.0
+ github.com/yedf/dtmcli v1.0.0
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect
google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67 // indirect
diff --git a/go.sum b/go.sum
index cb3955f..83c4fb0 100644
--- a/go.sum
+++ b/go.sum
@@ -107,6 +107,8 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
+github.com/yedf/dtmcli v1.0.0 h1:doGNh8NWiO+9Upq0PEpYamQlLyGWFU0IoBnQWE0ZxkQ=
+github.com/yedf/dtmcli v1.0.0/go.mod h1:hjjL7IHcTSwkq1vHyPKUbUXi1vbLFp1ZkkDtSy2JZ7o=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -129,6 +131,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d h1:20cMwl2fHAzkJMEA+8J4JgqBQcQGzbisXo31MIeenXI=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -223,5 +226,3 @@ gorm.io/gorm v1.21.12 h1:3fQM0Eiz7jcJEhPggHEpoYnsGZqynMzverL77DV40RM=
gorm.io/gorm v1.21.12/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
-
-
diff --git a/test/barrier_saga_test.go b/test/barrier_saga_test.go
index ab78739..c26eefa 100644
--- a/test/barrier_saga_test.go
+++ b/test/barrier_saga_test.go
@@ -23,16 +23,16 @@ func sagaBarrierNormal(t *testing.T) {
err := saga.Submit()
e2p(err)
WaitTransProcessed(saga.Gid)
- assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
}
func sagaBarrierRollback(t *testing.T) {
saga := dtmcli.NewSaga(DtmServer, "sagaBarrierRollback").
Add(Busi+"/SagaBTransOut", Busi+"/SagaBTransOutCompensate", &examples.TransReq{Amount: 30}).
- Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", &examples.TransReq{Amount: 30, TransInResult: "FAILURE"})
+ Add(Busi+"/SagaBTransIn", Busi+"/SagaBTransInCompensate", &examples.TransReq{Amount: 30, TransInResult: dtmcli.ResultFailure})
dtmcli.Logf("busi trans submit")
err := saga.Submit()
e2p(err)
WaitTransProcessed(saga.Gid)
- assert.Equal(t, "failed", getTransStatus(saga.Gid))
+ assert.Equal(t, dtmcli.StatusFailed, getTransStatus(saga.Gid))
}
diff --git a/test/barrier_tcc_test.go b/test/barrier_tcc_test.go
index 036657c..a684c1a 100644
--- a/test/barrier_tcc_test.go
+++ b/test/barrier_tcc_test.go
@@ -26,11 +26,11 @@ func tccBarrierRollback(t *testing.T) {
err := dtmcli.TccGlobalTransaction(DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, err := tcc.CallBranch(&examples.TransReq{Amount: 30}, Busi+"/TccBTransOutTry", Busi+"/TccBTransOutConfirm", Busi+"/TccBTransOutCancel")
assert.Nil(t, err)
- return tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
+ return tcc.CallBranch(&examples.TransReq{Amount: 30, TransInResult: dtmcli.ResultFailure}, Busi+"/TccBTransInTry", Busi+"/TccBTransInConfirm", Busi+"/TccBTransInCancel")
})
assert.Error(t, err)
WaitTransProcessed(gid)
- assert.Equal(t, "failed", getTransStatus(gid))
+ assert.Equal(t, dtmcli.StatusFailed, getTransStatus(gid))
}
func tccBarrierNormal(t *testing.T) {
@@ -42,7 +42,7 @@ func tccBarrierNormal(t *testing.T) {
})
assert.Nil(t, err)
WaitTransProcessed(gid)
- assert.Equal(t, "succeed", getTransStatus(gid))
+ assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(gid))
}
func tccBarrierDisorder(t *testing.T) {
@@ -70,18 +70,18 @@ func tccBarrierDisorder(t *testing.T) {
// 注册子事务
resp, err := dtmcli.RestyClient.R().
SetResult(&dtmcli.TransResult{}).SetBody(M{
- "gid": tcc.Gid,
- "branch_id": branchID,
- "trans_type": "tcc",
- "status": "prepared",
- "data": string(dtmcli.MustMarshal(body)),
- "try": tryURL,
- "confirm": confirmURL,
- "cancel": cancelURL,
+ "gid": tcc.Gid,
+ "branch_id": branchID,
+ "trans_type": "tcc",
+ "status": dtmcli.StatusPrepared,
+ "data": string(dtmcli.MustMarshal(body)),
+ dtmcli.BranchTry: tryURL,
+ dtmcli.BranchConfirm: confirmURL,
+ dtmcli.BranchCancel: cancelURL,
}).Post(fmt.Sprintf("%s/%s", tcc.Dtm, "registerTccBranch"))
assert.Nil(t, err)
tr := resp.Result().(*dtmcli.TransResult)
- assert.Equal(t, "SUCCESS", tr.DtmResult)
+ assert.Equal(t, dtmcli.ResultSuccess, tr.DtmResult)
go func() {
dtmcli.Logf("sleeping to wait for tcc try timeout")
@@ -93,10 +93,10 @@ func tccBarrierDisorder(t *testing.T) {
"gid": tcc.Gid,
"branch_id": branchID,
"trans_type": "tcc",
- "branch_type": "try",
+ "branch_type": dtmcli.BranchTry,
}).
Post(tryURL)
- assert.True(t, strings.Contains(r.String(), "FAILURE"))
+ assert.True(t, strings.Contains(r.String(), dtmcli.ResultFailure))
finishedChan <- "1"
}()
dtmcli.Logf("cron to timeout and then call cancel")
@@ -112,6 +112,6 @@ func tccBarrierDisorder(t *testing.T) {
return nil, fmt.Errorf("a cancelled tcc")
})
assert.Error(t, err, fmt.Errorf("a cancelled tcc"))
- assert.Equal(t, []string{"succeed", "prepared", "prepared"}, getBranchesStatus(gid))
- assert.Equal(t, "failed", getTransStatus(gid))
+ assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusPrepared}, getBranchesStatus(gid))
+ assert.Equal(t, dtmcli.StatusFailed, getTransStatus(gid))
}
diff --git a/test/dtmsvr_test.go b/test/dtmsvr_test.go
index 703d1e0..3577cc8 100644
--- a/test/dtmsvr_test.go
+++ b/test/dtmsvr_test.go
@@ -73,7 +73,7 @@ func getBranchesStatus(gid string) []string {
func assertSucceed(t *testing.T, gid string) {
WaitTransProcessed(gid)
- assert.Equal(t, "succeed", getTransStatus(gid))
+ assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(gid))
}
func genMsg(gid string) *dtmcli.Msg {
@@ -114,7 +114,7 @@ func TestSqlDB(t *testing.T) {
TransType: "saga",
Gid: "gid2",
BranchID: "branch_id2",
- BranchType: "action",
+ BranchType: dtmcli.BranchAction,
}
db.Must().Exec("insert ignore into dtm_barrier.barrier(trans_type, gid, branch_id, branch_type, reason) values('saga', 'gid1', 'branch_id1', 'action', 'saga')")
tx, err := db.ToSQLDB().Begin()
diff --git a/test/grpc_barrier_saga_test.go b/test/grpc_barrier_saga_test.go
index a37a759..ed09959 100644
--- a/test/grpc_barrier_saga_test.go
+++ b/test/grpc_barrier_saga_test.go
@@ -23,17 +23,17 @@ func grpcSagaBarrierNormal(t *testing.T) {
err := saga.Submit()
e2p(err)
WaitTransProcessed(saga.Gid)
- assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
}
func grpcSagaBarrierRollback(t *testing.T) {
- req := dtmcli.MustMarshal(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"})
+ req := dtmcli.MustMarshal(&examples.TransReq{Amount: 30, TransInResult: dtmcli.ResultFailure})
saga := dtmgrpc.NewSaga(examples.DtmGrpcServer, "grpcSagaBarrierRollback").
Add(examples.BusiGrpc+"/examples.Busi/TransOutBSaga", examples.BusiGrpc+"/examples.Busi/TransOutRevertBSaga", req).
Add(examples.BusiGrpc+"/examples.Busi/TransInBSaga", examples.BusiGrpc+"/examples.Busi/TransInRevertBSaga", req)
err := saga.Submit()
e2p(err)
WaitTransProcessed(saga.Gid)
- assert.Equal(t, "failed", getTransStatus(saga.Gid))
- assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, dtmcli.StatusFailed, getTransStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusFailed}, getBranchesStatus(saga.Gid))
}
diff --git a/test/grpc_msg_test.go b/test/grpc_msg_test.go
index 0f09349..eb9e97a 100644
--- a/test/grpc_msg_test.go
+++ b/test/grpc_msg_test.go
@@ -21,7 +21,7 @@ func grpcMsgNormal(t *testing.T) {
err := msg.Submit()
assert.Nil(t, err)
WaitTransProcessed(msg.Gid)
- assert.Equal(t, "succeed", getTransStatus(msg.Gid))
+ assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(msg.Gid))
}
func grpcMsgPending(t *testing.T) {
@@ -30,12 +30,12 @@ func grpcMsgPending(t *testing.T) {
assert.Nil(t, err)
examples.MainSwitch.CanSubmitResult.SetOnce("PENDING")
CronTransOnce(60 * time.Second)
- assert.Equal(t, "prepared", getTransStatus(msg.Gid))
+ assert.Equal(t, dtmcli.StatusPrepared, getTransStatus(msg.Gid))
examples.MainSwitch.TransInResult.SetOnce("PENDING")
CronTransOnce(60 * time.Second)
- assert.Equal(t, "submitted", getTransStatus(msg.Gid))
+ assert.Equal(t, dtmcli.StatusSubmitted, getTransStatus(msg.Gid))
CronTransOnce(60 * time.Second)
- assert.Equal(t, "succeed", getTransStatus(msg.Gid))
+ assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(msg.Gid))
}
func genGrpcMsg(gid string) *dtmgrpc.MsgGrpc {
diff --git a/test/grpc_saga_test.go b/test/grpc_saga_test.go
index 968ec8b..830e208 100644
--- a/test/grpc_saga_test.go
+++ b/test/grpc_saga_test.go
@@ -20,8 +20,8 @@ func sagaGrpcNormal(t *testing.T) {
saga := genSagaGrpc("gid-sagaGrpcNormal", false, false)
saga.Submit()
WaitTransProcessed(saga.Gid)
- assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
- assert.Equal(t, "succeed", getTransStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid))
transQuery(t, saga.Gid)
}
@@ -30,10 +30,10 @@ func sagaGrpcCommittedPending(t *testing.T) {
examples.MainSwitch.TransOutResult.SetOnce("PENDING")
saga.Submit()
WaitTransProcessed(saga.Gid)
- assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared}, getBranchesStatus(saga.Gid))
CronTransOnce(60 * time.Second)
- assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
- assert.Equal(t, "succeed", getTransStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid))
}
func sagaGrpcRollback(t *testing.T) {
@@ -43,8 +43,8 @@ func sagaGrpcRollback(t *testing.T) {
WaitTransProcessed(saga.Gid)
assert.Equal(t, "aborting", getTransStatus(saga.Gid))
CronTransOnce(60 * time.Second)
- assert.Equal(t, "failed", getTransStatus(saga.Gid))
- assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, dtmcli.StatusFailed, getTransStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusFailed}, getBranchesStatus(saga.Gid))
}
func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc {
diff --git a/test/grpc_tcc_test.go b/test/grpc_tcc_test.go
index 5162d0a..9c93f2b 100644
--- a/test/grpc_tcc_test.go
+++ b/test/grpc_tcc_test.go
@@ -50,7 +50,7 @@ func tccGrpcNested(t *testing.T) {
func tccGrpcRollback(t *testing.T) {
gid := "tccGrpcRollback"
- data := dtmcli.MustMarshal(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"})
+ data := dtmcli.MustMarshal(&examples.TransReq{Amount: 30, TransInResult: dtmcli.ResultFailure})
err := dtmgrpc.TccGlobalTransaction(examples.DtmGrpcServer, gid, func(tcc *dtmgrpc.TccGrpc) error {
_, err := tcc.CallBranch(data, examples.BusiGrpc+"/examples.Busi/TransOutTcc", examples.BusiGrpc+"/examples.Busi/TransOutConfirm", examples.BusiGrpc+"/examples.Busi/TransOutRevert")
assert.Nil(t, err)
@@ -62,5 +62,5 @@ func tccGrpcRollback(t *testing.T) {
WaitTransProcessed(gid)
assert.Equal(t, "aborting", getTransStatus(gid))
CronTransOnce(60 * time.Second)
- assert.Equal(t, "failed", getTransStatus(gid))
+ assert.Equal(t, dtmcli.StatusFailed, getTransStatus(gid))
}
diff --git a/test/grpc_xa_test.go b/test/grpc_xa_test.go
index 9ebc957..42f5295 100644
--- a/test/grpc_xa_test.go
+++ b/test/grpc_xa_test.go
@@ -55,14 +55,14 @@ func xaGrpcNormal(t *testing.T) {
})
assert.Equal(t, nil, err)
WaitTransProcessed(gid)
- assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(gid))
}
func xaGrpcRollback(t *testing.T) {
xc := examples.XaGrpcClient
gid := "xaGrpcRollback"
err := xc.XaGlobalTransaction(gid, func(xa *dtmgrpc.XaGrpc) error {
- req := dtmcli.MustMarshal(&examples.TransReq{Amount: 30, TransInResult: "FAILURE"})
+ req := dtmcli.MustMarshal(&examples.TransReq{Amount: 30, TransInResult: dtmcli.ResultFailure})
_, err := xa.CallBranch(req, examples.BusiGrpc+"/examples.Busi/TransOutXa")
if err != nil {
return err
@@ -72,6 +72,6 @@ func xaGrpcRollback(t *testing.T) {
})
assert.Error(t, err)
WaitTransProcessed(gid)
- assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid))
- assert.Equal(t, "failed", getTransStatus(gid))
+ assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusPrepared}, getBranchesStatus(gid))
+ assert.Equal(t, dtmcli.StatusFailed, getTransStatus(gid))
}
diff --git a/test/msg_test.go b/test/msg_test.go
index d15ba7d..be83dde 100644
--- a/test/msg_test.go
+++ b/test/msg_test.go
@@ -5,6 +5,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
+ "github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/examples"
)
@@ -17,23 +18,23 @@ func TestMsg(t *testing.T) {
func msgNormal(t *testing.T) {
msg := genMsg("gid-msg-normal")
msg.Submit()
- assert.Equal(t, "submitted", getTransStatus(msg.Gid))
+ assert.Equal(t, dtmcli.StatusSubmitted, getTransStatus(msg.Gid))
WaitTransProcessed(msg.Gid)
- assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid))
- assert.Equal(t, "succeed", getTransStatus(msg.Gid))
+ assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed}, getBranchesStatus(msg.Gid))
+ assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(msg.Gid))
}
func msgPending(t *testing.T) {
msg := genMsg("gid-msg-normal-pending")
msg.Prepare("")
- assert.Equal(t, "prepared", getTransStatus(msg.Gid))
+ assert.Equal(t, dtmcli.StatusPrepared, getTransStatus(msg.Gid))
examples.MainSwitch.CanSubmitResult.SetOnce("PENDING")
CronTransOnce(60 * time.Second)
- assert.Equal(t, "prepared", getTransStatus(msg.Gid))
+ assert.Equal(t, dtmcli.StatusPrepared, getTransStatus(msg.Gid))
examples.MainSwitch.TransInResult.SetOnce("PENDING")
CronTransOnce(60 * time.Second)
- assert.Equal(t, "submitted", getTransStatus(msg.Gid))
+ assert.Equal(t, dtmcli.StatusSubmitted, getTransStatus(msg.Gid))
CronTransOnce(60 * time.Second)
- assert.Equal(t, []string{"succeed", "succeed"}, getBranchesStatus(msg.Gid))
- assert.Equal(t, "succeed", getTransStatus(msg.Gid))
+ assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed}, getBranchesStatus(msg.Gid))
+ assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(msg.Gid))
}
diff --git a/test/saga_test.go b/test/saga_test.go
index 6c74b18..91c8d13 100644
--- a/test/saga_test.go
+++ b/test/saga_test.go
@@ -20,8 +20,8 @@ func sagaNormal(t *testing.T) {
saga := genSaga("gid-noramlSaga", false, false)
saga.Submit()
WaitTransProcessed(saga.Gid)
- assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
- assert.Equal(t, "succeed", getTransStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid))
transQuery(t, saga.Gid)
}
@@ -30,10 +30,10 @@ func sagaCommittedPending(t *testing.T) {
examples.MainSwitch.TransOutResult.SetOnce("PENDING")
saga.Submit()
WaitTransProcessed(saga.Gid)
- assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared}, getBranchesStatus(saga.Gid))
CronTransOnce(60 * time.Second)
- assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
- assert.Equal(t, "succeed", getTransStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid))
}
func sagaRollback(t *testing.T) {
@@ -44,8 +44,8 @@ func sagaRollback(t *testing.T) {
WaitTransProcessed(saga.Gid)
assert.Equal(t, "aborting", getTransStatus(saga.Gid))
CronTransOnce(60 * time.Second)
- assert.Equal(t, "failed", getTransStatus(saga.Gid))
- assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, dtmcli.StatusFailed, getTransStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusFailed}, getBranchesStatus(saga.Gid))
err = saga.Submit()
assert.Error(t, err)
}
diff --git a/test/tcc_test.go b/test/tcc_test.go
index 0e9fe91..817a7bf 100644
--- a/test/tcc_test.go
+++ b/test/tcc_test.go
@@ -29,7 +29,7 @@ func tccNormal(t *testing.T) {
func tccRollback(t *testing.T) {
gid := "tccRollback"
- data := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}
+ data := &examples.TransReq{Amount: 30, TransInResult: dtmcli.ResultFailure}
err := dtmcli.TccGlobalTransaction(examples.DtmServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
_, rerr := tcc.CallBranch(data, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert")
assert.Nil(t, rerr)
@@ -40,5 +40,5 @@ func tccRollback(t *testing.T) {
WaitTransProcessed(gid)
assert.Equal(t, "aborting", getTransStatus(gid))
CronTransOnce(60 * time.Second)
- assert.Equal(t, "failed", getTransStatus(gid))
+ assert.Equal(t, dtmcli.StatusFailed, getTransStatus(gid))
}
diff --git a/test/wait_saga_test.go b/test/wait_saga_test.go
index ff09bea..0b06e3c 100644
--- a/test/wait_saga_test.go
+++ b/test/wait_saga_test.go
@@ -5,6 +5,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
+ "github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/examples"
)
@@ -21,8 +22,8 @@ func sagaNormalWait(t *testing.T) {
err := saga.Submit()
assert.Nil(t, err)
WaitTransProcessed(saga.Gid)
- assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
- assert.Equal(t, "succeed", getTransStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid))
transQuery(t, saga.Gid)
}
@@ -33,10 +34,10 @@ func sagaCommittedPendingWait(t *testing.T) {
err := saga.Submit()
assert.Error(t, err)
WaitTransProcessed(saga.Gid)
- assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared, dtmcli.StatusPrepared}, getBranchesStatus(saga.Gid))
CronTransOnce(60 * time.Second)
- assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid))
- assert.Equal(t, "succeed", getTransStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid))
}
func sagaRollbackWait(t *testing.T) {
@@ -45,6 +46,6 @@ func sagaRollbackWait(t *testing.T) {
err := saga.Submit()
assert.Error(t, err)
WaitTransProcessed(saga.Gid)
- assert.Equal(t, "failed", getTransStatus(saga.Gid))
- assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid))
+ assert.Equal(t, dtmcli.StatusFailed, getTransStatus(saga.Gid))
+ assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusSucceed, dtmcli.StatusFailed}, getBranchesStatus(saga.Gid))
}
diff --git a/test/xa_test.go b/test/xa_test.go
index 09f2c47..1b980da 100644
--- a/test/xa_test.go
+++ b/test/xa_test.go
@@ -42,7 +42,7 @@ func xaNormal(t *testing.T) {
})
assert.Equal(t, nil, err)
WaitTransProcessed(gid)
- assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(gid))
}
func xaDuplicate(t *testing.T) {
@@ -60,13 +60,13 @@ func xaDuplicate(t *testing.T) {
})
assert.Equal(t, nil, err)
WaitTransProcessed(gid)
- assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(gid))
+ assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(gid))
}
func xaRollback(t *testing.T) {
xc := examples.XaClient
gid := "xaRollback"
err := xc.XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
- req := &examples.TransReq{Amount: 30, TransInResult: "FAILURE"}
+ req := &examples.TransReq{Amount: 30, TransInResult: dtmcli.ResultFailure}
resp, err := xa.CallBranch(req, examples.Busi+"/TransOutXa")
if err != nil {
return resp, err
@@ -75,6 +75,6 @@ func xaRollback(t *testing.T) {
})
assert.Error(t, err)
WaitTransProcessed(gid)
- assert.Equal(t, []string{"succeed", "prepared"}, getBranchesStatus(gid))
- assert.Equal(t, "failed", getTransStatus(gid))
+ assert.Equal(t, []string{dtmcli.StatusSucceed, dtmcli.StatusPrepared}, getBranchesStatus(gid))
+ assert.Equal(t, dtmcli.StatusFailed, getTransStatus(gid))
}