Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/pr-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ jobs:

- name: Install dependencies
run: go get .

- name: Install Protoc
uses: arduino/setup-protoc@v2

- name: Installing protoc-gen-go
run: |
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2

- name: Generate grpc code
run: protoc --go_out=. --go-grpc_out=. -I ./submodules/durabletask-protobuf/protos orchestrator_service.proto

- name: Run integration tests
run: go test ./tests/... -coverpkg ./api,./task,./client,./backend/...,./internal/helpers
24 changes: 24 additions & 0 deletions api/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"
)

type InstanceExistsOption int

const (
THROW_IF_EXIST InstanceExistsOption = iota
SKIP_IF_EXIST
TERMINATE_IF_EXIST
)

var (
ErrInstanceNotFound = errors.New("no such instance exists")
ErrNotStarted = errors.New("orchestration has not started")
Expand Down Expand Up @@ -87,6 +95,22 @@ func WithStartTime(startTime time.Time) NewOrchestrationOptions {
}
}

// WithInstanceExistsOption configures an option when orchestartion with same instance id already exists.
// If not specified, InstanceExistOption_THROW_IF_EXIST be used.
func WithInstanceExistsOption(option InstanceExistsOption) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) error {
switch option {
case SKIP_IF_EXIST:
req.InstanceExistOption = protos.InstanceExistOption_SKIP_IF_EXIST
case TERMINATE_IF_EXIST:
req.InstanceExistOption = protos.InstanceExistOption_TERMINATE_IF_EXIST
default:
req.InstanceExistOption = protos.InstanceExistOption_THROW_IF_EXIST
}
return nil
}
}

// WithFetchPayloads configures whether to load orchestration inputs, outputs, and custom status values, which could be large.
func WithFetchPayloads(fetchPayloads bool) FetchOrchestrationMetadataOptions {
return func(req *protos.GetInstanceRequest) {
Expand Down
6 changes: 6 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
ErrNotInitialized = errors.New("backend not initialized")
ErrWorkItemLockLost = errors.New("lock on work-item was lost")
ErrBackendAlreadyStarted = errors.New("backend is already started")
ErrInstanceNotExists = errors.New("isntance does not exist")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ErrInstanceNotExists = errors.New("isntance does not exist")
ErrInstanceNotExists = errors.New("instance does not exist")

)

type (
Expand Down Expand Up @@ -91,6 +92,11 @@ type Backend interface {
// [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist.
// [api.ErrNotCompleted] is returned if the specified orchestration instance is still running.
PurgeOrchestrationState(context.Context, api.InstanceID) error

// CleanupOrchestration clean up all records for the specified orchestration instance in the entire task hub.
//
// [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist.
CleanupOrchestration(context.Context, api.InstanceID) error
}

// MarshalHistoryEvent serializes the [HistoryEvent] into a protobuf byte array.
Expand Down
26 changes: 24 additions & 2 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,32 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst
defer span.End()

e := helpers.NewExecutionStartedEvent(req.Name, instanceID, req.Input, nil, helpers.TraceContextFromSpan(span))
if err := g.backend.CreateOrchestrationInstance(ctx, e); err != nil {
return nil, err

// Question: from customer side SKIP_IF_EXIST and TERMINATE_IF_EXIST have the same behavior if success.
// Do customers need to know the difference between them? SKIP_IF_EXIST use the old instance, whereas
// TERMINATE_IF_EXIST terminates old instance and create a new instance with the same instance id.

// Terminate existing instance if requested
if req.InstanceExistOption == protos.InstanceExistOption_TERMINATE_IF_EXIST {
if err := g.backend.CleanupOrchestration(ctx, api.InstanceID(instanceID)); err != nil && !errors.Is(err, api.ErrInstanceNotFound) {
return nil, err
}
}
// Create or update orchestration instance
err := g.backend.CreateOrchestrationInstance(ctx, e)

if err != nil {
if errors.Is(err, ErrDuplicateEvent) {
// Throw if instance already exists
if req.InstanceExistOption == protos.InstanceExistOption_THROW_IF_EXIST {
return nil, err
}
// Log a warning and skip if instance already exists
g.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", instanceID)
} else {
return nil, err
}
}
return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil
}

Expand Down
51 changes: 51 additions & 0 deletions backend/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,57 @@ func (be *sqliteBackend) PurgeOrchestrationState(ctx context.Context, id api.Ins
return nil
}

func (be *sqliteBackend) CleanupOrchestration(ctx context.Context, id api.InstanceID) error {
if err := be.ensureDB(); err != nil {
return err
}

tx, err := be.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id))
if err := row.Err(); err != nil {
return fmt.Errorf("failed to query for instance existence: %w", err)
}

var unused int
if err := row.Scan(&unused); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return api.ErrInstanceNotFound
} else {
return fmt.Errorf("failed to scan instance existence: %w", err)
}
}

_, err = tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from the Instances table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM History WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from History table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM NewEvents WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from NewEvents table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM NewTasks WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from NewTasks table: %w", err)
}

if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}

// Start implements backend.Backend
func (*sqliteBackend) Start(context.Context) error {
return nil
Expand Down
2 changes: 1 addition & 1 deletion submodules/durabletask-protobuf
76 changes: 76 additions & 0 deletions tests/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,79 @@ func Test_Grpc_Terminate_Recursive(t *testing.T) {
})
}
}

func Test_Grpc_ReuseInstanceIDSkipOrTerminate(t *testing.T) {
delayTime := 4 * time.Second
r := task.NewTaskRegistry()
r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
var input string
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
ctx.CreateTimer(delayTime).Await(nil)
err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output)
return output, err
})
r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) {
var name string
if err := ctx.GetInput(&name); err != nil {
return nil, err
}
return fmt.Sprintf("Hello, %s!", name), nil
})

cancelListener := startGrpcListener(t, r)
defer cancelListener()
instanceIDs := []api.InstanceID{"SKIP_IF_EXIST", "TERMINATE_IF_EXIST"}
options := []api.InstanceExistsOption{api.SKIP_IF_EXIST, api.TERMINATE_IF_EXIST}

for i, option := range options {
id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceIDs[i]))
require.NoError(t, err)
id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(id), api.WithInstanceExistsOption(option))
require.NoError(t, err)
timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second)
defer cancelTimeout()
metadata, err := grpcClient.WaitForOrchestrationCompletion(timeoutCtx, id, api.WithFetchPayloads(true))
require.NoError(t, err)
assert.Equal(t, true, metadata.IsComplete())
assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput)
time.Sleep(1 * time.Second)
}
}

func Test_Grpc_ReuseInstanceIDThrowIfExists(t *testing.T) {
delayTime := 4 * time.Second
r := task.NewTaskRegistry()
r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
var input string
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
ctx.CreateTimer(delayTime).Await(nil)
err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output)
return output, err
})
r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) {
var name string
if err := ctx.GetInput(&name); err != nil {
return nil, err
}
return fmt.Sprintf("Hello, %s!", name), nil
})

cancelListener := startGrpcListener(t, r)
defer cancelListener()

instanceID := api.InstanceID("123")
option := api.THROW_IF_EXIST

id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID))
require.NoError(t, err)
_, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(id), api.WithInstanceExistsOption(option))
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "duplicate event")
}
}
Loading