diff --git a/.github/workflows/pr-validation.yml b/.github/workflows/pr-validation.yml index e022ab8..0b9a45d 100644 --- a/.github/workflows/pr-validation.yml +++ b/.github/workflows/pr-validation.yml @@ -36,6 +36,17 @@ jobs: - name: Install dependencies run: go get . + + - name: Install Protoc + uses: arduino/setup-protoc@v2 + + - name: Installing protoc-gen-go + run: | + go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28 + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2 + + - name: Generate grpc code + run: protoc --go_out=. --go-grpc_out=. -I ./submodules/durabletask-protobuf/protos orchestrator_service.proto - name: Run integration tests run: go test ./tests/... -coverpkg ./api,./task,./client,./backend/...,./internal/helpers diff --git a/api/orchestration.go b/api/orchestration.go index 1250644..83dbca6 100644 --- a/api/orchestration.go +++ b/api/orchestration.go @@ -12,6 +12,14 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" ) +type InstanceExistsOption int + +const ( + THROW_IF_EXIST InstanceExistsOption = iota + SKIP_IF_EXIST + TERMINATE_IF_EXIST +) + var ( ErrInstanceNotFound = errors.New("no such instance exists") ErrNotStarted = errors.New("orchestration has not started") @@ -87,6 +95,22 @@ func WithStartTime(startTime time.Time) NewOrchestrationOptions { } } +// WithInstanceExistsOption configures an option when orchestartion with same instance id already exists. +// If not specified, InstanceExistOption_THROW_IF_EXIST be used. +func WithInstanceExistsOption(option InstanceExistsOption) NewOrchestrationOptions { + return func(req *protos.CreateInstanceRequest) error { + switch option { + case SKIP_IF_EXIST: + req.InstanceExistOption = protos.InstanceExistOption_SKIP_IF_EXIST + case TERMINATE_IF_EXIST: + req.InstanceExistOption = protos.InstanceExistOption_TERMINATE_IF_EXIST + default: + req.InstanceExistOption = protos.InstanceExistOption_THROW_IF_EXIST + } + return nil + } +} + // WithFetchPayloads configures whether to load orchestration inputs, outputs, and custom status values, which could be large. func WithFetchPayloads(fetchPayloads bool) FetchOrchestrationMetadataOptions { return func(req *protos.GetInstanceRequest) { diff --git a/backend/backend.go b/backend/backend.go index 8edf583..3bb7504 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -16,6 +16,7 @@ var ( ErrNotInitialized = errors.New("backend not initialized") ErrWorkItemLockLost = errors.New("lock on work-item was lost") ErrBackendAlreadyStarted = errors.New("backend is already started") + ErrInstanceNotExists = errors.New("isntance does not exist") ) type ( @@ -91,6 +92,11 @@ type Backend interface { // [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist. // [api.ErrNotCompleted] is returned if the specified orchestration instance is still running. PurgeOrchestrationState(context.Context, api.InstanceID) error + + // CleanupOrchestration clean up all records for the specified orchestration instance in the entire task hub. + // + // [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist. + CleanupOrchestration(context.Context, api.InstanceID) error } // MarshalHistoryEvent serializes the [HistoryEvent] into a protobuf byte array. diff --git a/backend/executor.go b/backend/executor.go index 4718663..1d13abf 100644 --- a/backend/executor.go +++ b/backend/executor.go @@ -311,10 +311,32 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst defer span.End() e := helpers.NewExecutionStartedEvent(req.Name, instanceID, req.Input, nil, helpers.TraceContextFromSpan(span)) - if err := g.backend.CreateOrchestrationInstance(ctx, e); err != nil { - return nil, err + + // Question: from customer side SKIP_IF_EXIST and TERMINATE_IF_EXIST have the same behavior if success. + // Do customers need to know the difference between them? SKIP_IF_EXIST use the old instance, whereas + // TERMINATE_IF_EXIST terminates old instance and create a new instance with the same instance id. + + // Terminate existing instance if requested + if req.InstanceExistOption == protos.InstanceExistOption_TERMINATE_IF_EXIST { + if err := g.backend.CleanupOrchestration(ctx, api.InstanceID(instanceID)); err != nil && !errors.Is(err, api.ErrInstanceNotFound) { + return nil, err + } } + // Create or update orchestration instance + err := g.backend.CreateOrchestrationInstance(ctx, e) + if err != nil { + if errors.Is(err, ErrDuplicateEvent) { + // Throw if instance already exists + if req.InstanceExistOption == protos.InstanceExistOption_THROW_IF_EXIST { + return nil, err + } + // Log a warning and skip if instance already exists + g.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", instanceID) + } else { + return nil, err + } + } return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil } diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 8251116..cd0afca 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -873,6 +873,57 @@ func (be *sqliteBackend) PurgeOrchestrationState(ctx context.Context, id api.Ins return nil } +func (be *sqliteBackend) CleanupOrchestration(ctx context.Context, id api.InstanceID) error { + if err := be.ensureDB(); err != nil { + return err + } + + tx, err := be.db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id)) + if err := row.Err(); err != nil { + return fmt.Errorf("failed to query for instance existence: %w", err) + } + + var unused int + if err := row.Scan(&unused); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return api.ErrInstanceNotFound + } else { + return fmt.Errorf("failed to scan instance existence: %w", err) + } + } + + _, err = tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ?", string(id)) + if err != nil { + return fmt.Errorf("failed to delete from the Instances table: %w", err) + } + + _, err = tx.ExecContext(ctx, "DELETE FROM History WHERE [InstanceID] = ?", string(id)) + if err != nil { + return fmt.Errorf("failed to delete from History table: %w", err) + } + + _, err = tx.ExecContext(ctx, "DELETE FROM NewEvents WHERE [InstanceID] = ?", string(id)) + if err != nil { + return fmt.Errorf("failed to delete from NewEvents table: %w", err) + } + + _, err = tx.ExecContext(ctx, "DELETE FROM NewTasks WHERE [InstanceID] = ?", string(id)) + if err != nil { + return fmt.Errorf("failed to delete from NewTasks table: %w", err) + } + + if err = tx.Commit(); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + return nil +} + // Start implements backend.Backend func (*sqliteBackend) Start(context.Context) error { return nil diff --git a/submodules/durabletask-protobuf b/submodules/durabletask-protobuf index 3c5d082..33cdb47 160000 --- a/submodules/durabletask-protobuf +++ b/submodules/durabletask-protobuf @@ -1 +1 @@ -Subproject commit 3c5d082b5b24adc351a0b8693f023272be18d691 +Subproject commit 33cdb4732d106daadb4f8cfa5047ddcaea927a97 diff --git a/tests/grpc/grpc_test.go b/tests/grpc/grpc_test.go index 9869694..e0f764b 100644 --- a/tests/grpc/grpc_test.go +++ b/tests/grpc/grpc_test.go @@ -227,3 +227,79 @@ func Test_Grpc_Terminate_Recursive(t *testing.T) { }) } } + +func Test_Grpc_ReuseInstanceIDSkipOrTerminate(t *testing.T) { + delayTime := 4 * time.Second + r := task.NewTaskRegistry() + r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + var input string + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + var output string + ctx.CreateTimer(delayTime).Await(nil) + err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) + return output, err + }) + r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + var name string + if err := ctx.GetInput(&name); err != nil { + return nil, err + } + return fmt.Sprintf("Hello, %s!", name), nil + }) + + cancelListener := startGrpcListener(t, r) + defer cancelListener() + instanceIDs := []api.InstanceID{"SKIP_IF_EXIST", "TERMINATE_IF_EXIST"} + options := []api.InstanceExistsOption{api.SKIP_IF_EXIST, api.TERMINATE_IF_EXIST} + + for i, option := range options { + id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceIDs[i])) + require.NoError(t, err) + id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(id), api.WithInstanceExistsOption(option)) + require.NoError(t, err) + timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second) + defer cancelTimeout() + metadata, err := grpcClient.WaitForOrchestrationCompletion(timeoutCtx, id, api.WithFetchPayloads(true)) + require.NoError(t, err) + assert.Equal(t, true, metadata.IsComplete()) + assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput) + time.Sleep(1 * time.Second) + } +} + +func Test_Grpc_ReuseInstanceIDThrowIfExists(t *testing.T) { + delayTime := 4 * time.Second + r := task.NewTaskRegistry() + r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { + var input string + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + var output string + ctx.CreateTimer(delayTime).Await(nil) + err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) + return output, err + }) + r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { + var name string + if err := ctx.GetInput(&name); err != nil { + return nil, err + } + return fmt.Sprintf("Hello, %s!", name), nil + }) + + cancelListener := startGrpcListener(t, r) + defer cancelListener() + + instanceID := api.InstanceID("123") + option := api.THROW_IF_EXIST + + id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID)) + require.NoError(t, err) + _, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(id), api.WithInstanceExistsOption(option)) + if assert.Error(t, err) { + assert.Contains(t, err.Error(), "duplicate event") + } +} diff --git a/tests/mocks/Backend.go b/tests/mocks/Backend.go index 4b00f11..c558e04 100644 --- a/tests/mocks/Backend.go +++ b/tests/mocks/Backend.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -30,6 +30,10 @@ func (_m *Backend) EXPECT() *Backend_Expecter { func (_m *Backend) AbandonActivityWorkItem(_a0 context.Context, _a1 *backend.ActivityWorkItem) error { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for AbandonActivityWorkItem") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *backend.ActivityWorkItem) error); ok { r0 = rf(_a0, _a1) @@ -46,8 +50,8 @@ type Backend_AbandonActivityWorkItem_Call struct { } // AbandonActivityWorkItem is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *backend.ActivityWorkItem +// - _a0 context.Context +// - _a1 *backend.ActivityWorkItem func (_e *Backend_Expecter) AbandonActivityWorkItem(_a0 interface{}, _a1 interface{}) *Backend_AbandonActivityWorkItem_Call { return &Backend_AbandonActivityWorkItem_Call{Call: _e.mock.On("AbandonActivityWorkItem", _a0, _a1)} } @@ -64,10 +68,19 @@ func (_c *Backend_AbandonActivityWorkItem_Call) Return(_a0 error) *Backend_Aband return _c } +func (_c *Backend_AbandonActivityWorkItem_Call) RunAndReturn(run func(context.Context, *backend.ActivityWorkItem) error) *Backend_AbandonActivityWorkItem_Call { + _c.Call.Return(run) + return _c +} + // AbandonOrchestrationWorkItem provides a mock function with given fields: _a0, _a1 func (_m *Backend) AbandonOrchestrationWorkItem(_a0 context.Context, _a1 *backend.OrchestrationWorkItem) error { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for AbandonOrchestrationWorkItem") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *backend.OrchestrationWorkItem) error); ok { r0 = rf(_a0, _a1) @@ -84,8 +97,8 @@ type Backend_AbandonOrchestrationWorkItem_Call struct { } // AbandonOrchestrationWorkItem is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *backend.OrchestrationWorkItem +// - _a0 context.Context +// - _a1 *backend.OrchestrationWorkItem func (_e *Backend_Expecter) AbandonOrchestrationWorkItem(_a0 interface{}, _a1 interface{}) *Backend_AbandonOrchestrationWorkItem_Call { return &Backend_AbandonOrchestrationWorkItem_Call{Call: _e.mock.On("AbandonOrchestrationWorkItem", _a0, _a1)} } @@ -102,10 +115,19 @@ func (_c *Backend_AbandonOrchestrationWorkItem_Call) Return(_a0 error) *Backend_ return _c } +func (_c *Backend_AbandonOrchestrationWorkItem_Call) RunAndReturn(run func(context.Context, *backend.OrchestrationWorkItem) error) *Backend_AbandonOrchestrationWorkItem_Call { + _c.Call.Return(run) + return _c +} + // AddNewOrchestrationEvent provides a mock function with given fields: _a0, _a1, _a2 func (_m *Backend) AddNewOrchestrationEvent(_a0 context.Context, _a1 api.InstanceID, _a2 *protos.HistoryEvent) error { ret := _m.Called(_a0, _a1, _a2) + if len(ret) == 0 { + panic("no return value specified for AddNewOrchestrationEvent") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID, *protos.HistoryEvent) error); ok { r0 = rf(_a0, _a1, _a2) @@ -122,9 +144,9 @@ type Backend_AddNewOrchestrationEvent_Call struct { } // AddNewOrchestrationEvent is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 api.InstanceID -// - _a2 *protos.HistoryEvent +// - _a0 context.Context +// - _a1 api.InstanceID +// - _a2 *protos.HistoryEvent func (_e *Backend_Expecter) AddNewOrchestrationEvent(_a0 interface{}, _a1 interface{}, _a2 interface{}) *Backend_AddNewOrchestrationEvent_Call { return &Backend_AddNewOrchestrationEvent_Call{Call: _e.mock.On("AddNewOrchestrationEvent", _a0, _a1, _a2)} } @@ -141,10 +163,66 @@ func (_c *Backend_AddNewOrchestrationEvent_Call) Return(_a0 error) *Backend_AddN return _c } +func (_c *Backend_AddNewOrchestrationEvent_Call) RunAndReturn(run func(context.Context, api.InstanceID, *protos.HistoryEvent) error) *Backend_AddNewOrchestrationEvent_Call { + _c.Call.Return(run) + return _c +} + +// CleanupOrchestration provides a mock function with given fields: _a0, _a1 +func (_m *Backend) CleanupOrchestration(_a0 context.Context, _a1 api.InstanceID) error { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for CleanupOrchestration") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Backend_CleanupOrchestration_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CleanupOrchestration' +type Backend_CleanupOrchestration_Call struct { + *mock.Call +} + +// CleanupOrchestration is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 api.InstanceID +func (_e *Backend_Expecter) CleanupOrchestration(_a0 interface{}, _a1 interface{}) *Backend_CleanupOrchestration_Call { + return &Backend_CleanupOrchestration_Call{Call: _e.mock.On("CleanupOrchestration", _a0, _a1)} +} + +func (_c *Backend_CleanupOrchestration_Call) Run(run func(_a0 context.Context, _a1 api.InstanceID)) *Backend_CleanupOrchestration_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(api.InstanceID)) + }) + return _c +} + +func (_c *Backend_CleanupOrchestration_Call) Return(_a0 error) *Backend_CleanupOrchestration_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Backend_CleanupOrchestration_Call) RunAndReturn(run func(context.Context, api.InstanceID) error) *Backend_CleanupOrchestration_Call { + _c.Call.Return(run) + return _c +} + // CompleteActivityWorkItem provides a mock function with given fields: _a0, _a1 func (_m *Backend) CompleteActivityWorkItem(_a0 context.Context, _a1 *backend.ActivityWorkItem) error { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for CompleteActivityWorkItem") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *backend.ActivityWorkItem) error); ok { r0 = rf(_a0, _a1) @@ -161,8 +239,8 @@ type Backend_CompleteActivityWorkItem_Call struct { } // CompleteActivityWorkItem is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *backend.ActivityWorkItem +// - _a0 context.Context +// - _a1 *backend.ActivityWorkItem func (_e *Backend_Expecter) CompleteActivityWorkItem(_a0 interface{}, _a1 interface{}) *Backend_CompleteActivityWorkItem_Call { return &Backend_CompleteActivityWorkItem_Call{Call: _e.mock.On("CompleteActivityWorkItem", _a0, _a1)} } @@ -179,10 +257,19 @@ func (_c *Backend_CompleteActivityWorkItem_Call) Return(_a0 error) *Backend_Comp return _c } +func (_c *Backend_CompleteActivityWorkItem_Call) RunAndReturn(run func(context.Context, *backend.ActivityWorkItem) error) *Backend_CompleteActivityWorkItem_Call { + _c.Call.Return(run) + return _c +} + // CompleteOrchestrationWorkItem provides a mock function with given fields: _a0, _a1 func (_m *Backend) CompleteOrchestrationWorkItem(_a0 context.Context, _a1 *backend.OrchestrationWorkItem) error { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for CompleteOrchestrationWorkItem") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *backend.OrchestrationWorkItem) error); ok { r0 = rf(_a0, _a1) @@ -199,8 +286,8 @@ type Backend_CompleteOrchestrationWorkItem_Call struct { } // CompleteOrchestrationWorkItem is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *backend.OrchestrationWorkItem +// - _a0 context.Context +// - _a1 *backend.OrchestrationWorkItem func (_e *Backend_Expecter) CompleteOrchestrationWorkItem(_a0 interface{}, _a1 interface{}) *Backend_CompleteOrchestrationWorkItem_Call { return &Backend_CompleteOrchestrationWorkItem_Call{Call: _e.mock.On("CompleteOrchestrationWorkItem", _a0, _a1)} } @@ -217,10 +304,19 @@ func (_c *Backend_CompleteOrchestrationWorkItem_Call) Return(_a0 error) *Backend return _c } +func (_c *Backend_CompleteOrchestrationWorkItem_Call) RunAndReturn(run func(context.Context, *backend.OrchestrationWorkItem) error) *Backend_CompleteOrchestrationWorkItem_Call { + _c.Call.Return(run) + return _c +} + // CreateOrchestrationInstance provides a mock function with given fields: _a0, _a1 func (_m *Backend) CreateOrchestrationInstance(_a0 context.Context, _a1 *protos.HistoryEvent) error { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for CreateOrchestrationInstance") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *protos.HistoryEvent) error); ok { r0 = rf(_a0, _a1) @@ -237,8 +333,8 @@ type Backend_CreateOrchestrationInstance_Call struct { } // CreateOrchestrationInstance is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *protos.HistoryEvent +// - _a0 context.Context +// - _a1 *protos.HistoryEvent func (_e *Backend_Expecter) CreateOrchestrationInstance(_a0 interface{}, _a1 interface{}) *Backend_CreateOrchestrationInstance_Call { return &Backend_CreateOrchestrationInstance_Call{Call: _e.mock.On("CreateOrchestrationInstance", _a0, _a1)} } @@ -255,10 +351,19 @@ func (_c *Backend_CreateOrchestrationInstance_Call) Return(_a0 error) *Backend_C return _c } +func (_c *Backend_CreateOrchestrationInstance_Call) RunAndReturn(run func(context.Context, *protos.HistoryEvent) error) *Backend_CreateOrchestrationInstance_Call { + _c.Call.Return(run) + return _c +} + // CreateTaskHub provides a mock function with given fields: _a0 func (_m *Backend) CreateTaskHub(_a0 context.Context) error { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for CreateTaskHub") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(_a0) @@ -275,7 +380,7 @@ type Backend_CreateTaskHub_Call struct { } // CreateTaskHub is a helper method to define mock.On call -// - _a0 context.Context +// - _a0 context.Context func (_e *Backend_Expecter) CreateTaskHub(_a0 interface{}) *Backend_CreateTaskHub_Call { return &Backend_CreateTaskHub_Call{Call: _e.mock.On("CreateTaskHub", _a0)} } @@ -292,10 +397,19 @@ func (_c *Backend_CreateTaskHub_Call) Return(_a0 error) *Backend_CreateTaskHub_C return _c } +func (_c *Backend_CreateTaskHub_Call) RunAndReturn(run func(context.Context) error) *Backend_CreateTaskHub_Call { + _c.Call.Return(run) + return _c +} + // DeleteTaskHub provides a mock function with given fields: _a0 func (_m *Backend) DeleteTaskHub(_a0 context.Context) error { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for DeleteTaskHub") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(_a0) @@ -312,7 +426,7 @@ type Backend_DeleteTaskHub_Call struct { } // DeleteTaskHub is a helper method to define mock.On call -// - _a0 context.Context +// - _a0 context.Context func (_e *Backend_Expecter) DeleteTaskHub(_a0 interface{}) *Backend_DeleteTaskHub_Call { return &Backend_DeleteTaskHub_Call{Call: _e.mock.On("DeleteTaskHub", _a0)} } @@ -329,11 +443,24 @@ func (_c *Backend_DeleteTaskHub_Call) Return(_a0 error) *Backend_DeleteTaskHub_C return _c } +func (_c *Backend_DeleteTaskHub_Call) RunAndReturn(run func(context.Context) error) *Backend_DeleteTaskHub_Call { + _c.Call.Return(run) + return _c +} + // GetActivityWorkItem provides a mock function with given fields: _a0 func (_m *Backend) GetActivityWorkItem(_a0 context.Context) (*backend.ActivityWorkItem, error) { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for GetActivityWorkItem") + } + var r0 *backend.ActivityWorkItem + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*backend.ActivityWorkItem, error)); ok { + return rf(_a0) + } if rf, ok := ret.Get(0).(func(context.Context) *backend.ActivityWorkItem); ok { r0 = rf(_a0) } else { @@ -342,7 +469,6 @@ func (_m *Backend) GetActivityWorkItem(_a0 context.Context) (*backend.ActivityWo } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(_a0) } else { @@ -358,7 +484,7 @@ type Backend_GetActivityWorkItem_Call struct { } // GetActivityWorkItem is a helper method to define mock.On call -// - _a0 context.Context +// - _a0 context.Context func (_e *Backend_Expecter) GetActivityWorkItem(_a0 interface{}) *Backend_GetActivityWorkItem_Call { return &Backend_GetActivityWorkItem_Call{Call: _e.mock.On("GetActivityWorkItem", _a0)} } @@ -375,11 +501,24 @@ func (_c *Backend_GetActivityWorkItem_Call) Return(_a0 *backend.ActivityWorkItem return _c } +func (_c *Backend_GetActivityWorkItem_Call) RunAndReturn(run func(context.Context) (*backend.ActivityWorkItem, error)) *Backend_GetActivityWorkItem_Call { + _c.Call.Return(run) + return _c +} + // GetOrchestrationMetadata provides a mock function with given fields: _a0, _a1 func (_m *Backend) GetOrchestrationMetadata(_a0 context.Context, _a1 api.InstanceID) (*api.OrchestrationMetadata, error) { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for GetOrchestrationMetadata") + } + var r0 *api.OrchestrationMetadata + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID) (*api.OrchestrationMetadata, error)); ok { + return rf(_a0, _a1) + } if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID) *api.OrchestrationMetadata); ok { r0 = rf(_a0, _a1) } else { @@ -388,7 +527,6 @@ func (_m *Backend) GetOrchestrationMetadata(_a0 context.Context, _a1 api.Instanc } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, api.InstanceID) error); ok { r1 = rf(_a0, _a1) } else { @@ -404,8 +542,8 @@ type Backend_GetOrchestrationMetadata_Call struct { } // GetOrchestrationMetadata is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 api.InstanceID +// - _a0 context.Context +// - _a1 api.InstanceID func (_e *Backend_Expecter) GetOrchestrationMetadata(_a0 interface{}, _a1 interface{}) *Backend_GetOrchestrationMetadata_Call { return &Backend_GetOrchestrationMetadata_Call{Call: _e.mock.On("GetOrchestrationMetadata", _a0, _a1)} } @@ -422,11 +560,24 @@ func (_c *Backend_GetOrchestrationMetadata_Call) Return(_a0 *api.OrchestrationMe return _c } +func (_c *Backend_GetOrchestrationMetadata_Call) RunAndReturn(run func(context.Context, api.InstanceID) (*api.OrchestrationMetadata, error)) *Backend_GetOrchestrationMetadata_Call { + _c.Call.Return(run) + return _c +} + // GetOrchestrationRuntimeState provides a mock function with given fields: _a0, _a1 func (_m *Backend) GetOrchestrationRuntimeState(_a0 context.Context, _a1 *backend.OrchestrationWorkItem) (*backend.OrchestrationRuntimeState, error) { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for GetOrchestrationRuntimeState") + } + var r0 *backend.OrchestrationRuntimeState + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *backend.OrchestrationWorkItem) (*backend.OrchestrationRuntimeState, error)); ok { + return rf(_a0, _a1) + } if rf, ok := ret.Get(0).(func(context.Context, *backend.OrchestrationWorkItem) *backend.OrchestrationRuntimeState); ok { r0 = rf(_a0, _a1) } else { @@ -435,7 +586,6 @@ func (_m *Backend) GetOrchestrationRuntimeState(_a0 context.Context, _a1 *backen } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *backend.OrchestrationWorkItem) error); ok { r1 = rf(_a0, _a1) } else { @@ -451,8 +601,8 @@ type Backend_GetOrchestrationRuntimeState_Call struct { } // GetOrchestrationRuntimeState is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *backend.OrchestrationWorkItem +// - _a0 context.Context +// - _a1 *backend.OrchestrationWorkItem func (_e *Backend_Expecter) GetOrchestrationRuntimeState(_a0 interface{}, _a1 interface{}) *Backend_GetOrchestrationRuntimeState_Call { return &Backend_GetOrchestrationRuntimeState_Call{Call: _e.mock.On("GetOrchestrationRuntimeState", _a0, _a1)} } @@ -469,11 +619,24 @@ func (_c *Backend_GetOrchestrationRuntimeState_Call) Return(_a0 *backend.Orchest return _c } +func (_c *Backend_GetOrchestrationRuntimeState_Call) RunAndReturn(run func(context.Context, *backend.OrchestrationWorkItem) (*backend.OrchestrationRuntimeState, error)) *Backend_GetOrchestrationRuntimeState_Call { + _c.Call.Return(run) + return _c +} + // GetOrchestrationWorkItem provides a mock function with given fields: _a0 func (_m *Backend) GetOrchestrationWorkItem(_a0 context.Context) (*backend.OrchestrationWorkItem, error) { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for GetOrchestrationWorkItem") + } + var r0 *backend.OrchestrationWorkItem + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*backend.OrchestrationWorkItem, error)); ok { + return rf(_a0) + } if rf, ok := ret.Get(0).(func(context.Context) *backend.OrchestrationWorkItem); ok { r0 = rf(_a0) } else { @@ -482,7 +645,6 @@ func (_m *Backend) GetOrchestrationWorkItem(_a0 context.Context) (*backend.Orche } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(_a0) } else { @@ -498,7 +660,7 @@ type Backend_GetOrchestrationWorkItem_Call struct { } // GetOrchestrationWorkItem is a helper method to define mock.On call -// - _a0 context.Context +// - _a0 context.Context func (_e *Backend_Expecter) GetOrchestrationWorkItem(_a0 interface{}) *Backend_GetOrchestrationWorkItem_Call { return &Backend_GetOrchestrationWorkItem_Call{Call: _e.mock.On("GetOrchestrationWorkItem", _a0)} } @@ -515,10 +677,19 @@ func (_c *Backend_GetOrchestrationWorkItem_Call) Return(_a0 *backend.Orchestrati return _c } +func (_c *Backend_GetOrchestrationWorkItem_Call) RunAndReturn(run func(context.Context) (*backend.OrchestrationWorkItem, error)) *Backend_GetOrchestrationWorkItem_Call { + _c.Call.Return(run) + return _c +} + // PurgeOrchestrationState provides a mock function with given fields: _a0, _a1 func (_m *Backend) PurgeOrchestrationState(_a0 context.Context, _a1 api.InstanceID) error { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for PurgeOrchestrationState") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID) error); ok { r0 = rf(_a0, _a1) @@ -535,8 +706,8 @@ type Backend_PurgeOrchestrationState_Call struct { } // PurgeOrchestrationState is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 api.InstanceID +// - _a0 context.Context +// - _a1 api.InstanceID func (_e *Backend_Expecter) PurgeOrchestrationState(_a0 interface{}, _a1 interface{}) *Backend_PurgeOrchestrationState_Call { return &Backend_PurgeOrchestrationState_Call{Call: _e.mock.On("PurgeOrchestrationState", _a0, _a1)} } @@ -553,10 +724,19 @@ func (_c *Backend_PurgeOrchestrationState_Call) Return(_a0 error) *Backend_Purge return _c } +func (_c *Backend_PurgeOrchestrationState_Call) RunAndReturn(run func(context.Context, api.InstanceID) error) *Backend_PurgeOrchestrationState_Call { + _c.Call.Return(run) + return _c +} + // Start provides a mock function with given fields: _a0 func (_m *Backend) Start(_a0 context.Context) error { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for Start") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(_a0) @@ -573,7 +753,7 @@ type Backend_Start_Call struct { } // Start is a helper method to define mock.On call -// - _a0 context.Context +// - _a0 context.Context func (_e *Backend_Expecter) Start(_a0 interface{}) *Backend_Start_Call { return &Backend_Start_Call{Call: _e.mock.On("Start", _a0)} } @@ -590,10 +770,19 @@ func (_c *Backend_Start_Call) Return(_a0 error) *Backend_Start_Call { return _c } +func (_c *Backend_Start_Call) RunAndReturn(run func(context.Context) error) *Backend_Start_Call { + _c.Call.Return(run) + return _c +} + // Stop provides a mock function with given fields: _a0 func (_m *Backend) Stop(_a0 context.Context) error { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for Stop") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(_a0) @@ -610,7 +799,7 @@ type Backend_Stop_Call struct { } // Stop is a helper method to define mock.On call -// - _a0 context.Context +// - _a0 context.Context func (_e *Backend_Expecter) Stop(_a0 interface{}) *Backend_Stop_Call { return &Backend_Stop_Call{Call: _e.mock.On("Stop", _a0)} } @@ -627,13 +816,17 @@ func (_c *Backend_Stop_Call) Return(_a0 error) *Backend_Stop_Call { return _c } -type mockConstructorTestingTNewBackend interface { - mock.TestingT - Cleanup(func()) +func (_c *Backend_Stop_Call) RunAndReturn(run func(context.Context) error) *Backend_Stop_Call { + _c.Call.Return(run) + return _c } // NewBackend creates a new instance of Backend. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewBackend(t mockConstructorTestingTNewBackend) *Backend { +// The first argument is typically a *testing.T value. +func NewBackend(t interface { + mock.TestingT + Cleanup(func()) +}) *Backend { mock := &Backend{} mock.Mock.Test(t) diff --git a/tests/mocks/Executor.go b/tests/mocks/Executor.go index 1f704e6..659ac36 100644 --- a/tests/mocks/Executor.go +++ b/tests/mocks/Executor.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -30,7 +30,15 @@ func (_m *Executor) EXPECT() *Executor_Expecter { func (_m *Executor) ExecuteActivity(_a0 context.Context, _a1 api.InstanceID, _a2 *protos.HistoryEvent) (*protos.HistoryEvent, error) { ret := _m.Called(_a0, _a1, _a2) + if len(ret) == 0 { + panic("no return value specified for ExecuteActivity") + } + var r0 *protos.HistoryEvent + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)); ok { + return rf(_a0, _a1, _a2) + } if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID, *protos.HistoryEvent) *protos.HistoryEvent); ok { r0 = rf(_a0, _a1, _a2) } else { @@ -39,7 +47,6 @@ func (_m *Executor) ExecuteActivity(_a0 context.Context, _a1 api.InstanceID, _a2 } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, api.InstanceID, *protos.HistoryEvent) error); ok { r1 = rf(_a0, _a1, _a2) } else { @@ -55,9 +62,9 @@ type Executor_ExecuteActivity_Call struct { } // ExecuteActivity is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 api.InstanceID -// - _a2 *protos.HistoryEvent +// - _a0 context.Context +// - _a1 api.InstanceID +// - _a2 *protos.HistoryEvent func (_e *Executor_Expecter) ExecuteActivity(_a0 interface{}, _a1 interface{}, _a2 interface{}) *Executor_ExecuteActivity_Call { return &Executor_ExecuteActivity_Call{Call: _e.mock.On("ExecuteActivity", _a0, _a1, _a2)} } @@ -74,11 +81,24 @@ func (_c *Executor_ExecuteActivity_Call) Return(_a0 *protos.HistoryEvent, _a1 er return _c } +func (_c *Executor_ExecuteActivity_Call) RunAndReturn(run func(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)) *Executor_ExecuteActivity_Call { + _c.Call.Return(run) + return _c +} + // ExecuteOrchestrator provides a mock function with given fields: ctx, iid, oldEvents, newEvents func (_m *Executor) ExecuteOrchestrator(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*backend.ExecutionResults, error) { ret := _m.Called(ctx, iid, oldEvents, newEvents) + if len(ret) == 0 { + panic("no return value specified for ExecuteOrchestrator") + } + var r0 *backend.ExecutionResults + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID, []*protos.HistoryEvent, []*protos.HistoryEvent) (*backend.ExecutionResults, error)); ok { + return rf(ctx, iid, oldEvents, newEvents) + } if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID, []*protos.HistoryEvent, []*protos.HistoryEvent) *backend.ExecutionResults); ok { r0 = rf(ctx, iid, oldEvents, newEvents) } else { @@ -87,7 +107,6 @@ func (_m *Executor) ExecuteOrchestrator(ctx context.Context, iid api.InstanceID, } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, api.InstanceID, []*protos.HistoryEvent, []*protos.HistoryEvent) error); ok { r1 = rf(ctx, iid, oldEvents, newEvents) } else { @@ -103,10 +122,10 @@ type Executor_ExecuteOrchestrator_Call struct { } // ExecuteOrchestrator is a helper method to define mock.On call -// - ctx context.Context -// - iid api.InstanceID -// - oldEvents []*protos.HistoryEvent -// - newEvents []*protos.HistoryEvent +// - ctx context.Context +// - iid api.InstanceID +// - oldEvents []*protos.HistoryEvent +// - newEvents []*protos.HistoryEvent func (_e *Executor_Expecter) ExecuteOrchestrator(ctx interface{}, iid interface{}, oldEvents interface{}, newEvents interface{}) *Executor_ExecuteOrchestrator_Call { return &Executor_ExecuteOrchestrator_Call{Call: _e.mock.On("ExecuteOrchestrator", ctx, iid, oldEvents, newEvents)} } @@ -123,13 +142,17 @@ func (_c *Executor_ExecuteOrchestrator_Call) Return(_a0 *backend.ExecutionResult return _c } -type mockConstructorTestingTNewExecutor interface { - mock.TestingT - Cleanup(func()) +func (_c *Executor_ExecuteOrchestrator_Call) RunAndReturn(run func(context.Context, api.InstanceID, []*protos.HistoryEvent, []*protos.HistoryEvent) (*backend.ExecutionResults, error)) *Executor_ExecuteOrchestrator_Call { + _c.Call.Return(run) + return _c } // NewExecutor creates a new instance of Executor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewExecutor(t mockConstructorTestingTNewExecutor) *Executor { +// The first argument is typically a *testing.T value. +func NewExecutor(t interface { + mock.TestingT + Cleanup(func()) +}) *Executor { mock := &Executor{} mock.Mock.Test(t) diff --git a/tests/mocks/TaskWorker.go b/tests/mocks/TaskWorker.go index 38e7f7c..7ee0f2c 100644 --- a/tests/mocks/TaskWorker.go +++ b/tests/mocks/TaskWorker.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -25,14 +25,21 @@ func (_m *TaskWorker) EXPECT() *TaskWorker_Expecter { func (_m *TaskWorker) ProcessNext(_a0 context.Context) (bool, error) { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for ProcessNext") + } + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (bool, error)); ok { + return rf(_a0) + } if rf, ok := ret.Get(0).(func(context.Context) bool); ok { r0 = rf(_a0) } else { r0 = ret.Get(0).(bool) } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(_a0) } else { @@ -48,7 +55,7 @@ type TaskWorker_ProcessNext_Call struct { } // ProcessNext is a helper method to define mock.On call -// - _a0 context.Context +// - _a0 context.Context func (_e *TaskWorker_Expecter) ProcessNext(_a0 interface{}) *TaskWorker_ProcessNext_Call { return &TaskWorker_ProcessNext_Call{Call: _e.mock.On("ProcessNext", _a0)} } @@ -65,6 +72,11 @@ func (_c *TaskWorker_ProcessNext_Call) Return(_a0 bool, _a1 error) *TaskWorker_P return _c } +func (_c *TaskWorker_ProcessNext_Call) RunAndReturn(run func(context.Context) (bool, error)) *TaskWorker_ProcessNext_Call { + _c.Call.Return(run) + return _c +} + // Start provides a mock function with given fields: _a0 func (_m *TaskWorker) Start(_a0 context.Context) { _m.Called(_a0) @@ -76,7 +88,7 @@ type TaskWorker_Start_Call struct { } // Start is a helper method to define mock.On call -// - _a0 context.Context +// - _a0 context.Context func (_e *TaskWorker_Expecter) Start(_a0 interface{}) *TaskWorker_Start_Call { return &TaskWorker_Start_Call{Call: _e.mock.On("Start", _a0)} } @@ -93,6 +105,11 @@ func (_c *TaskWorker_Start_Call) Return() *TaskWorker_Start_Call { return _c } +func (_c *TaskWorker_Start_Call) RunAndReturn(run func(context.Context)) *TaskWorker_Start_Call { + _c.Call.Return(run) + return _c +} + // StopAndDrain provides a mock function with given fields: func (_m *TaskWorker) StopAndDrain() { _m.Called() @@ -120,13 +137,17 @@ func (_c *TaskWorker_StopAndDrain_Call) Return() *TaskWorker_StopAndDrain_Call { return _c } -type mockConstructorTestingTNewTaskWorker interface { - mock.TestingT - Cleanup(func()) +func (_c *TaskWorker_StopAndDrain_Call) RunAndReturn(run func()) *TaskWorker_StopAndDrain_Call { + _c.Call.Return(run) + return _c } // NewTaskWorker creates a new instance of TaskWorker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewTaskWorker(t mockConstructorTestingTNewTaskWorker) *TaskWorker { +// The first argument is typically a *testing.T value. +func NewTaskWorker(t interface { + mock.TestingT + Cleanup(func()) +}) *TaskWorker { mock := &TaskWorker{} mock.Mock.Test(t)