diff --git a/packages/durabletask-js-azuremanaged/src/index.ts b/packages/durabletask-js-azuremanaged/src/index.ts index 0f2f75f..0171ff7 100644 --- a/packages/durabletask-js-azuremanaged/src/index.ts +++ b/packages/durabletask-js-azuremanaged/src/index.ts @@ -21,5 +21,8 @@ export { DurableTaskAzureManagedWorkerBuilder, createAzureManagedWorkerBuilder } // Logger exports - re-export from core package for convenience export { Logger, ConsoleLogger, NoOpLogger } from "@microsoft/durabletask-js"; +// Versioning exports - re-export from core package for convenience +export { VersioningOptions, VersionMatchStrategy, VersionFailureStrategy } from "@microsoft/durabletask-js"; + // Azure-specific logger adapter export { AzureLoggerAdapter, createAzureLogger } from "./azure-logger-adapter"; diff --git a/packages/durabletask-js-azuremanaged/src/worker-builder.ts b/packages/durabletask-js-azuremanaged/src/worker-builder.ts index 310b1cf..1bc0285 100644 --- a/packages/durabletask-js-azuremanaged/src/worker-builder.ts +++ b/packages/durabletask-js-azuremanaged/src/worker-builder.ts @@ -4,7 +4,16 @@ import { TokenCredential } from "@azure/identity"; import * as grpc from "@grpc/grpc-js"; import { DurableTaskAzureManagedWorkerOptions } from "./options"; -import { TaskHubGrpcWorker, TOrchestrator, TActivity, TInput, TOutput, Logger, ConsoleLogger } from "@microsoft/durabletask-js"; +import { + TaskHubGrpcWorker, + TOrchestrator, + TActivity, + TInput, + TOutput, + Logger, + ConsoleLogger, + VersioningOptions, +} from "@microsoft/durabletask-js"; /** * Builder for creating DurableTaskWorker instances that connect to Azure-managed Durable Task service. @@ -17,6 +26,7 @@ export class DurableTaskAzureManagedWorkerBuilder { private _activities: { name?: string; fn: TActivity }[] = []; private _logger: Logger = new ConsoleLogger(); private _shutdownTimeoutMs?: number; + private _versioning?: VersioningOptions; /** * Creates a new instance of DurableTaskAzureManagedWorkerBuilder. @@ -198,6 +208,18 @@ export class DurableTaskAzureManagedWorkerBuilder { return this; } + /** + * Configures versioning options for the worker. + * This allows filtering orchestrations by version using different match strategies. + * + * @param options The versioning options including version, matchStrategy, and failureStrategy. + * @returns This builder instance. + */ + versioning(options: VersioningOptions): DurableTaskAzureManagedWorkerBuilder { + this._versioning = options; + return this; + } + /** * Builds and returns a configured TaskHubGrpcWorker. * @@ -219,18 +241,17 @@ export class DurableTaskAzureManagedWorkerBuilder { ...this._grpcChannelOptions, }; - // Use the core TaskHubGrpcWorker with custom credentials and metadata generator - // For insecure connections, metadata is passed via the metadataGenerator parameter - // For secure connections, metadata is included in the channel credentials - const worker = new TaskHubGrpcWorker( + // Use the core TaskHubGrpcWorker with options-based constructor + const worker = new TaskHubGrpcWorker({ hostAddress, - combinedOptions, - true, - channelCredentials, + options: combinedOptions, + useTLS: true, + credentials: channelCredentials, metadataGenerator, - this._logger, - this._shutdownTimeoutMs, - ); + logger: this._logger, + shutdownTimeoutMs: this._shutdownTimeoutMs, + versioning: this._versioning, + }); // Register all orchestrators for (const { name, fn } of this._orchestrators) { diff --git a/packages/durabletask-js/src/client/client.ts b/packages/durabletask-js/src/client/client.ts index 884b2c8..bdb4fad 100644 --- a/packages/durabletask-js/src/client/client.ts +++ b/packages/durabletask-js/src/client/client.ts @@ -17,6 +17,8 @@ import { OrchestrationStatus, toProtobuf, fromProtobuf } from "../orchestration/ import { TimeoutError } from "../exception/timeout-error"; import { PurgeResult } from "../orchestration/orchestration-purge-result"; import { PurgeInstanceCriteria } from "../orchestration/orchestration-purge-criteria"; +import { PurgeInstanceOptions } from "../orchestration/orchestration-purge-options"; +import { TerminateInstanceOptions, isTerminateInstanceOptions } from "../orchestration/orchestration-terminate-options"; import { callWithMetadata, MetadataGenerator } from "../utils/grpc-helper.util"; import { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "../orchestration/orchestration-query"; import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page"; @@ -47,12 +49,19 @@ export interface TaskHubGrpcClientOptions { metadataGenerator?: MetadataGenerator; /** Optional logger instance. Defaults to ConsoleLogger. */ logger?: Logger; + /** + * The default version to use when starting new orchestrations without an explicit version. + * If specified, this will be used as the version for orchestrations that don't provide + * their own version in StartOrchestrationOptions. + */ + defaultVersion?: string; } export class TaskHubGrpcClient { private _stub: stubs.TaskHubSidecarServiceClient; private _metadataGenerator?: MetadataGenerator; private _logger: Logger; + private _defaultVersion?: string; /** * Creates a new TaskHubGrpcClient instance. @@ -95,6 +104,7 @@ export class TaskHubGrpcClient { let resolvedCredentials: grpc.ChannelCredentials | undefined; let resolvedMetadataGenerator: MetadataGenerator | undefined; let resolvedLogger: Logger | undefined; + let resolvedDefaultVersion: string | undefined; if (typeof hostAddressOrOptions === "object" && hostAddressOrOptions !== null) { // Options object constructor @@ -104,6 +114,7 @@ export class TaskHubGrpcClient { resolvedCredentials = hostAddressOrOptions.credentials; resolvedMetadataGenerator = hostAddressOrOptions.metadataGenerator; resolvedLogger = hostAddressOrOptions.logger; + resolvedDefaultVersion = hostAddressOrOptions.defaultVersion; } else { // Deprecated positional parameters constructor resolvedHostAddress = hostAddressOrOptions; @@ -117,6 +128,7 @@ export class TaskHubGrpcClient { this._stub = new GrpcClient(resolvedHostAddress, resolvedOptions, resolvedUseTLS, resolvedCredentials).stub; this._metadataGenerator = resolvedMetadataGenerator; this._logger = resolvedLogger ?? new ConsoleLogger(); + this._defaultVersion = resolvedDefaultVersion; } async stop(): Promise { @@ -177,6 +189,13 @@ export class TaskHubGrpcClient { typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined ? undefined : instanceIdOrOptions.tags; + const version = + typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined + ? undefined + : instanceIdOrOptions.version; + + // Use provided version, or fall back to client's default version + const effectiveVersion = version ?? this._defaultVersion; const req = new pb.CreateInstanceRequest(); req.setName(name); @@ -191,9 +210,15 @@ export class TaskHubGrpcClient { req.setInput(i); req.setScheduledstarttimestamp(ts); + if (effectiveVersion) { + const v = new StringValue(); + v.setValue(effectiveVersion); + req.setVersion(v); + } + populateTagsMap(req.getTagsMap(), tags); - this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}`); + this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}${effectiveVersion ? ` (version: ${effectiveVersion})` : ""}`); const res = await callWithMetadata( this._stub.startInstance.bind(this._stub), @@ -364,18 +389,49 @@ export class TaskHubGrpcClient { * Terminates the orchestrator associated with the provided instance id. * * @param {string} instanceId - orchestrator instance id to terminate. - * @param {any} output - The optional output to set for the terminated orchestrator instance. + * @param {any | TerminateInstanceOptions} outputOrOptions - The optional output to set for the terminated orchestrator instance, + * or a TerminateInstanceOptions object created with `terminateOptions()` that can include both + * output and recursive termination settings. + * + * @example + * ```typescript + * // Simple termination with output + * await client.terminateOrchestration(instanceId, { reason: "cancelled" }); + * + * // Recursive termination with options (use terminateOptions helper) + * import { terminateOptions } from "@microsoft/durabletask-js"; + * await client.terminateOrchestration(instanceId, terminateOptions({ + * output: { reason: "cancelled" }, + * recursive: true + * })); + * ``` */ - async terminateOrchestration(instanceId: string, output: any = null): Promise { + async terminateOrchestration( + instanceId: string, + outputOrOptions: any | TerminateInstanceOptions = null, + ): Promise { const req = new pb.TerminateRequest(); req.setInstanceid(instanceId); + let output: any = null; + let recursive = false; + + // Use type guard to safely detect TerminateInstanceOptions + // This avoids false positives when user output happens to have 'recursive' or 'output' properties + if (isTerminateInstanceOptions(outputOrOptions)) { + output = outputOrOptions.output ?? null; + recursive = outputOrOptions.recursive ?? false; + } else { + output = outputOrOptions; + } + const i = new StringValue(); i.setValue(JSON.stringify(output)); req.setOutput(i); + req.setRecursive(recursive); - this._logger.info(`Terminating '${instanceId}'`); + this._logger.info(`Terminating '${instanceId}'${recursive ? ' (recursive)' : ''}`); await callWithMetadata( this._stub.terminateInstance.bind(this._stub), @@ -537,16 +593,21 @@ export class TaskHubGrpcClient { * * @param value - The unique ID of the orchestration instance to purge or orchestration instance filter criteria used * to determine which instances to purge. + * @param options - Optional options to control the purge behavior, such as recursive purging of sub-orchestrations. * @returns A Promise that resolves to a {@link PurgeResult} or `undefined` if the purge operation was not successful. */ - async purgeOrchestration(value: string | PurgeInstanceCriteria): Promise { + async purgeOrchestration( + value: string | PurgeInstanceCriteria, + options?: PurgeInstanceOptions, + ): Promise { let res; - if (typeof value === `string`) { + if (typeof value === "string") { const instanceId = value; const req = new pb.PurgeInstancesRequest(); req.setInstanceid(instanceId); + req.setRecursive(options?.recursive ?? false); - this._logger.info(`Purging Instance '${instanceId}'`); + this._logger.info(`Purging Instance '${instanceId}'${options?.recursive ? ' (recursive)' : ''}`); res = await callWithMetadata( this._stub.purgeInstances.bind(this._stub), @@ -574,9 +635,10 @@ export class TaskHubGrpcClient { filter.addRuntimestatus(toProtobuf(status)); } req.setPurgeinstancefilter(filter); + req.setRecursive(options?.recursive ?? false); const timeout = purgeInstanceCriteria.getTimeout(); - this._logger.info("Purging Instance using purging criteria"); + this._logger.info(`Purging Instances using purging criteria${options?.recursive ? " (recursive)" : ""}`); const callPromise = callWithMetadata( this._stub.purgeInstances.bind(this._stub), diff --git a/packages/durabletask-js/src/index.ts b/packages/durabletask-js/src/index.ts index 8381d30..dd093f2 100644 --- a/packages/durabletask-js/src/index.ts +++ b/packages/durabletask-js/src/index.ts @@ -4,6 +4,7 @@ // Client and Worker export { TaskHubGrpcClient, TaskHubGrpcClientOptions, MetadataGenerator } from "./client/client"; export { TaskHubGrpcWorker, TaskHubGrpcWorkerOptions } from "./worker/task-hub-grpc-worker"; +export { VersioningOptions, VersionMatchStrategy, VersionFailureStrategy } from "./worker/versioning-options"; // Contexts export { OrchestrationContext } from "./task/context/orchestration-context"; @@ -11,6 +12,8 @@ export { ActivityContext } from "./task/context/activity-context"; // Orchestration types and utilities export { PurgeInstanceCriteria } from "./orchestration/orchestration-purge-criteria"; +export { PurgeInstanceOptions } from "./orchestration/orchestration-purge-options"; +export { TerminateInstanceOptions, terminateOptions, isTerminateInstanceOptions, TERMINATE_OPTIONS_SYMBOL } from "./orchestration/orchestration-terminate-options"; export { OrchestrationStatus } from "./orchestration/enum/orchestration-status.enum"; export { OrchestrationState } from "./orchestration/orchestration-state"; @@ -82,3 +85,7 @@ export { ParentOrchestrationInstance } from "./types/parent-orchestration-instan // Logger export { Logger, ConsoleLogger, NoOpLogger } from "./types/logger.type"; +export { ReplaySafeLogger, ReplayContext } from "./types/replay-safe-logger"; + +// Versioning utilities +export { compareVersions } from "./utils/versioning.util"; diff --git a/packages/durabletask-js/src/orchestration/orchestration-purge-options.ts b/packages/durabletask-js/src/orchestration/orchestration-purge-options.ts new file mode 100644 index 0000000..0ff33e7 --- /dev/null +++ b/packages/durabletask-js/src/orchestration/orchestration-purge-options.ts @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +/** + * Options for purging orchestration instances. + */ +export interface PurgeInstanceOptions { + /** + * Whether to recursively purge sub-orchestrations as well. + * When true, all child orchestrations spawned by the target orchestration + * will also be purged. + * + * Note: Recursive purging may not be supported by all backend implementations. + * + * @default false + */ + recursive?: boolean; +} diff --git a/packages/durabletask-js/src/orchestration/orchestration-terminate-options.ts b/packages/durabletask-js/src/orchestration/orchestration-terminate-options.ts new file mode 100644 index 0000000..5081ca4 --- /dev/null +++ b/packages/durabletask-js/src/orchestration/orchestration-terminate-options.ts @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +/** + * Symbol used to identify TerminateInstanceOptions objects. + * This prevents confusion when user output happens to have 'recursive' or 'output' properties. + */ +export const TERMINATE_OPTIONS_SYMBOL = Symbol.for("durabletask.TerminateInstanceOptions"); + +/** + * Options for terminating orchestration instances. + */ +export interface TerminateInstanceOptions { + /** + * Internal marker to identify this as a TerminateInstanceOptions object. + * @internal + */ + readonly [TERMINATE_OPTIONS_SYMBOL]?: true; + + /** + * Whether to recursively terminate sub-orchestrations as well. + * When true, all child orchestrations spawned by the target orchestration + * will also be terminated. + * + * @default false + */ + recursive?: boolean; + + /** + * The optional output to set for the terminated orchestrator instance. + */ + output?: any; +} + +/** + * Creates a TerminateInstanceOptions object with proper type identification. + * Use this function instead of creating plain objects to ensure correct behavior. + * + * @param options - The terminate options + * @returns A properly typed TerminateInstanceOptions object + * + * @example + * ```typescript + * // Terminate with recursive option + * await client.terminateOrchestration(instanceId, terminateOptions({ recursive: true })); + * + * // Terminate with output and recursive + * await client.terminateOrchestration(instanceId, terminateOptions({ + * output: { reason: "cancelled by user" }, + * recursive: true + * })); + * ``` + */ +export function terminateOptions(options: Omit): TerminateInstanceOptions { + return { + ...options, + [TERMINATE_OPTIONS_SYMBOL]: true as const, + }; +} + +/** + * Type guard to check if a value is a TerminateInstanceOptions object. + * @internal + */ +export function isTerminateInstanceOptions(value: unknown): value is TerminateInstanceOptions { + return ( + value !== null && + typeof value === "object" && + TERMINATE_OPTIONS_SYMBOL in value && + (value as TerminateInstanceOptions)[TERMINATE_OPTIONS_SYMBOL] === true + ); +} diff --git a/packages/durabletask-js/src/task/context/orchestration-context.ts b/packages/durabletask-js/src/task/context/orchestration-context.ts index ede6ffe..73641cd 100644 --- a/packages/durabletask-js/src/task/context/orchestration-context.ts +++ b/packages/durabletask-js/src/task/context/orchestration-context.ts @@ -4,8 +4,11 @@ import { ParentOrchestrationInstance } from "../../types/parent-orchestration-instance.type"; import { TActivity } from "../../types/activity.type"; import { TOrchestrator } from "../../types/orchestrator.type"; +import { Logger } from "../../types/logger.type"; +import { ReplaySafeLogger } from "../../types/replay-safe-logger"; import { TaskOptions, SubOrchestrationOptions } from "../options"; import { Task } from "../task"; +import { compareVersions } from "../../utils/versioning.util"; export abstract class OrchestrationContext { /** @@ -49,6 +52,46 @@ export abstract class OrchestrationContext { */ abstract get isReplaying(): boolean; + /** + * Gets the version of the current orchestration instance. + * + * The version is set when the orchestration instance is created via the client's + * scheduleNewOrchestration method using StartOrchestrationOptions.version. + * If no version was specified, this returns an empty string. + * + * @returns {string} The version of the current orchestration instance. + */ + abstract get version(): string; + + /** + * Compares the current orchestration version to the specified version. + * + * This method uses semantic versioning comparison when both versions are valid + * semantic versions, and falls back to lexicographic comparison otherwise. + * + * @remarks + * - If both versions are empty, this returns 0 (equal). + * - An empty context version is considered less than a defined version. + * - An empty parameter version is considered less than a defined context version. + * + * @param {string} version The version to compare against. + * @returns {number} A negative number if context version < parameter version, + * zero if equal, positive if context version > parameter version. + * + * @example + * ```typescript + * const orchestrator: TOrchestrator = async function* (ctx, input) { + * if (ctx.compareVersionTo("2.0.0") >= 0) { + * // This orchestration is version 2.0.0 or newer + * yield ctx.callActivity(newFeature, input); + * } + * }; + * ``` + */ + compareVersionTo(version: string): number { + return compareVersions(this.version, version); + } + /** * Create a timer task that will fire at a specified time. * @@ -139,4 +182,28 @@ export abstract class OrchestrationContext { * @returns {string} A new deterministic UUID string. */ abstract newGuid(): string; + + /** + * Creates a replay-safe logger that only writes logs when the orchestrator is not replaying. + * + * During orchestration replay, history events are re-processed to rebuild state. + * This can cause duplicate log entries if not handled properly. The returned logger + * wraps the provided logger and automatically suppresses log output during replay, + * ensuring that logs are only written once when the orchestration is making forward progress. + * + * @param {Logger} logger The underlying logger to wrap. + * @returns {Logger} A replay-safe logger instance. + * + * @example + * ```typescript + * const orchestrator: TOrchestrator = async function* (ctx, input) { + * const logger = ctx.createReplaySafeLogger(myLogger); + * logger.info("This will only be logged once, not during replay"); + * yield ctx.callActivity(myActivity, input); + * }; + * ``` + */ + createReplaySafeLogger(logger: Logger): Logger { + return new ReplaySafeLogger(this, logger); + } } diff --git a/packages/durabletask-js/src/task/options/task-options.ts b/packages/durabletask-js/src/task/options/task-options.ts index 0a66dd5..5cb11ed 100644 --- a/packages/durabletask-js/src/task/options/task-options.ts +++ b/packages/durabletask-js/src/task/options/task-options.ts @@ -16,6 +16,11 @@ export interface TaskOptions { * The tags to associate with the task. */ tags?: Record; + /** + * The version of the task (activity) to execute. + * When specified, only workers that handle this version will process the task. + */ + version?: string; } /** @@ -48,6 +53,12 @@ export interface StartOrchestrationOptions { * The tags to associate with the orchestration instance. */ tags?: Record; + /** + * The version of the orchestration to execute. + * This version is stored with the orchestration instance and can be accessed + * via the OrchestrationContext.version property. + */ + version?: string; } /** diff --git a/packages/durabletask-js/src/types/replay-safe-logger.ts b/packages/durabletask-js/src/types/replay-safe-logger.ts new file mode 100644 index 0000000..af3b31d --- /dev/null +++ b/packages/durabletask-js/src/types/replay-safe-logger.ts @@ -0,0 +1,96 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { Logger } from "./logger.type"; + +/** + * Interface representing a context that can determine if replay is occurring. + * This is used by ReplaySafeLogger to check if logging should be suppressed. + */ +export interface ReplayContext { + /** + * Whether the orchestrator is currently replaying from history. + */ + readonly isReplaying: boolean; +} + +/** + * A logger wrapper that only logs when the orchestration is not replaying. + * + * During orchestration replay, history events are re-processed to rebuild state. + * This can cause duplicate log entries if not handled properly. The ReplaySafeLogger + * wraps an existing logger and automatically suppresses log output during replay, + * ensuring that logs are only written once when the orchestration is making forward progress. + * + * @example + * ```typescript + * // Inside an orchestrator function: + * const logger = ctx.createReplaySafeLogger(myLogger); + * logger.info("This will only be logged once, not during replay"); + * ``` + */ +export class ReplaySafeLogger implements Logger { + private readonly context: ReplayContext; + private readonly innerLogger: Logger; + + /** + * Creates a new ReplaySafeLogger. + * + * @param context - The replay context used to determine if replay is occurring. + * @param logger - The underlying logger to delegate to when not replaying. + */ + constructor(context: ReplayContext, logger: Logger) { + if (!context) { + throw new Error("context is required"); + } + if (!logger) { + throw new Error("logger is required"); + } + this.context = context; + this.innerLogger = logger; + } + + /** + * Logs an error message if not replaying. + * @param message - The error message to log. + * @param args - Additional arguments to include in the log. + */ + error(message: string, ...args: unknown[]): void { + if (!this.context.isReplaying) { + this.innerLogger.error(message, ...args); + } + } + + /** + * Logs a warning message if not replaying. + * @param message - The warning message to log. + * @param args - Additional arguments to include in the log. + */ + warn(message: string, ...args: unknown[]): void { + if (!this.context.isReplaying) { + this.innerLogger.warn(message, ...args); + } + } + + /** + * Logs an informational message if not replaying. + * @param message - The informational message to log. + * @param args - Additional arguments to include in the log. + */ + info(message: string, ...args: unknown[]): void { + if (!this.context.isReplaying) { + this.innerLogger.info(message, ...args); + } + } + + /** + * Logs a debug message if not replaying. + * @param message - The debug message to log. + * @param args - Additional arguments to include in the log. + */ + debug(message: string, ...args: unknown[]): void { + if (!this.context.isReplaying) { + this.innerLogger.debug(message, ...args); + } + } +} diff --git a/packages/durabletask-js/src/utils/pb-helper.util.ts b/packages/durabletask-js/src/utils/pb-helper.util.ts index 8c91930..731d04f 100644 --- a/packages/durabletask-js/src/utils/pb-helper.util.ts +++ b/packages/durabletask-js/src/utils/pb-helper.util.ts @@ -194,6 +194,22 @@ export function newFailureDetails(e: any): pb.TaskFailureDetails { return failure; } +/** + * Creates a TaskFailureDetails for version mismatch errors. + * These errors are non-retriable as the version mismatch is deterministic. + * + * @param errorType The type of version error (e.g., "VersionMismatch", "VersionError") + * @param errorMessage The error message describing the version mismatch + * @returns A TaskFailureDetails with IsNonRetriable set to true + */ +export function newVersionMismatchFailureDetails(errorType: string, errorMessage: string): pb.TaskFailureDetails { + const failure = new pb.TaskFailureDetails(); + failure.setErrortype(errorType); + failure.setErrormessage(errorMessage); + failure.setIsnonretriable(true); + return failure; +} + export function newEventRaisedEvent(name: string, encodedInput?: string): pb.HistoryEvent { const ts = new Timestamp(); @@ -319,11 +335,15 @@ export function newScheduleTaskAction( name: string, encodedInput?: string, tags?: Record, + version?: string, ): pb.OrchestratorAction { const scheduleTaskAction = new pb.ScheduleTaskAction(); scheduleTaskAction.setName(name); scheduleTaskAction.setInput(getStringValue(encodedInput)); populateTagsMap(scheduleTaskAction.getTagsMap(), tags); + if (version) { + scheduleTaskAction.setVersion(getStringValue(version)); + } const action = new pb.OrchestratorAction(); action.setId(id); @@ -344,12 +364,16 @@ export function newCreateSubOrchestrationAction( instanceId?: string | null, encodedInput?: string, tags?: Record, + version?: string, ): pb.OrchestratorAction { const createSubOrchestrationAction = new pb.CreateSubOrchestrationAction(); createSubOrchestrationAction.setName(name); createSubOrchestrationAction.setInstanceid(instanceId || ""); createSubOrchestrationAction.setInput(getStringValue(encodedInput)); populateTagsMap(createSubOrchestrationAction.getTagsMap(), tags); + if (version) { + createSubOrchestrationAction.setVersion(getStringValue(version)); + } const action = new pb.OrchestratorAction(); action.setId(id); diff --git a/packages/durabletask-js/src/utils/versioning.util.ts b/packages/durabletask-js/src/utils/versioning.util.ts new file mode 100644 index 0000000..2e94230 --- /dev/null +++ b/packages/durabletask-js/src/utils/versioning.util.ts @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +/** + * Compares two version strings for ordering. + * + * This function supports semantic versioning comparison (e.g., "1.0.0" vs "2.1.0") + * and falls back to lexicographic comparison for non-semver strings. + * + * Rules: + * - Empty/undefined versions are considered equal to each other + * - An empty version is considered less than a non-empty version + * - Semantic versions are compared numerically by component (major.minor.patch) + * - Non-semantic versions are compared lexicographically (case-insensitive) + * + * @param sourceVersion - The first version to compare. + * @param otherVersion - The second version to compare. + * @returns A negative number if sourceVersion < otherVersion, + * zero if sourceVersion === otherVersion, + * a positive number if sourceVersion > otherVersion. + * + * @example + * ```typescript + * compareVersions("1.0.0", "2.0.0"); // Returns negative number + * compareVersions("2.0.0", "1.0.0"); // Returns positive number + * compareVersions("1.0.0", "1.0.0"); // Returns 0 + * compareVersions("", "1.0.0"); // Returns negative number + * ``` + */ +export function compareVersions(sourceVersion: string | undefined, otherVersion: string | undefined): number { + const sourceEmpty = !sourceVersion || sourceVersion.trim() === ""; + const otherEmpty = !otherVersion || otherVersion.trim() === ""; + + // Both empty = equal + if (sourceEmpty && otherEmpty) { + return 0; + } + + // Empty source < defined other + if (sourceEmpty) { + return -1; + } + + // Defined source > empty other + if (otherEmpty) { + return 1; + } + + // Try semantic version parsing + const sourceComponents = parseSemanticVersion(sourceVersion!); + const otherComponents = parseSemanticVersion(otherVersion!); + + if (sourceComponents && otherComponents) { + // Compare major, minor, patch, revision in order + for (let i = 0; i < Math.max(sourceComponents.length, otherComponents.length); i++) { + const sourceVal = sourceComponents[i] ?? 0; + const otherVal = otherComponents[i] ?? 0; + if (sourceVal !== otherVal) { + return sourceVal - otherVal; + } + } + return 0; + } + + // Fallback to lexicographic comparison (case-insensitive) + return sourceVersion!.toLowerCase().localeCompare(otherVersion!.toLowerCase()); +} + +/** + * Attempts to parse a version string as a semantic version. + * + * @param version - The version string to parse. + * @returns An array of numeric version components, or undefined if parsing fails. + */ +function parseSemanticVersion(version: string): number[] | undefined { + // Match patterns like "1", "1.0", "1.0.0", "1.0.0.0" + const match = version.match(/^(\d+)(?:\.(\d+))?(?:\.(\d+))?(?:\.(\d+))?$/); + if (!match) { + return undefined; + } + + const components: number[] = []; + for (let i = 1; i <= 4; i++) { + if (match[i] !== undefined) { + components.push(parseInt(match[i], 10)); + } + } + return components; +} diff --git a/packages/durabletask-js/src/worker/orchestration-executor.ts b/packages/durabletask-js/src/worker/orchestration-executor.ts index 182d879..c90d660 100644 --- a/packages/durabletask-js/src/worker/orchestration-executor.ts +++ b/packages/durabletask-js/src/worker/orchestration-executor.ts @@ -129,6 +129,9 @@ export class OrchestrationExecutor { throw new OrchestratorNotRegisteredError(executionStartedEvent?.getName()); } + // Set the version from the execution started event + ctx._version = executionStartedEvent?.getVersion()?.getValue() ?? ""; + // Extract parent instance info if this is a sub-orchestration const parentInstance = executionStartedEvent?.getParentinstance(); if (parentInstance) { diff --git a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts index c67b177..548ae8a 100644 --- a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts +++ b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts @@ -29,6 +29,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { _newGuidCounter: number; _currentUtcDatetime: Date; _instanceId: string; + _version: string; _parent?: ParentOrchestrationInstance; _completionStatus?: pb.OrchestrationStatus; _receivedEvents: Record; @@ -50,6 +51,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { this._newGuidCounter = 0; this._currentUtcDatetime = new Date(1000, 0, 1); this._instanceId = instanceId; + this._version = ""; this._parent = undefined; this._completionStatus = undefined; this._receivedEvents = {}; @@ -75,6 +77,10 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { return this._isReplaying; } + get version(): string { + return this._version; + } + /** * This is the main entry point for the orchestrator. It will run the generator * and return the first task to be executed. It is typically executed from the @@ -270,7 +276,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { const id = this.nextSequenceNumber(); const name = typeof activity === "string" ? activity : getName(activity); const encodedInput = input ? JSON.stringify(input) : undefined; - const action = ph.newScheduleTaskAction(id, name, encodedInput, options?.tags); + const action = ph.newScheduleTaskAction(id, name, encodedInput, options?.tags, options?.version); this._pendingActions[action.getId()] = action; // If a retry policy is provided, create a RetryableTask @@ -314,7 +320,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { } const encodedInput = input ? JSON.stringify(input) : undefined; - const action = ph.newCreateSubOrchestrationAction(id, name, instanceId, encodedInput, options?.tags); + const action = ph.newCreateSubOrchestrationAction(id, name, instanceId, encodedInput, options?.tags, options?.version); this._pendingActions[action.getId()] = action; // If a retry policy is provided, create a RetryableTask diff --git a/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts b/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts index d28cbe1..2a35123 100644 --- a/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts +++ b/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts @@ -18,6 +18,8 @@ import { ActivityExecutor } from "./activity-executor"; import { StringValue } from "google-protobuf/google/protobuf/wrappers_pb"; import { Logger, ConsoleLogger } from "../types/logger.type"; import { ExponentialBackoff, sleep, withTimeout } from "../utils/backoff.util"; +import { VersioningOptions, VersionMatchStrategy, VersionFailureStrategy } from "./versioning-options"; +import { compareVersions } from "../utils/versioning.util"; /** Default timeout in milliseconds for graceful shutdown. */ const DEFAULT_SHUTDOWN_TIMEOUT_MS = 30000; @@ -40,6 +42,8 @@ export interface TaskHubGrpcWorkerOptions { logger?: Logger; /** Optional timeout in milliseconds for graceful shutdown. Defaults to 30000. */ shutdownTimeoutMs?: number; + /** Optional versioning options for filtering orchestrations by version. */ + versioning?: VersioningOptions; } export class TaskHubGrpcWorker { @@ -57,6 +61,7 @@ export class TaskHubGrpcWorker { private _pendingWorkItems: Set>; private _shutdownTimeoutMs: number; private _backoff: ExponentialBackoff; + private _versioning?: VersioningOptions; /** * Creates a new TaskHubGrpcWorker instance. @@ -103,6 +108,7 @@ export class TaskHubGrpcWorker { let resolvedMetadataGenerator: MetadataGenerator | undefined; let resolvedLogger: Logger | undefined; let resolvedShutdownTimeoutMs: number | undefined; + let resolvedVersioning: VersioningOptions | undefined; if (typeof hostAddressOrOptions === "object" && hostAddressOrOptions !== null) { // Options object constructor @@ -113,6 +119,7 @@ export class TaskHubGrpcWorker { resolvedMetadataGenerator = hostAddressOrOptions.metadataGenerator; resolvedLogger = hostAddressOrOptions.logger; resolvedShutdownTimeoutMs = hostAddressOrOptions.shutdownTimeoutMs; + resolvedVersioning = hostAddressOrOptions.versioning; } else { // Deprecated positional parameters constructor resolvedHostAddress = hostAddressOrOptions; @@ -142,6 +149,7 @@ export class TaskHubGrpcWorker { maxDelayMs: 30000, multiplier: 2, }); + this._versioning = resolvedVersioning; } /** @@ -400,6 +408,88 @@ export class TaskHubGrpcWorker { await sleep(1000); } + /** + * Result of version compatibility check. + */ + private _checkVersionCompatibility(req: pb.OrchestratorRequest): { + compatible: boolean; + shouldFail: boolean; + orchestrationVersion?: string; + errorType?: string; + errorMessage?: string; + } { + // If no versioning options configured or match strategy is None, always compatible + if (!this._versioning || this._versioning.matchStrategy === VersionMatchStrategy.None) { + return { compatible: true, shouldFail: false }; + } + + // Extract orchestration version from ExecutionStarted event + const orchestrationVersion = this._getOrchestrationVersion(req); + const workerVersion = this._versioning.version; + + // If worker version is not set, process all + if (!workerVersion) { + return { compatible: true, shouldFail: false }; + } + + let compatible = false; + let errorType = "VersionMismatch"; + let errorMessage = ""; + + switch (this._versioning.matchStrategy) { + case VersionMatchStrategy.Strict: + // Only process if versions match (using semantic comparison) + compatible = compareVersions(orchestrationVersion, workerVersion) === 0; + if (!compatible) { + errorMessage = `The orchestration version '${orchestrationVersion ?? ""}' does not match the worker version '${workerVersion}'.`; + } + break; + + case VersionMatchStrategy.CurrentOrOlder: + // Process if orchestration version is current or older + if (!orchestrationVersion) { + // Empty orchestration version is considered older + compatible = true; + } else { + compatible = compareVersions(orchestrationVersion, workerVersion) <= 0; + if (!compatible) { + errorMessage = `The orchestration version '${orchestrationVersion}' is greater than the worker version '${workerVersion}'.`; + } + } + break; + + default: + // Unknown match strategy - treat as version error + compatible = false; + errorType = "VersionError"; + errorMessage = `The version match strategy '${this._versioning.matchStrategy}' is unknown.`; + break; + } + + if (!compatible) { + const shouldFail = this._versioning.failureStrategy === VersionFailureStrategy.Fail; + return { compatible: false, shouldFail, orchestrationVersion, errorType, errorMessage }; + } + + return { compatible: true, shouldFail: false }; + } + + /** + * Extracts the orchestration version from the ExecutionStarted event in the request. + */ + private _getOrchestrationVersion(req: pb.OrchestratorRequest): string | undefined { + // Look for ExecutionStarted event in both past and new events + const allEvents = [...req.getPasteventsList(), ...req.getNeweventsList()]; + + for (const event of allEvents) { + if (event.hasExecutionstarted()) { + return event.getExecutionstarted()?.getVersion()?.getValue(); + } + } + + return undefined; + } + /** * Executes an orchestrator request and tracks it as a pending work item. */ @@ -429,6 +519,57 @@ export class TaskHubGrpcWorker { throw new Error(`Could not execute the orchestrator as the instanceId was not provided (${instanceId})`); } + // Check version compatibility if versioning is enabled + const versionCheckResult = this._checkVersionCompatibility(req); + if (!versionCheckResult.compatible) { + if (versionCheckResult.shouldFail) { + // Fail the orchestration with version mismatch error + this._logger.warn( + `${versionCheckResult.errorType} for instance '${instanceId}': ${versionCheckResult.errorMessage}. Failing orchestration.`, + ); + + const failureDetails = pbh.newVersionMismatchFailureDetails( + versionCheckResult.errorType!, + versionCheckResult.errorMessage!, + ); + + const actions = [ + pbh.newCompleteOrchestrationAction( + -1, + pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED, + undefined, + failureDetails, + ), + ]; + + const res = new pb.OrchestratorResponse(); + res.setInstanceid(instanceId); + res.setCompletiontoken(completionToken); + res.setActionsList(actions); + + try { + await callWithMetadata(stub.completeOrchestratorTask.bind(stub), res, this._metadataGenerator); + } catch (e: any) { + this._logger.error(`An error occurred while trying to complete instance '${instanceId}': ${e?.message}`); + } + return; + } else { + // Reject the work item - explicitly abandon it so it can be picked up by another worker + this._logger.info( + `${versionCheckResult.errorType} for instance '${instanceId}': ${versionCheckResult.errorMessage}. Abandoning work item.`, + ); + + try { + const abandonRequest = new pb.AbandonOrchestrationTaskRequest(); + abandonRequest.setCompletiontoken(completionToken); + await callWithMetadata(stub.abandonTaskOrchestratorWorkItem.bind(stub), abandonRequest, this._metadataGenerator); + } catch (e: any) { + this._logger.error(`An error occurred while trying to abandon work item for instance '${instanceId}': ${e?.message}`); + } + return; + } + } + let res; try { diff --git a/packages/durabletask-js/src/worker/versioning-options.ts b/packages/durabletask-js/src/worker/versioning-options.ts new file mode 100644 index 0000000..8171312 --- /dev/null +++ b/packages/durabletask-js/src/worker/versioning-options.ts @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +/** + * Strategy for matching orchestration versions when processing work items. + */ +export enum VersionMatchStrategy { + /** + * No version matching - process all orchestrations regardless of version. + */ + None = 0, + + /** + * Only process orchestrations that exactly match the worker's version. + */ + Strict = 1, + + /** + * Process orchestrations with the current version or older versions. + * Uses semantic versioning comparison. + */ + CurrentOrOlder = 2, +} + +/** + * Strategy for handling version mismatches when processing work items. + */ +export enum VersionFailureStrategy { + /** + * Reject the work item and let it be picked up by another worker. + * The orchestration will be retried by a compatible worker. + */ + Reject = 0, + + /** + * Fail the orchestration with a version mismatch error. + * This will mark the orchestration as failed. + */ + Fail = 1, +} + +/** + * Options for configuring version-based filtering of orchestrations. + */ +export interface VersioningOptions { + /** + * The version of the worker. This is used for version matching when processing orchestrations. + */ + version?: string; + + /** + * The default version to use when starting new orchestrations without an explicit version. + * This is used by the client when scheduling new orchestrations. + */ + defaultVersion?: string; + + /** + * The strategy for matching orchestration versions. + * @default VersionMatchStrategy.None + */ + matchStrategy?: VersionMatchStrategy; + + /** + * The strategy for handling version mismatches. + * @default VersionFailureStrategy.Reject + */ + failureStrategy?: VersionFailureStrategy; +} diff --git a/packages/durabletask-js/test/client-options.spec.ts b/packages/durabletask-js/test/client-options.spec.ts new file mode 100644 index 0000000..579d065 --- /dev/null +++ b/packages/durabletask-js/test/client-options.spec.ts @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { TaskHubGrpcClient, TaskHubGrpcClientOptions } from "../src"; + +describe("TaskHubGrpcClient", () => { + describe("constructor with options", () => { + it("should accept default version option", () => { + const options: TaskHubGrpcClientOptions = { + hostAddress: "localhost:4001", + defaultVersion: "1.0.0", + }; + + // This should not throw + const client = new TaskHubGrpcClient(options); + + expect(client).toBeDefined(); + }); + + it("should work without default version option", () => { + const options: TaskHubGrpcClientOptions = { + hostAddress: "localhost:4001", + }; + + const client = new TaskHubGrpcClient(options); + + expect(client).toBeDefined(); + }); + + it("should accept all standard options", () => { + const options: TaskHubGrpcClientOptions = { + hostAddress: "localhost:4001", + useTLS: false, + defaultVersion: "2.0.0", + }; + + const client = new TaskHubGrpcClient(options); + + expect(client).toBeDefined(); + }); + }); +}); diff --git a/packages/durabletask-js/test/orchestration_context_methods.spec.ts b/packages/durabletask-js/test/orchestration_context_methods.spec.ts index 5c0beb8..9eea1f5 100644 --- a/packages/durabletask-js/test/orchestration_context_methods.spec.ts +++ b/packages/durabletask-js/test/orchestration_context_methods.spec.ts @@ -11,9 +11,48 @@ import { OrchestrationExecutor, OrchestrationExecutionResult } from "../src/work import * as pb from "../src/proto/orchestrator_service_pb"; import { Registry } from "../src/worker/registry"; import { TOrchestrator } from "../src/types/orchestrator.type"; +import { Logger } from "../src/types/logger.type"; +import { StringValue } from "google-protobuf/google/protobuf/wrappers_pb"; +import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb"; const TEST_INSTANCE_ID = "test-instance-abc123"; +/** + * Helper to create execution started event with version + */ +function newExecutionStartedEventWithVersion( + name: string, + instanceId: string, + version?: string, + encodedInput?: string, +): pb.HistoryEvent { + const ts = new Timestamp(); + + const orchestrationInstance = new pb.OrchestrationInstance(); + orchestrationInstance.setInstanceid(instanceId); + + const executionStartedEvent = new pb.ExecutionStartedEvent(); + executionStartedEvent.setName(name); + if (encodedInput) { + const input = new StringValue(); + input.setValue(encodedInput); + executionStartedEvent.setInput(input); + } + executionStartedEvent.setOrchestrationinstance(orchestrationInstance); + if (version) { + const versionValue = new StringValue(); + versionValue.setValue(version); + executionStartedEvent.setVersion(versionValue); + } + + const event = new pb.HistoryEvent(); + event.setEventid(-1); + event.setTimestamp(ts); + event.setExecutionstarted(executionStartedEvent); + + return event; +} + /** * Helper to extract the CompleteOrchestrationAction from an OrchestrationExecutionResult */ @@ -304,3 +343,274 @@ describe("OrchestrationContext.newGuid", () => { expect(["8", "9", "a", "b"]).toContain(capturedGuid![19].toLowerCase()); // Variant bits }); }); + +describe("OrchestrationContext.version", () => { + it("should return the version from execution started event", async () => { + let capturedVersion: string | undefined; + + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + capturedVersion = ctx.version; + return capturedVersion; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEventWithVersion(name, TEST_INSTANCE_ID, "1.0.0"), + ]; + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + expect(capturedVersion).toEqual("1.0.0"); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); + expect(completeAction?.getOrchestrationstatus()).toEqual( + pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED, + ); + expect(JSON.parse(completeAction?.getResult()?.getValue() ?? "")).toEqual("1.0.0"); + }); + + it("should return empty string when no version is set", async () => { + let capturedVersion: string | undefined; + + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + capturedVersion = ctx.version; + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, TEST_INSTANCE_ID), + ]; + const executor = new OrchestrationExecutor(registry); + await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + expect(capturedVersion).toEqual(""); + }); + + it("should preserve version during replay", async () => { + const versions: string[] = []; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext) { + versions.push(ctx.version); + yield ctx.createTimer(1); + versions.push(ctx.version); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + + // First execution + const newEvents1 = [ + newOrchestratorStartedEvent(), + newExecutionStartedEventWithVersion(name, TEST_INSTANCE_ID, "2.0.0"), + ]; + const executor1 = new OrchestrationExecutor(registry); + await executor1.execute(TEST_INSTANCE_ID, [], newEvents1); + + expect(versions[0]).toEqual("2.0.0"); + }); +}); + +describe("OrchestrationContext.createReplaySafeLogger", () => { + it("should create a replay-safe logger that logs when not replaying", async () => { + const logMessages: string[] = []; + const mockLogger: Logger = { + error: (msg) => logMessages.push(`error: ${msg}`), + warn: (msg) => logMessages.push(`warn: ${msg}`), + info: (msg) => logMessages.push(`info: ${msg}`), + debug: (msg) => logMessages.push(`debug: ${msg}`), + }; + + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + const logger = ctx.createReplaySafeLogger(mockLogger); + logger.info("test message"); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, TEST_INSTANCE_ID), + ]; + const executor = new OrchestrationExecutor(registry); + await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + // Should have logged since this was not a replay + expect(logMessages).toContain("info: test message"); + }); + + it("should not log during replay phase", async () => { + const logMessages: string[] = []; + const mockLogger: Logger = { + error: (msg) => logMessages.push(`error: ${msg}`), + warn: (msg) => logMessages.push(`warn: ${msg}`), + info: (msg) => logMessages.push(`info: ${msg}`), + debug: (msg) => logMessages.push(`debug: ${msg}`), + }; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext) { + const logger = ctx.createReplaySafeLogger(mockLogger); + logger.info("first log"); + yield ctx.createTimer(1); + logger.info("second log"); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + + // First execution - should log "first log" + const startTime = new Date(2024, 0, 15, 10, 30, 0, 0); + const newEvents1 = [ + newOrchestratorStartedEvent(startTime), + newExecutionStartedEvent(name, TEST_INSTANCE_ID), + ]; + const executor1 = new OrchestrationExecutor(registry); + await executor1.execute(TEST_INSTANCE_ID, [], newEvents1); + + expect(logMessages).toContain("info: first log"); + expect(logMessages).not.toContain("info: second log"); + + // Clear logs + logMessages.length = 0; + + // Now simulate replay + timer fired + const fireAt = new Date(startTime.getTime() + 1000); + const oldEvents = [ + newOrchestratorStartedEvent(startTime), + newExecutionStartedEvent(name, TEST_INSTANCE_ID), + ]; + // Create timer created event + const timerCreatedEvent = new pb.HistoryEvent(); + timerCreatedEvent.setEventid(1); + const timerCreated = new pb.TimerCreatedEvent(); + const fireAtTs = new Timestamp(); + fireAtTs.fromDate(fireAt); + timerCreated.setFireat(fireAtTs); + timerCreatedEvent.setTimercreated(timerCreated); + oldEvents.push(timerCreatedEvent); + + const newEvents2 = [ + newOrchestratorStartedEvent(fireAt), + ]; + // Create timer fired event + const timerFiredEvent = new pb.HistoryEvent(); + timerFiredEvent.setEventid(-1); + const timerFired = new pb.TimerFiredEvent(); + timerFired.setTimerid(1); + timerFired.setFireat(fireAtTs); + timerFiredEvent.setTimerfired(timerFired); + newEvents2.push(timerFiredEvent); + + const executor2 = new OrchestrationExecutor(registry); + await executor2.execute(TEST_INSTANCE_ID, oldEvents, newEvents2); + + // "first log" should NOT have been logged again (replay phase) + // "second log" SHOULD have been logged (new execution phase) + expect(logMessages).not.toContain("info: first log"); + expect(logMessages).toContain("info: second log"); + }); +}); + +describe("OrchestrationContext.compareVersionTo", () => { + it("should return 0 when versions are equal", async () => { + let comparisonResult: number | undefined; + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + comparisonResult = ctx.compareVersionTo("1.0.0"); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEventWithVersion(name, TEST_INSTANCE_ID, "1.0.0"), + ]; + const executor = new OrchestrationExecutor(registry); + await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + expect(comparisonResult).toEqual(0); + }); + + it("should return positive number when context version is greater", async () => { + let comparisonResult: number | undefined; + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + comparisonResult = ctx.compareVersionTo("1.0.0"); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEventWithVersion(name, TEST_INSTANCE_ID, "2.0.0"), + ]; + const executor = new OrchestrationExecutor(registry); + await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + expect(comparisonResult).toBeGreaterThan(0); + }); + + it("should return negative number when context version is less", async () => { + let comparisonResult: number | undefined; + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + comparisonResult = ctx.compareVersionTo("2.0.0"); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEventWithVersion(name, TEST_INSTANCE_ID, "1.0.0"), + ]; + const executor = new OrchestrationExecutor(registry); + await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + expect(comparisonResult).toBeLessThan(0); + }); + + it("should handle patch version comparison", async () => { + let comparisonResult: number | undefined; + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + comparisonResult = ctx.compareVersionTo("1.0.0"); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEventWithVersion(name, TEST_INSTANCE_ID, "1.0.1"), + ]; + const executor = new OrchestrationExecutor(registry); + await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + expect(comparisonResult).toBeGreaterThan(0); + }); + + it("should handle minor version comparison", async () => { + let comparisonResult: number | undefined; + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + comparisonResult = ctx.compareVersionTo("1.1.0"); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEventWithVersion(name, TEST_INSTANCE_ID, "1.0.0"), + ]; + const executor = new OrchestrationExecutor(registry); + await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + expect(comparisonResult).toBeLessThan(0); + }); +}); + diff --git a/packages/durabletask-js/test/pb-helper-versioning.spec.ts b/packages/durabletask-js/test/pb-helper-versioning.spec.ts new file mode 100644 index 0000000..9331c4c --- /dev/null +++ b/packages/durabletask-js/test/pb-helper-versioning.spec.ts @@ -0,0 +1,70 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { newVersionMismatchFailureDetails, newFailureDetails } from "../src/utils/pb-helper.util"; + +describe("pb-helper.util - Version Mismatch Failure Details", () => { + describe("newVersionMismatchFailureDetails", () => { + it("should create failure details with VersionMismatch error type", () => { + const failure = newVersionMismatchFailureDetails( + "VersionMismatch", + "The orchestration version '2.0.0' does not match the worker version '1.0.0'." + ); + + expect(failure.getErrortype()).toBe("VersionMismatch"); + expect(failure.getErrormessage()).toBe("The orchestration version '2.0.0' does not match the worker version '1.0.0'."); + expect(failure.getIsnonretriable()).toBe(true); + }); + + it("should create failure details with VersionError error type", () => { + const failure = newVersionMismatchFailureDetails( + "VersionError", + "The version match strategy '99' is unknown." + ); + + expect(failure.getErrortype()).toBe("VersionError"); + expect(failure.getErrormessage()).toBe("The version match strategy '99' is unknown."); + expect(failure.getIsnonretriable()).toBe(true); + }); + + it("should set IsNonRetriable to true for version mismatches", () => { + const failure = newVersionMismatchFailureDetails("VersionMismatch", "Test message"); + + // Version mismatches are deterministic and should not be retried + expect(failure.getIsnonretriable()).toBe(true); + }); + + it("should not set stack trace for version mismatches", () => { + const failure = newVersionMismatchFailureDetails("VersionMismatch", "Test message"); + + // Version mismatches don't need stack traces + expect(failure.getStacktrace()).toBeUndefined(); + }); + }); + + describe("newFailureDetails", () => { + it("should create failure details from Error object", () => { + const error = new Error("Something went wrong"); + const failure = newFailureDetails(error); + + expect(failure.getErrortype()).toBe("Error"); + expect(failure.getErrormessage()).toBe("Something went wrong"); + expect(failure.getStacktrace()).toBeDefined(); + }); + + it("should capture error type from custom errors", () => { + class CustomError extends Error { + constructor(message: string) { + super(message); + this.name = "CustomError"; + } + } + + const error = new CustomError("Custom error message"); + const failure = newFailureDetails(error); + + expect(failure.getErrortype()).toBe("CustomError"); + expect(failure.getErrormessage()).toBe("Custom error message"); + }); + }); +}); diff --git a/packages/durabletask-js/test/replay-safe-logger.spec.ts b/packages/durabletask-js/test/replay-safe-logger.spec.ts new file mode 100644 index 0000000..8050346 --- /dev/null +++ b/packages/durabletask-js/test/replay-safe-logger.spec.ts @@ -0,0 +1,156 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { ReplaySafeLogger, ReplayContext } from "../src/types/replay-safe-logger"; +import { Logger } from "../src/types/logger.type"; + +describe("ReplaySafeLogger", () => { + let mockLogger: jest.Mocked; + let mockContext: ReplayContext; + + beforeEach(() => { + mockLogger = { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + }; + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + describe("constructor", () => { + it("should throw error when context is null", () => { + expect(() => new ReplaySafeLogger(null as any, mockLogger)).toThrow("context is required"); + }); + + it("should throw error when context is undefined", () => { + expect(() => new ReplaySafeLogger(undefined as any, mockLogger)).toThrow("context is required"); + }); + + it("should throw error when logger is null", () => { + mockContext = { isReplaying: false }; + expect(() => new ReplaySafeLogger(mockContext, null as any)).toThrow("logger is required"); + }); + + it("should throw error when logger is undefined", () => { + mockContext = { isReplaying: false }; + expect(() => new ReplaySafeLogger(mockContext, undefined as any)).toThrow("logger is required"); + }); + + it("should create instance successfully with valid arguments", () => { + mockContext = { isReplaying: false }; + const logger = new ReplaySafeLogger(mockContext, mockLogger); + expect(logger).toBeInstanceOf(ReplaySafeLogger); + }); + }); + + describe("when not replaying", () => { + beforeEach(() => { + mockContext = { isReplaying: false }; + }); + + it("should call inner logger.error when not replaying", () => { + const logger = new ReplaySafeLogger(mockContext, mockLogger); + + logger.error("test error message", { instanceId: "123" }); + + expect(mockLogger.error).toHaveBeenCalledWith("test error message", { instanceId: "123" }); + }); + + it("should call inner logger.warn when not replaying", () => { + const logger = new ReplaySafeLogger(mockContext, mockLogger); + + logger.warn("test warning message", 42); + + expect(mockLogger.warn).toHaveBeenCalledWith("test warning message", 42); + }); + + it("should call inner logger.info when not replaying", () => { + const logger = new ReplaySafeLogger(mockContext, mockLogger); + + logger.info("test info message"); + + expect(mockLogger.info).toHaveBeenCalledWith("test info message"); + }); + + it("should call inner logger.debug when not replaying", () => { + const logger = new ReplaySafeLogger(mockContext, mockLogger); + + logger.debug("test debug message", ["array", "data"]); + + expect(mockLogger.debug).toHaveBeenCalledWith("test debug message", ["array", "data"]); + }); + }); + + describe("when replaying", () => { + beforeEach(() => { + mockContext = { isReplaying: true }; + }); + + it("should NOT call inner logger.error when replaying", () => { + const logger = new ReplaySafeLogger(mockContext, mockLogger); + + logger.error("test error message"); + + expect(mockLogger.error).not.toHaveBeenCalled(); + }); + + it("should NOT call inner logger.warn when replaying", () => { + const logger = new ReplaySafeLogger(mockContext, mockLogger); + + logger.warn("test warning message"); + + expect(mockLogger.warn).not.toHaveBeenCalled(); + }); + + it("should NOT call inner logger.info when replaying", () => { + const logger = new ReplaySafeLogger(mockContext, mockLogger); + + logger.info("test info message"); + + expect(mockLogger.info).not.toHaveBeenCalled(); + }); + + it("should NOT call inner logger.debug when replaying", () => { + const logger = new ReplaySafeLogger(mockContext, mockLogger); + + logger.debug("test debug message"); + + expect(mockLogger.debug).not.toHaveBeenCalled(); + }); + }); + + describe("dynamic replay state changes", () => { + it("should respect changes in isReplaying state", () => { + // Start with replaying = true + const dynamicContext = { isReplaying: true }; + const logger = new ReplaySafeLogger(dynamicContext, mockLogger); + + // Should not log while replaying + logger.info("message during replay"); + expect(mockLogger.info).not.toHaveBeenCalled(); + + // Change to not replaying + dynamicContext.isReplaying = false; + + // Should now log + logger.info("message after replay"); + expect(mockLogger.info).toHaveBeenCalledWith("message after replay"); + }); + }); + + describe("Logger interface implementation", () => { + it("should implement Logger interface correctly", () => { + mockContext = { isReplaying: false }; + const logger = new ReplaySafeLogger(mockContext, mockLogger); + + expect(typeof logger.error).toBe("function"); + expect(typeof logger.warn).toBe("function"); + expect(typeof logger.info).toBe("function"); + expect(typeof logger.debug).toBe("function"); + }); + }); +}); diff --git a/packages/durabletask-js/test/terminate-options.spec.ts b/packages/durabletask-js/test/terminate-options.spec.ts new file mode 100644 index 0000000..1a4b2f9 --- /dev/null +++ b/packages/durabletask-js/test/terminate-options.spec.ts @@ -0,0 +1,152 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { + terminateOptions, + isTerminateInstanceOptions, + TERMINATE_OPTIONS_SYMBOL, +} from "../src"; + +describe("TerminateInstanceOptions", () => { + describe("terminateOptions factory function", () => { + it("should create options with recursive flag", () => { + const options = terminateOptions({ recursive: true }); + + expect(options.recursive).toBe(true); + expect(isTerminateInstanceOptions(options)).toBe(true); + }); + + it("should create options with output", () => { + const options = terminateOptions({ output: { reason: "cancelled" } }); + + expect(options.output).toEqual({ reason: "cancelled" }); + expect(isTerminateInstanceOptions(options)).toBe(true); + }); + + it("should create options with both recursive and output", () => { + const options = terminateOptions({ + recursive: true, + output: { reason: "cancelled" }, + }); + + expect(options.recursive).toBe(true); + expect(options.output).toEqual({ reason: "cancelled" }); + expect(isTerminateInstanceOptions(options)).toBe(true); + }); + + it("should create options with empty object", () => { + const options = terminateOptions({}); + + expect(options.recursive).toBeUndefined(); + expect(options.output).toBeUndefined(); + expect(isTerminateInstanceOptions(options)).toBe(true); + }); + }); + + describe("isTerminateInstanceOptions type guard", () => { + it("should return true for options created with terminateOptions()", () => { + const options = terminateOptions({ recursive: true }); + expect(isTerminateInstanceOptions(options)).toBe(true); + }); + + it("should return false for plain objects with recursive property", () => { + // This is the key test - user output that happens to have 'recursive' property + const userOutput = { recursive: true, data: "some value" }; + expect(isTerminateInstanceOptions(userOutput)).toBe(false); + }); + + it("should return false for plain objects with output property", () => { + // User output that happens to have 'output' property + const userOutput = { output: "some value", status: "done" }; + expect(isTerminateInstanceOptions(userOutput)).toBe(false); + }); + + it("should return false for plain objects with both recursive and output properties", () => { + // User output that coincidentally has both properties + const userOutput = { recursive: true, output: "some data", extra: "field" }; + expect(isTerminateInstanceOptions(userOutput)).toBe(false); + }); + + it("should return false for null", () => { + expect(isTerminateInstanceOptions(null)).toBe(false); + }); + + it("should return false for undefined", () => { + expect(isTerminateInstanceOptions(undefined)).toBe(false); + }); + + it("should return false for primitives", () => { + expect(isTerminateInstanceOptions("string")).toBe(false); + expect(isTerminateInstanceOptions(123)).toBe(false); + expect(isTerminateInstanceOptions(true)).toBe(false); + }); + + it("should return false for arrays", () => { + expect(isTerminateInstanceOptions([1, 2, 3])).toBe(false); + expect(isTerminateInstanceOptions([{ recursive: true }])).toBe(false); + }); + + it("should return false for arbitrary objects", () => { + expect(isTerminateInstanceOptions({ foo: "bar" })).toBe(false); + expect(isTerminateInstanceOptions({ status: "done", data: {} })).toBe(false); + }); + + it("should return false for objects that look like options but lack the symbol", () => { + // This simulates the old detection bug - objects with matching property names + const fakeOptions = { recursive: true, output: { reason: "test" } }; + expect(isTerminateInstanceOptions(fakeOptions)).toBe(false); + }); + }); + + describe("TERMINATE_OPTIONS_SYMBOL", () => { + it("should be a symbol", () => { + expect(typeof TERMINATE_OPTIONS_SYMBOL).toBe("symbol"); + }); + + it("should be consistent across imports (Symbol.for)", () => { + const symbol1 = Symbol.for("durabletask.TerminateInstanceOptions"); + const symbol2 = Symbol.for("durabletask.TerminateInstanceOptions"); + expect(symbol1).toBe(symbol2); + expect(symbol1).toBe(TERMINATE_OPTIONS_SYMBOL); + }); + + it("should be present in options created with terminateOptions()", () => { + const options = terminateOptions({ recursive: true }); + expect(TERMINATE_OPTIONS_SYMBOL in options).toBe(true); + expect(options[TERMINATE_OPTIONS_SYMBOL]).toBe(true); + }); + }); + + describe("edge cases for user output disambiguation", () => { + it("should treat plain object with recursive:true as output, not options", () => { + // This was the original bug - user passing output like {recursive: true, data: {...}} + const userOutput = { recursive: true, data: { value: 123 } }; + + // Should NOT be detected as TerminateInstanceOptions + expect(isTerminateInstanceOptions(userOutput)).toBe(false); + + // When used with terminateOptions, it works correctly + const options = terminateOptions({ output: userOutput, recursive: false }); + expect(isTerminateInstanceOptions(options)).toBe(true); + expect(options.output).toEqual(userOutput); + }); + + it("should correctly handle nested objects with problematic property names", () => { + const complexOutput = { + recursive: true, + output: "nested", + metadata: { + recursive: false, + output: "deeply nested" + } + }; + + // Plain object should not be detected as options + expect(isTerminateInstanceOptions(complexOutput)).toBe(false); + + // Wrapped in terminateOptions should work + const options = terminateOptions({ output: complexOutput }); + expect(isTerminateInstanceOptions(options)).toBe(true); + }); + }); +}); diff --git a/packages/durabletask-js/test/versioning-options.spec.ts b/packages/durabletask-js/test/versioning-options.spec.ts new file mode 100644 index 0000000..2fa23ba --- /dev/null +++ b/packages/durabletask-js/test/versioning-options.spec.ts @@ -0,0 +1,97 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { VersionMatchStrategy, VersionFailureStrategy, VersioningOptions } from "../src/worker/versioning-options"; + +describe("VersioningOptions", () => { + describe("VersionMatchStrategy enum", () => { + it("should have None = 0", () => { + expect(VersionMatchStrategy.None).toBe(0); + }); + + it("should have Strict = 1", () => { + expect(VersionMatchStrategy.Strict).toBe(1); + }); + + it("should have CurrentOrOlder = 2", () => { + expect(VersionMatchStrategy.CurrentOrOlder).toBe(2); + }); + }); + + describe("VersionFailureStrategy enum", () => { + it("should have Reject = 0", () => { + expect(VersionFailureStrategy.Reject).toBe(0); + }); + + it("should have Fail = 1", () => { + expect(VersionFailureStrategy.Fail).toBe(1); + }); + }); + + describe("VersioningOptions interface", () => { + it("should allow creating options with all properties", () => { + const options: VersioningOptions = { + version: "1.0.0", + defaultVersion: "1.0.0", + matchStrategy: VersionMatchStrategy.Strict, + failureStrategy: VersionFailureStrategy.Fail, + }; + + expect(options.version).toBe("1.0.0"); + expect(options.defaultVersion).toBe("1.0.0"); + expect(options.matchStrategy).toBe(VersionMatchStrategy.Strict); + expect(options.failureStrategy).toBe(VersionFailureStrategy.Fail); + }); + + it("should allow creating options with only version", () => { + const options: VersioningOptions = { + version: "2.0.0", + }; + + expect(options.version).toBe("2.0.0"); + expect(options.defaultVersion).toBeUndefined(); + expect(options.matchStrategy).toBeUndefined(); + expect(options.failureStrategy).toBeUndefined(); + }); + + it("should allow creating empty options", () => { + const options: VersioningOptions = {}; + + expect(options.version).toBeUndefined(); + expect(options.defaultVersion).toBeUndefined(); + expect(options.matchStrategy).toBeUndefined(); + expect(options.failureStrategy).toBeUndefined(); + }); + + it("should allow CurrentOrOlder match strategy", () => { + const options: VersioningOptions = { + version: "3.0.0", + matchStrategy: VersionMatchStrategy.CurrentOrOlder, + failureStrategy: VersionFailureStrategy.Reject, + }; + + expect(options.matchStrategy).toBe(VersionMatchStrategy.CurrentOrOlder); + expect(options.failureStrategy).toBe(VersionFailureStrategy.Reject); + }); + + it("should allow None match strategy (process all versions)", () => { + const options: VersioningOptions = { + version: "1.0.0", + matchStrategy: VersionMatchStrategy.None, + }; + + expect(options.matchStrategy).toBe(VersionMatchStrategy.None); + }); + + it("should allow defaultVersion for client orchestration scheduling", () => { + const options: VersioningOptions = { + version: "2.0.0", + defaultVersion: "1.5.0", + matchStrategy: VersionMatchStrategy.CurrentOrOlder, + }; + + expect(options.version).toBe("2.0.0"); + expect(options.defaultVersion).toBe("1.5.0"); + }); + }); +}); diff --git a/packages/durabletask-js/test/versioning.spec.ts b/packages/durabletask-js/test/versioning.spec.ts new file mode 100644 index 0000000..9e396c0 --- /dev/null +++ b/packages/durabletask-js/test/versioning.spec.ts @@ -0,0 +1,154 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { compareVersions } from "../src/utils/versioning.util"; + +describe("compareVersions", () => { + describe("empty/undefined versions", () => { + it("should return 0 when both versions are undefined", () => { + expect(compareVersions(undefined, undefined)).toBe(0); + }); + + it("should return 0 when both versions are empty strings", () => { + expect(compareVersions("", "")).toBe(0); + }); + + it("should return 0 when both versions are whitespace", () => { + expect(compareVersions(" ", " ")).toBe(0); + }); + + it("should return 0 when one is undefined and other is empty", () => { + expect(compareVersions(undefined, "")).toBe(0); + expect(compareVersions("", undefined)).toBe(0); + }); + + it("should return negative when source is empty and other is defined", () => { + expect(compareVersions("", "1.0.0")).toBeLessThan(0); + expect(compareVersions(undefined, "1.0.0")).toBeLessThan(0); + expect(compareVersions(" ", "1.0.0")).toBeLessThan(0); + }); + + it("should return positive when source is defined and other is empty", () => { + expect(compareVersions("1.0.0", "")).toBeGreaterThan(0); + expect(compareVersions("1.0.0", undefined)).toBeGreaterThan(0); + expect(compareVersions("1.0.0", " ")).toBeGreaterThan(0); + }); + }); + + describe("semantic version comparison", () => { + it("should compare major versions correctly", () => { + expect(compareVersions("1.0.0", "2.0.0")).toBeLessThan(0); + expect(compareVersions("2.0.0", "1.0.0")).toBeGreaterThan(0); + expect(compareVersions("10.0.0", "2.0.0")).toBeGreaterThan(0); + }); + + it("should compare minor versions correctly", () => { + expect(compareVersions("1.1.0", "1.2.0")).toBeLessThan(0); + expect(compareVersions("1.2.0", "1.1.0")).toBeGreaterThan(0); + expect(compareVersions("1.10.0", "1.2.0")).toBeGreaterThan(0); + }); + + it("should compare patch versions correctly", () => { + expect(compareVersions("1.0.1", "1.0.2")).toBeLessThan(0); + expect(compareVersions("1.0.2", "1.0.1")).toBeGreaterThan(0); + expect(compareVersions("1.0.10", "1.0.2")).toBeGreaterThan(0); + }); + + it("should return 0 for equal versions", () => { + expect(compareVersions("1.0.0", "1.0.0")).toBe(0); + expect(compareVersions("2.5.3", "2.5.3")).toBe(0); + }); + + it("should handle versions with different component counts", () => { + expect(compareVersions("1", "1.0")).toBe(0); + expect(compareVersions("1.0", "1.0.0")).toBe(0); + expect(compareVersions("1", "1.0.0")).toBe(0); + expect(compareVersions("1", "2")).toBeLessThan(0); + expect(compareVersions("1.0", "1.1")).toBeLessThan(0); + }); + + it("should handle 4-component versions", () => { + expect(compareVersions("1.0.0.0", "1.0.0.1")).toBeLessThan(0); + expect(compareVersions("1.0.0.1", "1.0.0.0")).toBeGreaterThan(0); + expect(compareVersions("1.0.0.0", "1.0.0.0")).toBe(0); + }); + }); + + describe("non-semantic version comparison (lexicographic)", () => { + it("should compare non-semver strings lexicographically", () => { + expect(compareVersions("alpha", "beta")).toBeLessThan(0); + expect(compareVersions("beta", "alpha")).toBeGreaterThan(0); + expect(compareVersions("alpha", "alpha")).toBe(0); + }); + + it("should be case-insensitive for lexicographic comparison", () => { + expect(compareVersions("Alpha", "alpha")).toBe(0); + expect(compareVersions("BETA", "beta")).toBe(0); + expect(compareVersions("Alpha", "BETA")).toBeLessThan(0); + }); + + it("should compare version strings with prefixes", () => { + expect(compareVersions("v1.0", "v2.0")).toBeLessThan(0); + expect(compareVersions("release-1", "release-2")).toBeLessThan(0); + }); + }); + + describe("mixed scenarios", () => { + it("should handle pre-release style versions", () => { + // These will fall back to lexicographic since they contain non-numeric chars + expect(compareVersions("1.0.0-alpha", "1.0.0-beta")).toBeLessThan(0); + expect(compareVersions("1.0.0-rc1", "1.0.0-rc2")).toBeLessThan(0); + }); + + it("should compare simple numeric versions", () => { + expect(compareVersions("1", "2")).toBeLessThan(0); + expect(compareVersions("10", "2")).toBeGreaterThan(0); + expect(compareVersions("1", "1")).toBe(0); + }); + }); + + describe("mixed semantic and non-semantic versions", () => { + it("should fall back to lexicographic when one version is semver and other is not", () => { + // "1.0.0" is valid semver, "alpha" is not - should use lexicographic comparison + // "1" < "a" in ASCII, so "1.0.0" < "alpha" lexicographically + expect(compareVersions("1.0.0", "alpha")).toBeLessThan(0); + expect(compareVersions("alpha", "1.0.0")).toBeGreaterThan(0); + }); + + it("should handle semver vs prefixed version strings", () => { + // "1.0.0" is valid semver, "v1.0.0" is not (has 'v' prefix) + // Falls back to lexicographic: "1" < "v" + expect(compareVersions("1.0.0", "v1.0.0")).toBeLessThan(0); + expect(compareVersions("v1.0.0", "1.0.0")).toBeGreaterThan(0); + }); + + it("should handle semver vs pre-release style versions", () => { + // "2.0.0" is valid semver, "1.0.0-beta" is not (has "-beta" suffix) + // Falls back to lexicographic: "2" > "1" + expect(compareVersions("2.0.0", "1.0.0-beta")).toBeGreaterThan(0); + expect(compareVersions("1.0.0-beta", "2.0.0")).toBeLessThan(0); + }); + + it("should handle semver vs text-only versions", () => { + // Comparing numeric semver with pure text versions + expect(compareVersions("1.0.0", "latest")).toBeLessThan(0); + expect(compareVersions("latest", "1.0.0")).toBeGreaterThan(0); + expect(compareVersions("2.5.0", "stable")).toBeLessThan(0); + expect(compareVersions("stable", "2.5.0")).toBeGreaterThan(0); + }); + + it("should handle semver vs versions with build metadata", () => { + // "1.0.0" is valid semver, "1.0.0+build123" is not (has build metadata) + // Falls back to lexicographic + expect(compareVersions("1.0.0", "1.0.0+build123")).toBeLessThan(0); + expect(compareVersions("1.0.0+build123", "1.0.0")).toBeGreaterThan(0); + }); + + it("should handle numeric semver vs alphanumeric non-semver", () => { + // "3.0.0" is valid semver, "3.0.0rc1" is not (no separator before rc) + // Falls back to lexicographic: "3.0.0" < "3.0.0rc1" + expect(compareVersions("3.0.0", "3.0.0rc1")).toBeLessThan(0); + expect(compareVersions("3.0.0rc1", "3.0.0")).toBeGreaterThan(0); + }); + }); +}); diff --git a/packages/durabletask-js/test/worker-versioning.spec.ts b/packages/durabletask-js/test/worker-versioning.spec.ts new file mode 100644 index 0000000..8fd8663 --- /dev/null +++ b/packages/durabletask-js/test/worker-versioning.spec.ts @@ -0,0 +1,140 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { TaskHubGrpcWorker, VersionMatchStrategy, VersionFailureStrategy, VersioningOptions } from "../src"; + +describe("TaskHubGrpcWorker versioning", () => { + describe("constructor with versioning options", () => { + it("should accept versioning options", () => { + const versioning: VersioningOptions = { + version: "1.0.0", + matchStrategy: VersionMatchStrategy.Strict, + failureStrategy: VersionFailureStrategy.Fail, + }; + + // This should not throw + const worker = new TaskHubGrpcWorker({ + hostAddress: "localhost:4001", + versioning, + }); + + expect(worker).toBeDefined(); + }); + + it("should accept versioning with CurrentOrOlder strategy", () => { + const versioning: VersioningOptions = { + version: "2.0.0", + matchStrategy: VersionMatchStrategy.CurrentOrOlder, + failureStrategy: VersionFailureStrategy.Reject, + }; + + const worker = new TaskHubGrpcWorker({ + hostAddress: "localhost:4001", + versioning, + }); + + expect(worker).toBeDefined(); + }); + + it("should accept versioning with None strategy (default behavior)", () => { + const versioning: VersioningOptions = { + version: "1.0.0", + matchStrategy: VersionMatchStrategy.None, + }; + + const worker = new TaskHubGrpcWorker({ + hostAddress: "localhost:4001", + versioning, + }); + + expect(worker).toBeDefined(); + }); + + it("should work without versioning options", () => { + const worker = new TaskHubGrpcWorker({ + hostAddress: "localhost:4001", + }); + + expect(worker).toBeDefined(); + }); + + it("should accept empty versioning options", () => { + const worker = new TaskHubGrpcWorker({ + hostAddress: "localhost:4001", + versioning: {}, + }); + + expect(worker).toBeDefined(); + }); + + it("should accept versioning with defaultVersion option", () => { + const versioning: VersioningOptions = { + version: "1.0.0", + matchStrategy: VersionMatchStrategy.Strict, + defaultVersion: "0.1.0", + }; + + const worker = new TaskHubGrpcWorker({ + hostAddress: "localhost:4001", + versioning, + }); + + expect(worker).toBeDefined(); + }); + }); + + describe("VersionMatchStrategy enum values", () => { + it("should have correct enum values matching .NET", () => { + // Match .NET: None = 0, Strict = 1, CurrentOrOlder = 2 + expect(VersionMatchStrategy.None).toBe(0); + expect(VersionMatchStrategy.Strict).toBe(1); + expect(VersionMatchStrategy.CurrentOrOlder).toBe(2); + }); + }); + + describe("VersionFailureStrategy enum values", () => { + it("should have correct enum values matching .NET", () => { + // Match .NET: Reject = 0, Fail = 1 + expect(VersionFailureStrategy.Reject).toBe(0); + expect(VersionFailureStrategy.Fail).toBe(1); + }); + }); + + describe("Strict matching behavior", () => { + it("should use semantic version comparison for Strict matching", () => { + // Strict matching should use compareVersions for semantic comparison + // This means "1.0.0" === "1.0.0" (true) + // And "1.0.0" !== "1.0.1" (not equal) + // And "1.0.0" !== "2.0.0" (not equal) + // This is tested by creating workers - actual behavior tested in integration tests + const worker = new TaskHubGrpcWorker({ + hostAddress: "localhost:4001", + versioning: { + version: "1.0.0", + matchStrategy: VersionMatchStrategy.Strict, + failureStrategy: VersionFailureStrategy.Fail, + }, + }); + + expect(worker).toBeDefined(); + }); + }); + + describe("CurrentOrOlder matching behavior", () => { + it("should allow orchestrations with older versions", () => { + // CurrentOrOlder should accept orchestrations with versions <= worker version + // This means worker "2.0.0" accepts orchestration "1.0.0" (1.0.0 <= 2.0.0) + // But rejects orchestration "3.0.0" (3.0.0 > 2.0.0) + const worker = new TaskHubGrpcWorker({ + hostAddress: "localhost:4001", + versioning: { + version: "2.0.0", + matchStrategy: VersionMatchStrategy.CurrentOrOlder, + failureStrategy: VersionFailureStrategy.Reject, + }, + }); + + expect(worker).toBeDefined(); + }); + }); +}); diff --git a/test/e2e-azuremanaged/orchestration.spec.ts b/test/e2e-azuremanaged/orchestration.spec.ts index 46d8ce3..40d7cc3 100644 --- a/test/e2e-azuremanaged/orchestration.spec.ts +++ b/test/e2e-azuremanaged/orchestration.spec.ts @@ -22,10 +22,15 @@ import { Task, TOrchestrator, RetryPolicy, + Logger, + terminateOptions, + PurgeInstanceOptions, } from "@microsoft/durabletask-js"; import { DurableTaskAzureManagedClientBuilder, DurableTaskAzureManagedWorkerBuilder, + VersionMatchStrategy, + VersionFailureStrategy, } from "@microsoft/durabletask-js-azuremanaged"; // Read environment variables @@ -51,6 +56,18 @@ function createWorker(): TaskHubGrpcWorker { .build(); } +function createWorkerWithVersioning( + version: string, + matchStrategy: VersionMatchStrategy = VersionMatchStrategy.None, + failureStrategy: VersionFailureStrategy = VersionFailureStrategy.Reject, +): TaskHubGrpcWorker { + const builder = connectionString + ? new DurableTaskAzureManagedWorkerBuilder().connectionString(connectionString) + : new DurableTaskAzureManagedWorkerBuilder().endpoint(endpoint, taskHub, null); + + return builder.versioning({ version, matchStrategy, failureStrategy }).build(); +} + describe("Durable Task Scheduler (DTS) E2E Tests", () => { let taskHubClient: TaskHubGrpcClient; let taskHubWorker: TaskHubGrpcWorker; @@ -61,7 +78,12 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => { }); afterEach(async () => { - await taskHubWorker.stop(); + // Only stop the worker if it was started + try { + await taskHubWorker.stop(); + } catch { + // Worker wasn't started, ignore the error + } await taskHubClient.stop(); }); @@ -739,6 +761,177 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => { expect(receiverState?.serializedOutput).toEqual(JSON.stringify("received signal")); }, 45000); + describe("Versioning", () => { + it("should be able to pass version when scheduling orchestration", async () => { + let capturedVersion: string | undefined; + + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + capturedVersion = ctx.version; + return ctx.version; + }; + + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, undefined, { + version: "1.0.0", + }); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(capturedVersion).toEqual("1.0.0"); + expect(state?.serializedOutput).toEqual(JSON.stringify("1.0.0")); + }, 31000); + + it("should have empty version when not specified", async () => { + let capturedVersion: string | undefined; + + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + capturedVersion = ctx.version; + return "done"; + }; + + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(capturedVersion).toEqual(""); + }, 31000); + }); + + describe("ReplaySafeLogger", () => { + it("should only log when not replaying", async () => { + const logMessages: string[] = []; + const mockLogger: Logger = { + error: (msg) => logMessages.push(`error: ${msg}`), + warn: (msg) => logMessages.push(`warn: ${msg}`), + info: (msg) => logMessages.push(`info: ${msg}`), + debug: (msg) => logMessages.push(`debug: ${msg}`), + }; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const logger = ctx.createReplaySafeLogger(mockLogger); + logger.info("before timer"); + yield ctx.createTimer(1); + logger.info("after timer"); + return "done"; + }; + + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + + // Each log message should appear only once (not duplicated during replay) + const beforeTimerLogs = logMessages.filter((m) => m.includes("before timer")); + const afterTimerLogs = logMessages.filter((m) => m.includes("after timer")); + + expect(beforeTimerLogs.length).toEqual(1); + expect(afterTimerLogs.length).toEqual(1); + }, 31000); + }); + + describe("Terminate with options", () => { + it("should terminate orchestration with output using options object", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + // Wait forever + yield ctx.waitForExternalEvent("never-coming"); + return "should not reach here"; + }; + + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + await taskHubClient.waitForOrchestrationStart(id, undefined, 10); + + // Terminate with options object using terminateOptions factory + await taskHubClient.terminateOrchestration(id, terminateOptions({ output: "terminated-output" })); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); + expect(state?.serializedOutput).toEqual(JSON.stringify("terminated-output")); + }, 31000); + + it("should terminate orchestration with legacy signature (output only)", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("never-coming"); + return "should not reach here"; + }; + + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + await taskHubClient.waitForOrchestrationStart(id, undefined, 10); + + // Use legacy signature + await taskHubClient.terminateOrchestration(id, "legacy-output"); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); + expect(state?.serializedOutput).toEqual(JSON.stringify("legacy-output")); + }, 31000); + }); + + describe("Purge with options", () => { + it("should purge orchestration by instance ID", async () => { + const orchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "done"; + }; + + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + + // Purge the completed orchestration + const purgeResult = await taskHubClient.purgeOrchestration(id); + + expect(purgeResult).toBeDefined(); + expect(purgeResult?.deletedInstanceCount).toEqual(1); + + // Verify the orchestration is gone + const stateAfterPurge = await taskHubClient.getOrchestrationState(id); + expect(stateAfterPurge).toBeUndefined(); + }, 31000); + + it("should purge orchestration with options object", async () => { + const orchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "done"; + }; + + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + + // Purge with options (recursive: false for simple case) + const purgeResult = await taskHubClient.purgeOrchestration(id, { recursive: false }); + + expect(purgeResult).toBeDefined(); + expect(purgeResult?.deletedInstanceCount).toEqual(1); + }, 31000); + }); it("should expose parent orchestration info in sub-orchestrations", async () => { // Child orchestration that captures and returns its parent info const childOrchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { @@ -811,4 +1004,373 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => { expect(result.hasParent).toBe(false); expect(result.parentInfo).toBeUndefined(); }, 31000); + + describe("Versioning with MatchStrategy", () => { + it("should process orchestration when MatchStrategy.None is used (always matches)", async () => { + let capturedVersion: string | undefined; + + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + capturedVersion = ctx.version; + return ctx.version; + }; + + // Worker with version "1.0.0" but MatchStrategy.None means it accepts all versions + const worker = createWorkerWithVersioning("1.0.0", VersionMatchStrategy.None); + worker.addOrchestrator(orchestrator); + await worker.start(); + + try { + // Schedule orchestration with different version + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, undefined, { + version: "2.0.0", + }); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(capturedVersion).toEqual("2.0.0"); // Orchestration's version, not worker's + expect(state?.serializedOutput).toEqual(JSON.stringify("2.0.0")); + } finally { + await worker.stop(); + } + }, 31000); + + it("should process orchestration when MatchStrategy.Strict matches exactly", async () => { + let capturedVersion: string | undefined; + + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + capturedVersion = ctx.version; + return ctx.version; + }; + + // Worker with version "1.0.0" and MatchStrategy.Strict + const worker = createWorkerWithVersioning("1.0.0", VersionMatchStrategy.Strict); + worker.addOrchestrator(orchestrator); + await worker.start(); + + try { + // Schedule orchestration with exact same version + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, undefined, { + version: "1.0.0", + }); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(capturedVersion).toEqual("1.0.0"); + expect(state?.serializedOutput).toEqual(JSON.stringify("1.0.0")); + } finally { + await worker.stop(); + } + }, 31000); + + it("should process orchestration when MatchStrategy.CurrentOrOlder with matching version", async () => { + let capturedVersion: string | undefined; + + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + capturedVersion = ctx.version; + return ctx.version; + }; + + // Worker with version "2.0.0" and MatchStrategy.CurrentOrOlder + const worker = createWorkerWithVersioning("2.0.0", VersionMatchStrategy.CurrentOrOlder); + worker.addOrchestrator(orchestrator); + await worker.start(); + + try { + // Schedule orchestration with older version - should be processed + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, undefined, { + version: "1.0.0", + }); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(capturedVersion).toEqual("1.0.0"); + expect(state?.serializedOutput).toEqual(JSON.stringify("1.0.0")); + } finally { + await worker.stop(); + } + }, 31000); + + it("should process orchestration when MatchStrategy.CurrentOrOlder with same version", async () => { + let capturedVersion: string | undefined; + + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + capturedVersion = ctx.version; + return ctx.version; + }; + + // Worker with version "2.0.0" and MatchStrategy.CurrentOrOlder + const worker = createWorkerWithVersioning("2.0.0", VersionMatchStrategy.CurrentOrOlder); + worker.addOrchestrator(orchestrator); + await worker.start(); + + try { + // Schedule orchestration with same version + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, undefined, { + version: "2.0.0", + }); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(capturedVersion).toEqual("2.0.0"); + expect(state?.serializedOutput).toEqual(JSON.stringify("2.0.0")); + } finally { + await worker.stop(); + } + }, 31000); + }); + + describe("Versioning with FailureStrategy", () => { + it("should fail orchestration when MatchStrategy.Strict mismatches with FailureStrategy.Fail", async () => { + const orchestrator: TOrchestrator = async (_ctx: OrchestrationContext) => { + return "should not reach here"; + }; + + // Worker with version "1.0.0", Strict match, and Fail strategy + const worker = createWorkerWithVersioning( + "1.0.0", + VersionMatchStrategy.Strict, + VersionFailureStrategy.Fail, + ); + worker.addOrchestrator(orchestrator); + await worker.start(); + + try { + // Schedule orchestration with different version - should fail + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, undefined, { + version: "2.0.0", + }); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails?.message).toContain("version"); + } finally { + await worker.stop(); + } + }, 31000); + + it("should fail orchestration when MatchStrategy.CurrentOrOlder with newer version and FailureStrategy.Fail", async () => { + const orchestrator: TOrchestrator = async (_ctx: OrchestrationContext) => { + return "should not reach here"; + }; + + // Worker with version "1.0.0", CurrentOrOlder match, and Fail strategy + const worker = createWorkerWithVersioning( + "1.0.0", + VersionMatchStrategy.CurrentOrOlder, + VersionFailureStrategy.Fail, + ); + worker.addOrchestrator(orchestrator); + await worker.start(); + + try { + // Schedule orchestration with newer version - should fail + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, undefined, { + version: "2.0.0", + }); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails?.message).toContain("version"); + } finally { + await worker.stop(); + } + }, 31000); + + // Note: FailureStrategy.Reject causes the worker to abandon the work item silently, + // so the orchestration would remain in PENDING state. This is harder to test in E2E + // because we'd need to wait for the orchestration to NOT complete (negative test). + // The unit tests cover this case more thoroughly. + }); + + describe("Recursive terminate", () => { + it("should terminate parent and child orchestrations recursively", async () => { + // Child orchestration that waits forever + const childOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("never-coming"); + return "child completed"; + }; + + // Parent orchestration that starts a child and waits + const parentOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + // Start child and wait for it (but it never completes) + const childResult = yield ctx.callSubOrchestrator(childOrchestrator); + return { parentCompleted: true, childResult }; + }; + + taskHubWorker.addOrchestrator(childOrchestrator); + taskHubWorker.addOrchestrator(parentOrchestrator); + await taskHubWorker.start(); + + const parentId = await taskHubClient.scheduleNewOrchestration(parentOrchestrator); + await taskHubClient.waitForOrchestrationStart(parentId, undefined, 10); + + // Wait a bit for child to be started + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Terminate recursively with output using terminateOptions factory + await taskHubClient.terminateOrchestration( + parentId, + terminateOptions({ output: "terminated-by-test", recursive: true }), + ); + + // Wait for parent to be terminated + const parentState = await taskHubClient.waitForOrchestrationCompletion(parentId, undefined, 30); + + expect(parentState).toBeDefined(); + expect(parentState?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); + expect(parentState?.serializedOutput).toEqual(JSON.stringify("terminated-by-test")); + + // Find and verify child is also terminated + // The child instance ID follows a pattern based on parent ID + // We need to query for sub-orchestrations + // For now, we verify by checking the parent completed with termination + // This confirms the recursive flag was processed + }, 60000); + + it("should terminate only parent when recursive is false", async () => { + let childStarted = false; + + // Child orchestration that sets a flag and waits + const childOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + childStarted = true; + yield ctx.waitForExternalEvent("never-coming"); + return "child completed"; + }; + + // Parent orchestration that starts a child and waits + const parentOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const childResult = yield ctx.callSubOrchestrator(childOrchestrator); + return { parentCompleted: true, childResult }; + }; + + taskHubWorker.addOrchestrator(childOrchestrator); + taskHubWorker.addOrchestrator(parentOrchestrator); + await taskHubWorker.start(); + + const parentId = await taskHubClient.scheduleNewOrchestration(parentOrchestrator); + await taskHubClient.waitForOrchestrationStart(parentId, undefined, 10); + + // Wait a bit for child to be started + await new Promise((resolve) => setTimeout(resolve, 2000)); + + expect(childStarted).toBe(true); + + // Terminate non-recursively using terminateOptions factory + await taskHubClient.terminateOrchestration( + parentId, + terminateOptions({ output: "parent-terminated", recursive: false }), + ); + + const parentState = await taskHubClient.waitForOrchestrationCompletion(parentId, undefined, 30); + + expect(parentState).toBeDefined(); + expect(parentState?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); + expect(parentState?.serializedOutput).toEqual(JSON.stringify("parent-terminated")); + + // Note: The child may or may not be terminated depending on how the runtime handles it. + // The key assertion is that the parent was terminated with the correct output. + }, 60000); + + it("should preserve termination output when using terminateOptions", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("never-coming"); + return "should not reach here"; + }; + + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + await taskHubClient.waitForOrchestrationStart(id, undefined, 10); + + // Terminate with complex output object + const complexOutput = { reason: "test termination", timestamp: Date.now(), data: [1, 2, 3] }; + await taskHubClient.terminateOrchestration(id, terminateOptions({ output: complexOutput })); + + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); + expect(JSON.parse(state?.serializedOutput || "{}")).toEqual(complexOutput); + }, 31000); + }); + + describe("Recursive purge", () => { + it("should purge parent and child orchestrations recursively", async () => { + // Child orchestration that completes immediately + const childOrchestrator: TOrchestrator = async (_ctx: OrchestrationContext) => { + return "child done"; + }; + + // Parent orchestration that creates a child + const parentOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const childResult = yield ctx.callSubOrchestrator(childOrchestrator); + return { parentDone: true, childResult }; + }; + + taskHubWorker.addOrchestrator(childOrchestrator); + taskHubWorker.addOrchestrator(parentOrchestrator); + await taskHubWorker.start(); + + const parentId = await taskHubClient.scheduleNewOrchestration(parentOrchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(parentId, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + + // Purge recursively + const purgeOptions: PurgeInstanceOptions = { recursive: true }; + const purgeResult = await taskHubClient.purgeOrchestration(parentId, purgeOptions); + + expect(purgeResult).toBeDefined(); + // When recursive is true, the parent should be deleted + // Note: The actual count depends on the DTS implementation - at minimum, the parent is deleted + expect(purgeResult?.deletedInstanceCount).toBeGreaterThanOrEqual(1); + + // Verify parent is gone + const parentStateAfterPurge = await taskHubClient.getOrchestrationState(parentId); + expect(parentStateAfterPurge).toBeUndefined(); + }, 60000); + + it("should purge only parent when recursive is false", async () => { + // Child orchestration that completes immediately + const childOrchestrator: TOrchestrator = async (_ctx: OrchestrationContext) => { + return "child done"; + }; + + // Parent orchestration that creates a child + const parentOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const childResult = yield ctx.callSubOrchestrator(childOrchestrator); + return { parentDone: true, childResult }; + }; + + taskHubWorker.addOrchestrator(childOrchestrator); + taskHubWorker.addOrchestrator(parentOrchestrator); + await taskHubWorker.start(); + + const parentId = await taskHubClient.scheduleNewOrchestration(parentOrchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(parentId, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + + // Purge non-recursively + const purgeOptions: PurgeInstanceOptions = { recursive: false }; + const purgeResult = await taskHubClient.purgeOrchestration(parentId, purgeOptions); + + expect(purgeResult).toBeDefined(); + // When recursive is false, only the parent should be deleted + expect(purgeResult?.deletedInstanceCount).toEqual(1); + + // Verify parent is gone + const parentStateAfterPurge = await taskHubClient.getOrchestrationState(parentId); + expect(parentStateAfterPurge).toBeUndefined(); + }, 60000); + }); });