From 69b7495a71d7c69082bb311627b997ed89f07616 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 10 Aug 2021 14:54:55 +0800 Subject: [PATCH] saga grpc seems ok --- dtmgrpc/saga.go | 48 ++++++++++++++++++++++++++++ dtmsvr/trans_grpc_saga_test.go | 57 ++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 dtmgrpc/saga.go create mode 100644 dtmsvr/trans_grpc_saga_test.go diff --git a/dtmgrpc/saga.go b/dtmgrpc/saga.go new file mode 100644 index 0000000..1326627 --- /dev/null +++ b/dtmgrpc/saga.go @@ -0,0 +1,48 @@ +package dtmgrpc + +import ( + context "context" + + "github.com/yedf/dtm/dtmcli" +) + +// SagaGrpc struct of saga +type SagaGrpc struct { + dtmcli.SagaData + dtmcli.TransBase +} + +// NewSaga create a saga +func NewSaga(server string, gid string) *SagaGrpc { + return &SagaGrpc{ + SagaData: dtmcli.SagaData{TransData: dtmcli.TransData{ + Gid: gid, + TransType: "saga", + }}, + TransBase: dtmcli.TransBase{ + Dtm: server, + }, + } +} + +// Add add a saga step +func (s *SagaGrpc) Add(action string, compensate string, appData []byte) *SagaGrpc { + dtmcli.Logf("saga %s Add %s %s %v", s.SagaData.Gid, action, compensate, string(appData)) + step := dtmcli.SagaStep{ + Action: action, + Compensate: compensate, + Data: string(appData), + } + s.Steps = append(s.Steps, step) + return s +} + +// Submit submit the saga trans +func (s *SagaGrpc) Submit() error { + _, err := MustGetDtmClient(s.Dtm).Submit(context.Background(), &DtmRequest{ + Gid: s.Gid, + TransType: s.TransType, + Data: dtmcli.MustMarshalString(&s.Steps), + }) + return err +} diff --git a/dtmsvr/trans_grpc_saga_test.go b/dtmsvr/trans_grpc_saga_test.go new file mode 100644 index 0000000..e1522e7 --- /dev/null +++ b/dtmsvr/trans_grpc_saga_test.go @@ -0,0 +1,57 @@ +package dtmsvr + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/dtmgrpc" + "github.com/yedf/dtm/examples" +) + +func TestGrpcSaga(t *testing.T) { + sagaGrpcNormal(t) + sagaGrpcCommittedPending(t) + sagaGrpcRollback(t) +} + +func sagaGrpcNormal(t *testing.T) { + saga := genSagaGrpc("gid-sagaGrpcNormal", false, false) + saga.Submit() + WaitTransProcessed(saga.Gid) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) + assert.Equal(t, "succeed", getTransStatus(saga.Gid)) + transQuery(t, saga.Gid) +} + +func sagaGrpcCommittedPending(t *testing.T) { + saga := genSagaGrpc("gid-committedPendingGrpc", false, false) + examples.MainSwitch.TransOutResult.SetOnce("PENDING") + saga.Submit() + WaitTransProcessed(saga.Gid) + assert.Equal(t, []string{"prepared", "prepared", "prepared", "prepared"}, getBranchesStatus(saga.Gid)) + CronTransOnce(60 * time.Second) + assert.Equal(t, []string{"prepared", "succeed", "prepared", "succeed"}, getBranchesStatus(saga.Gid)) + assert.Equal(t, "succeed", getTransStatus(saga.Gid)) +} + +func sagaGrpcRollback(t *testing.T) { + saga := genSagaGrpc("gid-rollbackSaga2Grpc", false, true) + examples.MainSwitch.TransOutRevertResult.SetOnce("PENDING") + saga.Submit() + WaitTransProcessed(saga.Gid) + assert.Equal(t, "aborting", getTransStatus(saga.Gid)) + CronTransOnce(60 * time.Second) + assert.Equal(t, "failed", getTransStatus(saga.Gid)) + assert.Equal(t, []string{"succeed", "succeed", "succeed", "failed"}, getBranchesStatus(saga.Gid)) +} + +func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc { + dtmcli.Logf("beginning a grpc saga test ---------------- %s", gid) + saga := dtmgrpc.NewSaga(examples.DtmGrpcServer, gid) + req := dtmcli.MustMarshal(examples.GenTransReq(30, outFailed, inFailed)) + saga.Add(examples.BusiGrpc+"/examples.Busi/TransOut", examples.BusiGrpc+"/examples.Busi/TransOutRevert", req) + saga.Add(examples.BusiGrpc+"/examples.Busi/TransIn", examples.BusiGrpc+"/examples.Busi/TransInRevert", req) + return saga +}