diff --git a/packages/durabletask-js/src/index.ts b/packages/durabletask-js/src/index.ts index 8381d30..3a428c9 100644 --- a/packages/durabletask-js/src/index.ts +++ b/packages/durabletask-js/src/index.ts @@ -78,6 +78,9 @@ export { TOrchestrator } from "./types/orchestrator.type"; export { TActivity } from "./types/activity.type"; export { TInput } from "./types/input.type"; export { TOutput } from "./types/output.type"; + +// Testing utilities +export { InMemoryOrchestrationBackend, TestOrchestrationClient, TestOrchestrationWorker } from "./testing"; export { ParentOrchestrationInstance } from "./types/parent-orchestration-instance.type"; // Logger diff --git a/packages/durabletask-js/src/testing/in-memory-backend.ts b/packages/durabletask-js/src/testing/in-memory-backend.ts new file mode 100644 index 0000000..a32a649 --- /dev/null +++ b/packages/durabletask-js/src/testing/in-memory-backend.ts @@ -0,0 +1,650 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import * as pb from "../proto/orchestrator_service_pb"; +import * as pbh from "../utils/pb-helper.util"; +import { OrchestrationStatus as ClientOrchestrationStatus } from "../orchestration/enum/orchestration-status.enum"; + +/** + * Internal orchestration instance state stored by the in-memory backend. + */ +export interface OrchestrationInstance { + instanceId: string; + name: string; + status: pb.OrchestrationStatus; + input?: string; + output?: string; + customStatus?: string; + createdAt: Date; + lastUpdatedAt: Date; + failureDetails?: pb.TaskFailureDetails; + history: pb.HistoryEvent[]; + pendingEvents: pb.HistoryEvent[]; + completionToken: number; +} + +/** + * Activity work item that needs to be executed. + */ +export interface ActivityWorkItem { + instanceId: string; + name: string; + taskId: number; + input?: string; + completionToken: number; +} + +/** + * Promise resolver for waiting on orchestration state changes. + */ +interface StateWaiter { + resolve: (instance: OrchestrationInstance | undefined) => void; + reject: (error: Error) => void; + predicate: (instance: OrchestrationInstance) => boolean; +} + +/** + * In-memory backend for durable orchestrations suitable for testing. + * + * This backend stores all orchestration state in memory and processes + * work items synchronously within the same process. It is designed for + * unit testing and integration testing scenarios where a sidecar process + * or external storage is not desired. + * + * Thread-safety: All state mutations are performed synchronously via + * the event loop. The backend uses a simple work queue pattern to ensure + * that orchestration and activity processing happens in a predictable order. + */ +export class InMemoryOrchestrationBackend { + private readonly instances: Map = new Map(); + private readonly orchestrationQueue: string[] = []; + private readonly orchestrationQueueSet: Set = new Set(); + private readonly activityQueue: ActivityWorkItem[] = []; + private readonly stateWaiters: Map = new Map(); + private nextCompletionToken: number = 1; + private readonly maxHistorySize: number; + + /** + * Creates a new in-memory backend. + * @param maxHistorySize Maximum number of history events per orchestration (default 10000) + */ + constructor(maxHistorySize: number = 10000) { + this.maxHistorySize = maxHistorySize; + } + + /** + * Creates a new orchestration instance. + */ + createInstance( + instanceId: string, + name: string, + input?: string, + scheduledStartTime?: Date, + ): string { + if (this.instances.has(instanceId)) { + throw new Error(`Orchestration instance '${instanceId}' already exists`); + } + + const now = new Date(); + const startTime = scheduledStartTime && scheduledStartTime > now ? scheduledStartTime : now; + + const instance: OrchestrationInstance = { + instanceId, + name, + status: pb.OrchestrationStatus.ORCHESTRATION_STATUS_PENDING, + input, + createdAt: now, + lastUpdatedAt: now, + history: [], + pendingEvents: [], + completionToken: this.nextCompletionToken++, + }; + + // Add initial events to start the orchestration + const orchestratorStarted = pbh.newOrchestratorStartedEvent(startTime); + const executionStarted = pbh.newExecutionStartedEvent(name, instanceId, input); + + instance.pendingEvents.push(orchestratorStarted); + instance.pendingEvents.push(executionStarted); + + this.instances.set(instanceId, instance); + this.enqueueOrchestration(instanceId); + + return instanceId; + } + + /** + * Gets an orchestration instance by ID. + */ + getInstance(instanceId: string): OrchestrationInstance | undefined { + return this.instances.get(instanceId); + } + + /** + * Raises an external event for an orchestration instance. + */ + raiseEvent(instanceId: string, eventName: string, input?: string): void { + const instance = this.instances.get(instanceId); + if (!instance) { + throw new Error(`Orchestration instance '${instanceId}' not found`); + } + + const event = pbh.newEventRaisedEvent(eventName, input); + instance.pendingEvents.push(event); + instance.lastUpdatedAt = new Date(); + + // Ensure instance is queued for processing + if (!this.orchestrationQueueSet.has(instanceId)) { + this.enqueueOrchestration(instanceId); + } + } + + /** + * Terminates an orchestration instance. + */ + terminate(instanceId: string, output?: string): void { + const instance = this.instances.get(instanceId); + if (!instance) { + throw new Error(`Orchestration instance '${instanceId}' not found`); + } + + if (this.isTerminalStatus(instance.status)) { + return; // Already terminated + } + + const event = pbh.newTerminatedEvent(output); + instance.pendingEvents.push(event); + instance.lastUpdatedAt = new Date(); + + if (!this.orchestrationQueueSet.has(instanceId)) { + this.enqueueOrchestration(instanceId); + } + } + + /** + * Suspends an orchestration instance. + */ + suspend(instanceId: string): void { + const instance = this.instances.get(instanceId); + if (!instance) { + throw new Error(`Orchestration instance '${instanceId}' not found`); + } + + if (instance.status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED) { + return; + } + + const event = pbh.newSuspendEvent(); + instance.pendingEvents.push(event); + instance.lastUpdatedAt = new Date(); + + if (!this.orchestrationQueueSet.has(instanceId)) { + this.enqueueOrchestration(instanceId); + } + } + + /** + * Resumes a suspended orchestration instance. + */ + resume(instanceId: string): void { + const instance = this.instances.get(instanceId); + if (!instance) { + throw new Error(`Orchestration instance '${instanceId}' not found`); + } + + const event = pbh.newResumeEvent(); + instance.pendingEvents.push(event); + instance.lastUpdatedAt = new Date(); + + if (!this.orchestrationQueueSet.has(instanceId)) { + this.enqueueOrchestration(instanceId); + } + } + + /** + * Purges an orchestration instance from the store. + */ + purge(instanceId: string): boolean { + const instance = this.instances.get(instanceId); + if (!instance) { + return false; + } + + if (!this.isTerminalStatus(instance.status)) { + return false; + } + + this.instances.delete(instanceId); + this.stateWaiters.delete(instanceId); + return true; + } + + /** + * Gets the next orchestration work item to process, if any. + */ + getNextOrchestrationWorkItem(): OrchestrationInstance | undefined { + while (this.orchestrationQueue.length > 0) { + const instanceId = this.orchestrationQueue.shift()!; + this.orchestrationQueueSet.delete(instanceId); + const instance = this.instances.get(instanceId); + + if (instance && instance.pendingEvents.length > 0) { + return instance; + } + } + return undefined; + } + + /** + * Gets the next activity work item to process, if any. + */ + getNextActivityWorkItem(): ActivityWorkItem | undefined { + return this.activityQueue.shift(); + } + + /** + * Completes an orchestration execution with the given actions. + */ + completeOrchestration( + instanceId: string, + completionToken: number, + actions: pb.OrchestratorAction[], + customStatus?: string, + ): void { + const instance = this.instances.get(instanceId); + if (!instance) { + throw new Error(`Orchestration instance '${instanceId}' not found`); + } + + if (instance.completionToken !== completionToken) { + // Stale completion - ignore + return; + } + + // Check history size limit before adding events + const projectedSize = instance.history.length + instance.pendingEvents.length; + if (projectedSize > this.maxHistorySize) { + throw new Error( + `Orchestration '${instanceId}' would exceed maximum history size of ${this.maxHistorySize} ` + + `(current: ${instance.history.length}, pending: ${instance.pendingEvents.length})`, + ); + } + + // Move pending events to history + instance.history.push(...instance.pendingEvents); + instance.pendingEvents = []; + instance.lastUpdatedAt = new Date(); + + if (customStatus !== undefined) { + instance.customStatus = customStatus; + } + + // Transition to RUNNING once the orchestration has been processed for the first time + if (instance.status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_PENDING) { + instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING; + } + + // Process actions + for (const action of actions) { + this.processAction(instance, action); + } + + // Update completion token for next execution + instance.completionToken = this.nextCompletionToken++; + + // Notify waiters + this.notifyWaiters(instanceId); + } + + /** + * Completes an activity execution. + */ + completeActivity( + instanceId: string, + taskId: number, + result?: string, + error?: Error, + ): void { + const instance = this.instances.get(instanceId); + if (!instance) { + return; // Instance may have been purged + } + + let event: pb.HistoryEvent; + if (error) { + event = pbh.newTaskFailedEvent(taskId, error); + } else { + event = pbh.newTaskCompletedEvent(taskId, result); + } + + instance.pendingEvents.push(event); + instance.lastUpdatedAt = new Date(); + this.enqueueOrchestration(instanceId); + } + + /** + * Waits for an orchestration to reach a state matching the predicate. + */ + async waitForState( + instanceId: string, + predicate: (instance: OrchestrationInstance) => boolean, + timeoutMs: number = 30000, + ): Promise { + const instance = this.instances.get(instanceId); + if (instance && predicate(instance)) { + return instance; + } + + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + const waiters = this.stateWaiters.get(instanceId); + if (waiters) { + const index = waiters.findIndex((w) => w.resolve === resolve); + if (index >= 0) { + waiters.splice(index, 1); + } + } + reject(new Error(`Timeout waiting for orchestration '${instanceId}'`)); + }, timeoutMs); + + const waiter: StateWaiter = { + resolve: (result) => { + clearTimeout(timer); + resolve(result); + }, + reject: (error) => { + clearTimeout(timer); + reject(error); + }, + predicate, + }; + + let waiters = this.stateWaiters.get(instanceId); + if (!waiters) { + waiters = []; + this.stateWaiters.set(instanceId, waiters); + } + waiters.push(waiter); + }); + } + + /** + * Checks if there are any pending work items. + */ + hasPendingWork(): boolean { + return this.orchestrationQueue.length > 0 || this.activityQueue.length > 0; + } + + /** + * Resets the backend, clearing all state. + */ + reset(): void { + this.instances.clear(); + this.orchestrationQueue.length = 0; + this.activityQueue.length = 0; + for (const waiters of this.stateWaiters.values()) { + for (const waiter of waiters) { + waiter.reject(new Error("Backend was reset")); + } + } + this.stateWaiters.clear(); + } + + /** + * Converts internal status to client status. + */ + toClientStatus(status: pb.OrchestrationStatus): ClientOrchestrationStatus { + switch (status) { + case pb.OrchestrationStatus.ORCHESTRATION_STATUS_PENDING: + return ClientOrchestrationStatus.PENDING; + case pb.OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING: + return ClientOrchestrationStatus.RUNNING; + case pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED: + return ClientOrchestrationStatus.COMPLETED; + case pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED: + return ClientOrchestrationStatus.FAILED; + case pb.OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED: + return ClientOrchestrationStatus.TERMINATED; + case pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED: + return ClientOrchestrationStatus.SUSPENDED; + case pb.OrchestrationStatus.ORCHESTRATION_STATUS_CONTINUED_AS_NEW: + // Continued-as-new is transient, should show as running + return ClientOrchestrationStatus.RUNNING; + default: + return ClientOrchestrationStatus.RUNNING; + } + } + + private enqueueOrchestration(instanceId: string): void { + if (!this.orchestrationQueueSet.has(instanceId)) { + this.orchestrationQueue.push(instanceId); + this.orchestrationQueueSet.add(instanceId); + } + } + + private isTerminalStatus(status: pb.OrchestrationStatus): boolean { + return ( + status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED || + status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED || + status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED + ); + } + + private processAction(instance: OrchestrationInstance, action: pb.OrchestratorAction): void { + const actionType = action.getOrchestratoractiontypeCase(); + + switch (actionType) { + case pb.OrchestratorAction.OrchestratoractiontypeCase.COMPLETEORCHESTRATION: + this.processCompleteOrchestrationAction(instance, action.getCompleteorchestration()!); + break; + case pb.OrchestratorAction.OrchestratoractiontypeCase.SCHEDULETASK: + this.processScheduleTaskAction(instance, action); + break; + case pb.OrchestratorAction.OrchestratoractiontypeCase.CREATETIMER: + this.processCreateTimerAction(instance, action); + break; + case pb.OrchestratorAction.OrchestratoractiontypeCase.CREATESUBORCHESTRATION: + this.processCreateSubOrchestrationAction(instance, action); + break; + case pb.OrchestratorAction.OrchestratoractiontypeCase.SENDEVENT: + this.processSendEventAction(action.getSendevent()!); + break; + default: + console.warn(`Unknown action type: ${actionType}`); + } + } + + private processCompleteOrchestrationAction( + instance: OrchestrationInstance, + completeAction: pb.CompleteOrchestrationAction, + ): void { + const status = completeAction.getOrchestrationstatus(); + instance.status = status; + instance.output = completeAction.getResult()?.getValue(); + instance.failureDetails = completeAction.getFailuredetails(); + + if (status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_CONTINUED_AS_NEW) { + // Handle continue-as-new + const newInput = completeAction.getResult()?.getValue(); + const carryoverEvents = completeAction.getCarryovereventsList(); + + // Reset instance state + instance.history = []; + instance.input = newInput; + instance.output = undefined; + instance.failureDetails = undefined; + instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_PENDING; + + // Add carryover events + instance.pendingEvents = [...carryoverEvents]; + + // Add new execution started events + const orchestratorStarted = pbh.newOrchestratorStartedEvent(new Date()); + const executionStarted = pbh.newExecutionStartedEvent(instance.name, instance.instanceId, newInput); + instance.pendingEvents.push(orchestratorStarted); + instance.pendingEvents.push(executionStarted); + + this.enqueueOrchestration(instance.instanceId); + } + } + + private processScheduleTaskAction(instance: OrchestrationInstance, action: pb.OrchestratorAction): void { + const scheduleTask = action.getScheduletask()!; + const taskId = action.getId(); + const taskName = scheduleTask.getName(); + const input = scheduleTask.getInput()?.getValue(); + + // Add TaskScheduled event to history + const event = pbh.newTaskScheduledEvent(taskId, taskName, input); + instance.history.push(event); + + // Mark instance as running + if (instance.status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_PENDING) { + instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING; + } + + // Queue activity for execution + this.activityQueue.push({ + instanceId: instance.instanceId, + name: taskName, + taskId, + input, + completionToken: instance.completionToken, + }); + } + + private processCreateTimerAction(instance: OrchestrationInstance, action: pb.OrchestratorAction): void { + const createTimer = action.getCreatetimer()!; + const timerId = action.getId(); + const fireAt = createTimer.getFireat()?.toDate() ?? new Date(); + + // Add TimerCreated event to history + const timerCreatedEvent = pbh.newTimerCreatedEvent(timerId, fireAt); + instance.history.push(timerCreatedEvent); + + // Mark instance as running + if (instance.status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_PENDING) { + instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING; + } + + // Schedule timer firing + const now = new Date(); + const delay = Math.max(0, fireAt.getTime() - now.getTime()); + + setTimeout(() => { + const currentInstance = this.instances.get(instance.instanceId); + if (currentInstance && !this.isTerminalStatus(currentInstance.status)) { + const timerFiredEvent = pbh.newTimerFiredEvent(timerId, fireAt); + currentInstance.pendingEvents.push(timerFiredEvent); + currentInstance.lastUpdatedAt = new Date(); + this.enqueueOrchestration(instance.instanceId); + } + }, delay); + } + + private processCreateSubOrchestrationAction(instance: OrchestrationInstance, action: pb.OrchestratorAction): void { + const createSubOrch = action.getCreatesuborchestration()!; + const taskId = action.getId(); + const name = createSubOrch.getName(); + const subInstanceId = createSubOrch.getInstanceid(); + const input = createSubOrch.getInput()?.getValue(); + + // Add SubOrchestrationInstanceCreated event to history + const event = pbh.newSubOrchestrationCreatedEvent(taskId, name, subInstanceId, input); + instance.history.push(event); + + // Mark instance as running + if (instance.status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_PENDING) { + instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING; + } + + // Create the sub-orchestration + try { + this.createInstance(subInstanceId, name, input); + + // Watch for sub-orchestration completion + this.watchSubOrchestration(instance.instanceId, subInstanceId, taskId); + } catch (error: any) { + // Sub-orchestration creation failed + const failedEvent = pbh.newSubOrchestrationFailedEvent(taskId, error); + instance.pendingEvents.push(failedEvent); + this.enqueueOrchestration(instance.instanceId); + } + } + + private watchSubOrchestration(parentInstanceId: string, subInstanceId: string, taskId: number): void { + // Use the stateWaiters mechanism instead of polling to avoid infinite loops + // and unnecessary resource consumption + this.waitForState( + subInstanceId, + (inst) => this.isTerminalStatus(inst.status), + // No timeout - sub-orchestration will eventually complete, fail, or be terminated + // If parent is terminated, we check that when delivering the event + ) + .then((subInstance) => { + const parentInstance = this.instances.get(parentInstanceId); + + // If parent or sub no longer exists, nothing to do + if (!subInstance || !parentInstance) { + return; + } + + // If parent already terminated, don't deliver the completion event + if (this.isTerminalStatus(parentInstance.status)) { + return; + } + + // Deliver the sub-orchestration completion/failure event to parent + let event: pb.HistoryEvent; + if (subInstance.status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED) { + event = pbh.newSubOrchestrationCompletedEvent(taskId, subInstance.output); + } else { + const error = new Error(subInstance.failureDetails?.getErrormessage() || "Sub-orchestration failed"); + event = pbh.newSubOrchestrationFailedEvent(taskId, error); + } + parentInstance.pendingEvents.push(event); + parentInstance.lastUpdatedAt = new Date(); + this.enqueueOrchestration(parentInstanceId); + }) + .catch(() => { + // Timeout or reset - sub-orchestration watcher cancelled, nothing to do + }); + } + + private processSendEventAction(sendEvent: pb.SendEventAction): void { + const targetInstanceId = sendEvent.getInstance()?.getInstanceid(); + const eventName = sendEvent.getName(); + const eventData = sendEvent.getData()?.getValue(); + + if (targetInstanceId) { + try { + this.raiseEvent(targetInstanceId, eventName, eventData); + } catch { + // Target instance may not exist - ignore + } + } + } + + private notifyWaiters(instanceId: string): void { + const instance = this.instances.get(instanceId); + const waiters = this.stateWaiters.get(instanceId); + + if (!waiters || waiters.length === 0 || !instance) { + return; + } + + // Find and notify matching waiters + const matchingWaiters = waiters.filter((w) => w.predicate(instance)); + for (const waiter of matchingWaiters) { + waiter.resolve(instance); + } + + // Remove notified waiters + const remainingWaiters = waiters.filter((w) => !matchingWaiters.includes(w)); + if (remainingWaiters.length === 0) { + this.stateWaiters.delete(instanceId); + } else { + this.stateWaiters.set(instanceId, remainingWaiters); + } + } +} diff --git a/packages/durabletask-js/src/testing/index.ts b/packages/durabletask-js/src/testing/index.ts new file mode 100644 index 0000000..283b591 --- /dev/null +++ b/packages/durabletask-js/src/testing/index.ts @@ -0,0 +1,6 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +export { InMemoryOrchestrationBackend } from "./in-memory-backend"; +export { TestOrchestrationClient } from "./test-client"; +export { TestOrchestrationWorker } from "./test-worker"; diff --git a/packages/durabletask-js/src/testing/test-client.ts b/packages/durabletask-js/src/testing/test-client.ts new file mode 100644 index 0000000..f7af0a9 --- /dev/null +++ b/packages/durabletask-js/src/testing/test-client.ts @@ -0,0 +1,166 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { randomUUID } from "crypto"; +import { getName } from "../task"; +import { TOrchestrator } from "../types/orchestrator.type"; +import { TInput } from "../types/input.type"; +import { OrchestrationState } from "../orchestration/orchestration-state"; +import { FailureDetails } from "../task/failure-details"; +import { InMemoryOrchestrationBackend, OrchestrationInstance } from "./in-memory-backend"; +import * as pb from "../proto/orchestrator_service_pb"; + +/** + * Client for scheduling and managing orchestrations in the in-memory backend. + * + * This client provides a similar API to TaskHubGrpcClient but operates + * entirely in-memory for testing purposes. + */ +export class TestOrchestrationClient { + constructor(private readonly backend: InMemoryOrchestrationBackend) {} + + /** + * Schedules a new orchestration. + */ + async scheduleNewOrchestration( + orchestrator: TOrchestrator | string, + input?: TInput, + instanceId?: string, + startAt?: Date, + ): Promise { + const name = typeof orchestrator === "string" ? orchestrator : getName(orchestrator); + const id = instanceId ?? randomUUID(); + const encodedInput = input !== undefined ? JSON.stringify(input) : undefined; + + this.backend.createInstance(id, name, encodedInput, startAt); + return id; + } + + /** + * Gets the current state of an orchestration. + */ + async getOrchestrationState( + instanceId: string, + fetchPayloads: boolean = true, + ): Promise { + const instance = this.backend.getInstance(instanceId); + if (!instance) { + return undefined; + } + return this.toOrchestrationState(instance, fetchPayloads); + } + + /** + * Waits for an orchestration to start running. + */ + async waitForOrchestrationStart( + instanceId: string, + fetchPayloads: boolean = false, + timeout: number = 60, + ): Promise { + const instance = await this.backend.waitForState( + instanceId, + (inst) => inst.status !== pb.OrchestrationStatus.ORCHESTRATION_STATUS_PENDING, + timeout * 1000, + ); + if (!instance) { + return undefined; + } + return this.toOrchestrationState(instance, fetchPayloads); + } + + /** + * Waits for an orchestration to complete. + */ + async waitForOrchestrationCompletion( + instanceId: string, + fetchPayloads: boolean = true, + timeout: number = 60, + ): Promise { + const instance = await this.backend.waitForState( + instanceId, + (inst) => this.isTerminalStatus(inst.status), + timeout * 1000, + ); + if (!instance) { + return undefined; + } + return this.toOrchestrationState(instance, fetchPayloads); + } + + /** + * Raises an event to an orchestration. + */ + async raiseOrchestrationEvent(instanceId: string, eventName: string, data: any = null): Promise { + const encodedData = data !== null ? JSON.stringify(data) : undefined; + this.backend.raiseEvent(instanceId, eventName, encodedData); + } + + /** + * Terminates an orchestration. + */ + async terminateOrchestration(instanceId: string, output: any = null): Promise { + const encodedOutput = output !== null ? JSON.stringify(output) : undefined; + this.backend.terminate(instanceId, encodedOutput); + } + + /** + * Suspends an orchestration. + */ + async suspendOrchestration(instanceId: string): Promise { + this.backend.suspend(instanceId); + } + + /** + * Resumes a suspended orchestration. + */ + async resumeOrchestration(instanceId: string): Promise { + this.backend.resume(instanceId); + } + + /** + * Purges a completed orchestration from storage. + */ + async purgeOrchestration(instanceId: string): Promise<{ deletedInstanceCount: number }> { + const deleted = this.backend.purge(instanceId); + return { deletedInstanceCount: deleted ? 1 : 0 }; + } + + /** + * Stops the client. No-op for in-memory backend. + */ + async stop(): Promise { + // No-op - in-memory client has nothing to stop + } + + private isTerminalStatus(status: pb.OrchestrationStatus): boolean { + return ( + status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED || + status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED || + status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED + ); + } + + private toOrchestrationState(instance: OrchestrationInstance, fetchPayloads: boolean): OrchestrationState { + let failureDetails: FailureDetails | undefined; + if (instance.failureDetails) { + failureDetails = new FailureDetails( + instance.failureDetails.getErrormessage(), + instance.failureDetails.getErrortype(), + instance.failureDetails.getStacktrace()?.getValue(), + ); + } + + return new OrchestrationState( + instance.instanceId, + instance.name, + this.backend.toClientStatus(instance.status), + instance.createdAt, + instance.lastUpdatedAt, + fetchPayloads ? instance.input : undefined, + fetchPayloads ? instance.output : undefined, + fetchPayloads ? instance.customStatus : undefined, + failureDetails, + ); + } +} diff --git a/packages/durabletask-js/src/testing/test-worker.ts b/packages/durabletask-js/src/testing/test-worker.ts new file mode 100644 index 0000000..3b85c57 --- /dev/null +++ b/packages/durabletask-js/src/testing/test-worker.ts @@ -0,0 +1,186 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { Registry } from "../worker/registry"; +import { OrchestrationExecutor } from "../worker/orchestration-executor"; +import { ActivityExecutor } from "../worker/activity-executor"; +import { TOrchestrator } from "../types/orchestrator.type"; +import { TActivity } from "../types/activity.type"; +import { TInput } from "../types/input.type"; +import { TOutput } from "../types/output.type"; +import { InMemoryOrchestrationBackend, OrchestrationInstance, ActivityWorkItem } from "./in-memory-backend"; +import * as pb from "../proto/orchestrator_service_pb"; +import * as pbh from "../utils/pb-helper.util"; + +/** + * Worker that processes orchestrations and activities from the in-memory backend. + * + * This worker runs in the same process as the test and processes work items + * synchronously in the Node.js event loop, avoiding the need for a separate + * sidecar process. + */ +export class TestOrchestrationWorker { + private readonly registry: Registry; + private readonly backend: InMemoryOrchestrationBackend; + private isRunning: boolean = false; + private processingPromise: Promise | null = null; + private stopRequested: boolean = false; + + constructor(backend: InMemoryOrchestrationBackend) { + this.registry = new Registry(); + this.backend = backend; + } + + /** + * Registers an orchestrator function with the worker. + */ + addOrchestrator(fn: TOrchestrator): string { + if (this.isRunning) { + throw new Error("Cannot add orchestrator while worker is running."); + } + return this.registry.addOrchestrator(fn); + } + + /** + * Registers a named orchestrator function with the worker. + */ + addNamedOrchestrator(name: string, fn: TOrchestrator): string { + if (this.isRunning) { + throw new Error("Cannot add orchestrator while worker is running."); + } + this.registry.addNamedOrchestrator(name, fn); + return name; + } + + /** + * Registers an activity function with the worker. + */ + addActivity(fn: TActivity): string { + if (this.isRunning) { + throw new Error("Cannot add activity while worker is running."); + } + return this.registry.addActivity(fn); + } + + /** + * Registers a named activity function with the worker. + */ + addNamedActivity(name: string, fn: TActivity): string { + if (this.isRunning) { + throw new Error("Cannot add activity while worker is running."); + } + this.registry.addNamedActivity(name, fn); + return name; + } + + /** + * Starts the worker processing loop. + */ + async start(): Promise { + if (this.isRunning) { + throw new Error("The worker is already running."); + } + + this.isRunning = true; + this.stopRequested = false; + this.processingPromise = this.runProcessingLoop(); + } + + /** + * Stops the worker. This method is idempotent and can be safely called + * even if the worker is not running. + */ + async stop(): Promise { + if (!this.isRunning) { + return; // Already stopped, nothing to do + } + + this.stopRequested = true; + this.isRunning = false; + + // Wait for the processing loop to finish + if (this.processingPromise) { + await this.processingPromise; + this.processingPromise = null; + } + } + + /** + * Main processing loop that continuously processes work items. + */ + private async runProcessingLoop(): Promise { + while (!this.stopRequested) { + let processedAny = false; + + // Process orchestrations first + const orchestration = this.backend.getNextOrchestrationWorkItem(); + if (orchestration) { + await this.processOrchestration(orchestration); + processedAny = true; + } + + // Then process activities + const activity = this.backend.getNextActivityWorkItem(); + if (activity) { + await this.processActivity(activity); + processedAny = true; + } + + // If nothing was processed, yield to allow other async operations + if (!processedAny) { + await this.yieldToEventLoop(); + } + } + } + + /** + * Processes a single orchestration work item. + */ + private async processOrchestration(instance: OrchestrationInstance): Promise { + const instanceId = instance.instanceId; + const completionToken = instance.completionToken; + + try { + const executor = new OrchestrationExecutor(this.registry); + const result = await executor.execute(instanceId, instance.history, instance.pendingEvents); + + this.backend.completeOrchestration(instanceId, completionToken, result.actions, result.customStatus); + } catch (error: any) { + console.error(`Error executing orchestration '${instanceId}':`, error); + + // Create a failure action + const failureDetails = pbh.newFailureDetails(error); + const failAction = pbh.newCompleteOrchestrationAction( + -1, + pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED, + undefined, + failureDetails, + ); + + this.backend.completeOrchestration(instanceId, completionToken, [failAction]); + } + } + + /** + * Processes a single activity work item. + */ + private async processActivity(workItem: ActivityWorkItem): Promise { + const { instanceId, name, taskId, input } = workItem; + + try { + const executor = new ActivityExecutor(this.registry); + const result = await executor.execute(instanceId, name, taskId, input); + this.backend.completeActivity(instanceId, taskId, result); + } catch (error: any) { + console.error(`Error executing activity '${name}':`, error); + this.backend.completeActivity(instanceId, taskId, undefined, error); + } + } + + /** + * Yields control to the event loop to allow timers and I/O to process. + */ + private yieldToEventLoop(): Promise { + return new Promise((resolve) => setImmediate(resolve)); + } +} diff --git a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts index c67b177..ad53ebd 100644 --- a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts +++ b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts @@ -113,7 +113,22 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { if (this._previousTask.isFailed) { // Raise the failure as an exception to the generator. The orchestrator can then either // handle the exception or allow it to fail the orchestration. - await this._generator.throw(this._previousTask._exception); + const throwResult = await this._generator.throw(this._previousTask._exception); + + // If the generator caught the exception and completed, signal completion + if (throwResult.done) { + throw new StopIterationError(throwResult.value); + } + + // If the generator yielded a new task after catching the exception + if (throwResult.value instanceof Task) { + this._previousTask = throwResult.value; + // If the new task is already complete, continue processing + if (this._previousTask.isComplete) { + await this.resume(); + } + return; + } } else if (this._previousTask.isComplete) { while (true) { // Resume the generator. This will either return a Task or raise StopIteration if it's done. diff --git a/packages/durabletask-js/test/in-memory-backend.spec.ts b/packages/durabletask-js/test/in-memory-backend.spec.ts new file mode 100644 index 0000000..56e5f15 --- /dev/null +++ b/packages/durabletask-js/test/in-memory-backend.spec.ts @@ -0,0 +1,289 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { + InMemoryOrchestrationBackend, + TestOrchestrationClient, + TestOrchestrationWorker, + OrchestrationStatus, + getName, + whenAll, + ActivityContext, + OrchestrationContext, + Task, + TOrchestrator, +} from "../src"; + +describe("In-Memory Backend", () => { + let backend: InMemoryOrchestrationBackend; + let client: TestOrchestrationClient; + let worker: TestOrchestrationWorker; + + beforeEach(async () => { + backend = new InMemoryOrchestrationBackend(); + client = new TestOrchestrationClient(backend); + worker = new TestOrchestrationWorker(backend); + }); + + afterEach(async () => { + if (worker) { + try { + await worker.stop(); + } catch { + // Ignore if not running + } + } + backend.reset(); + }); + + it("should run an empty orchestration", async () => { + let invoked = false; + + const emptyOrchestrator: TOrchestrator = async (_: OrchestrationContext) => { + invoked = true; + }; + + worker.addOrchestrator(emptyOrchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(emptyOrchestrator); + const state = await client.waitForOrchestrationCompletion(id, true, 10); + + expect(invoked).toBe(true); + expect(state).toBeDefined(); + expect(state?.name).toEqual(getName(emptyOrchestrator)); + expect(state?.instanceId).toEqual(id); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); + }); + + it("should run an activity sequence", async () => { + const plusOne = async (_: ActivityContext, input: number) => { + return input + 1; + }; + + const sequence: TOrchestrator = async function* (ctx: OrchestrationContext, startVal: number): any { + const numbers = [startVal]; + let current = startVal; + + for (let i = 0; i < 5; i++) { + current = yield ctx.callActivity(plusOne, current); + numbers.push(current); + } + + return numbers; + }; + + worker.addOrchestrator(sequence); + worker.addActivity(plusOne); + await worker.start(); + + const id = await client.scheduleNewOrchestration(sequence, 1); + const state = await client.waitForOrchestrationCompletion(id, true, 10); + + expect(state).toBeDefined(); + expect(state?.name).toEqual(getName(sequence)); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); + expect(state?.serializedInput).toEqual(JSON.stringify(1)); + expect(state?.serializedOutput).toEqual(JSON.stringify([1, 2, 3, 4, 5, 6])); + }); + + it("should run fan-out/fan-in", async () => { + let activityCounter = 0; + + const increment = (_: ActivityContext) => { + activityCounter++; + }; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, count: number): any { + const tasks: Task[] = []; + + for (let i = 0; i < count; i++) { + tasks.push(ctx.callActivity(increment)); + } + + yield whenAll(tasks); + }; + + worker.addActivity(increment); + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator, 5); + const state = await client.waitForOrchestrationCompletion(id, true, 10); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); + expect(activityCounter).toEqual(5); + }); + + it("should handle sub-orchestrations", async () => { + let activityCounter = 0; + + const increment = (_: ActivityContext) => { + activityCounter++; + }; + + const childOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.callActivity(increment); + }; + + const parentOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.callSubOrchestrator(childOrchestrator); + }; + + worker.addActivity(increment); + worker.addOrchestrator(childOrchestrator); + worker.addOrchestrator(parentOrchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(parentOrchestrator); + const state = await client.waitForOrchestrationCompletion(id, true, 10); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); + expect(activityCounter).toEqual(1); + }); + + it("should handle external events", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const value = yield ctx.waitForExternalEvent("my_event"); + return value; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + + // Wait for orchestration to start + await client.waitForOrchestrationStart(id, false, 5); + + // Raise the event + await client.raiseOrchestrationEvent(id, "my_event", "hello"); + + const state = await client.waitForOrchestrationCompletion(id, true, 10); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); + expect(state?.serializedOutput).toEqual(JSON.stringify("hello")); + }); + + it("should handle timers", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + // Wait for 100ms + yield ctx.createTimer(0.1); + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + const state = await client.waitForOrchestrationCompletion(id, true, 10); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); + expect(state?.serializedOutput).toEqual(JSON.stringify("done")); + }); + + it("should handle termination", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("never"); + return "never reached"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationStart(id, false, 5); + + await client.terminateOrchestration(id, "terminated by test"); + + const state = await client.waitForOrchestrationCompletion(id, true, 10); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.TERMINATED); + expect(state?.serializedOutput).toEqual(JSON.stringify("terminated by test")); + }); + + it("should handle continue-as-new", async () => { + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext, input: number) => { + if (input < 5) { + ctx.continueAsNew(input + 1, true); + } else { + return input; + } + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator, 1); + const state = await client.waitForOrchestrationCompletion(id, true, 10); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); + expect(state?.serializedOutput).toEqual(JSON.stringify(5)); + }); + + it("should handle orchestration without activities", async () => { + const orchestrator: TOrchestrator = async (_: OrchestrationContext, input: number) => { + return input * 2; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator, 21); + const state = await client.waitForOrchestrationCompletion(id, true, 10); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); + expect(state?.serializedOutput).toEqual(JSON.stringify(42)); + }); + + it("should handle activity failures", async () => { + const failingActivity = (_: ActivityContext) => { + throw new Error("Activity failed intentionally"); + }; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + try { + yield ctx.callActivity(failingActivity); + return "should not reach here"; + } catch (error: any) { + return `caught: ${error.message}`; + } + }; + + worker.addActivity(failingActivity); + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + const state = await client.waitForOrchestrationCompletion(id, true, 10); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); + expect(state?.serializedOutput).toContain("caught:"); + }); + + it("should purge completed orchestrations", async () => { + const orchestrator: TOrchestrator = async () => "done"; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationCompletion(id, false, 10); + + const result = await client.purgeOrchestration(id); + expect(result.deletedInstanceCount).toEqual(1); + + const state = await client.getOrchestrationState(id); + expect(state).toBeUndefined(); + }); +}); diff --git a/tests/e2e/orchestration.spec.ts b/tests/e2e/orchestration.spec.ts index 56d9bcc..a94c2ad 100644 --- a/tests/e2e/orchestration.spec.ts +++ b/tests/e2e/orchestration.spec.ts @@ -2,11 +2,11 @@ // Licensed under the MIT License. import { - TaskHubGrpcClient, - TaskHubGrpcWorker, + InMemoryOrchestrationBackend, + TestOrchestrationClient, + TestOrchestrationWorker, PurgeInstanceCriteria, - ProtoOrchestrationStatus as OrchestrationStatus, - OrchestrationStatus as RuntimeStatus, + OrchestrationStatus, getName, whenAll, whenAny, @@ -17,18 +17,21 @@ import { } from "@microsoft/durabletask-js"; describe("Durable Functions", () => { - let taskHubClient: TaskHubGrpcClient; - let taskHubWorker: TaskHubGrpcWorker; + let backend: InMemoryOrchestrationBackend; + let taskHubClient: TestOrchestrationClient; + let taskHubWorker: TestOrchestrationWorker; beforeEach(async () => { - // Start a worker, which will connect to the sidecar in a background thread - taskHubWorker = new TaskHubGrpcWorker("localhost:4001"); - taskHubClient = new TaskHubGrpcClient("localhost:4001"); + // Create in-memory backend for testing without sidecar + backend = new InMemoryOrchestrationBackend(); + taskHubWorker = new TestOrchestrationWorker(backend); + taskHubClient = new TestOrchestrationClient(backend); }); afterEach(async () => { await taskHubWorker.stop(); await taskHubClient.stop(); + backend.reset(); }); it("should be able to run an empty orchestration", async () => { @@ -51,7 +54,7 @@ describe("Durable Functions", () => { expect(state?.name).toEqual(getName(emptyOrchestrator)); expect(state?.instanceId).toEqual(id); expect(state?.failureDetails).toBeUndefined(); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); }); it("should be able to run an activity sequence", async () => { @@ -82,7 +85,7 @@ describe("Durable Functions", () => { expect(state?.name).toEqual(getName(sequence)); expect(state?.instanceId).toEqual(id); expect(state?.failureDetails).toBeUndefined(); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); expect(state?.serializedInput).toEqual(JSON.stringify(1)); expect(state?.serializedOutput).toEqual(JSON.stringify([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])); }, 31000); @@ -114,7 +117,7 @@ describe("Durable Functions", () => { const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 10); expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); expect(state?.failureDetails).toBeUndefined(); expect(activityCounter).toEqual(10); }, 31000); @@ -144,7 +147,7 @@ describe("Durable Functions", () => { const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); expect(state?.failureDetails).toBeUndefined(); expect(activityCounter).toEqual(1); }, 31000); @@ -186,7 +189,7 @@ describe("Durable Functions", () => { const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); expect(state?.failureDetails).toBeUndefined(); expect(activityCounter).toEqual(SUB_ORCHESTRATION_COUNT * ACTIVITY_COUNT); }, 31000); @@ -210,7 +213,7 @@ describe("Durable Functions", () => { const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); expect(state?.serializedOutput).toEqual(JSON.stringify(["a", "b", "c"])); }); @@ -238,7 +241,7 @@ describe("Durable Functions", () => { expect(state?.name).toEqual(getName(singleTimer)); expect(state?.instanceId).toEqual(id); expect(state?.failureDetails).toBeUndefined(); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); expect(state?.createdAt).toBeDefined(); expect(state?.lastUpdatedAt).toBeDefined(); // Timer should fire after approximately the expected delay (with tolerance for timing variations) @@ -272,7 +275,7 @@ describe("Durable Functions", () => { const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); if (shouldRaiseEvent) { expect(state?.serializedOutput).toEqual(JSON.stringify("approved")); @@ -308,7 +311,7 @@ describe("Durable Functions", () => { const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); if (shouldRaiseEvent) { expect(state?.serializedOutput).toEqual(JSON.stringify("approved")); @@ -329,12 +332,12 @@ describe("Durable Functions", () => { const id = await taskHubClient.scheduleNewOrchestration(orchestrator); let state = await taskHubClient.waitForOrchestrationStart(id, undefined, 30); expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.RUNNING); await taskHubClient.terminateOrchestration(id, "some reason for termination"); state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.TERMINATED); expect(state?.serializedOutput).toEqual(JSON.stringify("some reason for termination")); }, 31000); @@ -354,7 +357,7 @@ describe("Durable Functions", () => { const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); expect(state?.serializedOutput).toEqual(JSON.stringify(10)); }, 31000); @@ -373,7 +376,7 @@ describe("Durable Functions", () => { expect(state?.name).toEqual(getName(orchestrator)); expect(state?.instanceId).toEqual(id); expect(state?.failureDetails).toBeUndefined(); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); expect(state?.serializedInput).toEqual(JSON.stringify(15)); expect(state?.serializedOutput).toEqual(JSON.stringify(16)); }, 31000); @@ -398,7 +401,7 @@ describe("Durable Functions", () => { expect(state?.name).toEqual(getName(orchestrator)); expect(state?.instanceId).toEqual(id); expect(state?.failureDetails).toBeUndefined(); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); expect(state?.serializedInput).toEqual(JSON.stringify(1)); expect(state?.serializedOutput).toEqual(JSON.stringify(2)); @@ -407,8 +410,8 @@ describe("Durable Functions", () => { expect(purgeResult?.deletedInstanceCount).toEqual(1); }, 31000); - // Skip: multi-instance purge is not implemented in the durabletask-go sidecar emulator - // See: https://github.com/microsoft/durabletask-go/issues/XXX + // Skip: multi-instance purge is not implemented in the in-memory testing backend + // This test uses PurgeInstanceCriteria which requires gRPC sidecar it.skip("should be able to purge orchestration by PurgeInstanceCriteria", async () => { const delaySeconds = 1; const plusOne = async (_: ActivityContext, input: number) => { @@ -437,14 +440,15 @@ describe("Durable Functions", () => { expect(state?.name).toEqual(getName(orchestrator)); expect(state?.instanceId).toEqual(id); expect(state?.failureDetails).toBeUndefined(); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); expect(state?.serializedInput).toEqual(JSON.stringify(1)); expect(state?.serializedOutput).toEqual(JSON.stringify(2)); // purge instance, test CreatedTimeFrom const criteria = new PurgeInstanceCriteria(); criteria.setCreatedTimeFrom(startTime); - let purgeResult = await taskHubClient.purgeOrchestration(criteria); + // Note: This uses gRPC client API, not available in in-memory backend + let purgeResult = await (taskHubClient as any).purgeOrchestration(criteria); expect(purgeResult); expect(purgeResult?.deletedInstanceCount).toEqual(1); @@ -454,7 +458,7 @@ describe("Durable Functions", () => { // purge instance, test CreatedTimeTo criteria.setCreatedTimeTo(new Date(Date.now())); - purgeResult = await taskHubClient.purgeOrchestration(criteria); + purgeResult = await (taskHubClient as any).purgeOrchestration(criteria); expect(purgeResult); expect(purgeResult?.deletedInstanceCount).toEqual(0); @@ -469,7 +473,7 @@ describe("Durable Functions", () => { expect(state1?.name).toEqual(getName(orchestrator)); expect(state1?.instanceId).toEqual(id1); expect(state1?.failureDetails).toBeUndefined(); - expect(state1?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state1?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); expect(state1?.serializedInput).toEqual(JSON.stringify(1)); expect(state1?.serializedOutput).toEqual(JSON.stringify(2)); @@ -480,18 +484,18 @@ describe("Durable Functions", () => { expect(state2?.name).toEqual(getName(terminate)); expect(state2?.instanceId).toEqual(id2); expect(state2?.failureDetails).toBeUndefined(); - expect(state2?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); + expect(state2?.runtimeStatus).toEqual(OrchestrationStatus.TERMINATED); - const runtimeStatuses: RuntimeStatus[] = []; - runtimeStatuses.push(RuntimeStatus.TERMINATED); - runtimeStatuses.push(RuntimeStatus.COMPLETED); + const runtimeStatuses: OrchestrationStatus[] = []; + runtimeStatuses.push(OrchestrationStatus.TERMINATED); + runtimeStatuses.push(OrchestrationStatus.COMPLETED); // Add a small delay to ensure the orchestrations are fully persisted await new Promise((resolve) => setTimeout(resolve, 1000)); criteria.setCreatedTimeTo(new Date(Date.now())); criteria.setRuntimeStatusList(runtimeStatuses); - purgeResult = await taskHubClient.purgeOrchestration(criteria); + purgeResult = await (taskHubClient as any).purgeOrchestration(criteria); expect(purgeResult); expect(purgeResult?.deletedInstanceCount).toEqual(2);