Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/durabletask-js-azuremanaged/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,8 @@ export { DurableTaskAzureManagedWorkerBuilder, createAzureManagedWorkerBuilder }
// Logger exports - re-export from core package for convenience
export { Logger, ConsoleLogger, NoOpLogger } from "@microsoft/durabletask-js";

// Versioning exports - re-export from core package for convenience
export { VersioningOptions, VersionMatchStrategy, VersionFailureStrategy } from "@microsoft/durabletask-js";

// Azure-specific logger adapter
export { AzureLoggerAdapter, createAzureLogger } from "./azure-logger-adapter";
43 changes: 32 additions & 11 deletions packages/durabletask-js-azuremanaged/src/worker-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,16 @@
import { TokenCredential } from "@azure/identity";
import * as grpc from "@grpc/grpc-js";
import { DurableTaskAzureManagedWorkerOptions } from "./options";
import { TaskHubGrpcWorker, TOrchestrator, TActivity, TInput, TOutput, Logger, ConsoleLogger } from "@microsoft/durabletask-js";
import {
TaskHubGrpcWorker,
TOrchestrator,
TActivity,
TInput,
TOutput,
Logger,
ConsoleLogger,
VersioningOptions,
} from "@microsoft/durabletask-js";

/**
* Builder for creating DurableTaskWorker instances that connect to Azure-managed Durable Task service.
Expand All @@ -17,6 +26,7 @@ export class DurableTaskAzureManagedWorkerBuilder {
private _activities: { name?: string; fn: TActivity<TInput, TOutput> }[] = [];
private _logger: Logger = new ConsoleLogger();
private _shutdownTimeoutMs?: number;
private _versioning?: VersioningOptions;

/**
* Creates a new instance of DurableTaskAzureManagedWorkerBuilder.
Expand Down Expand Up @@ -198,6 +208,18 @@ export class DurableTaskAzureManagedWorkerBuilder {
return this;
}

/**
* Configures versioning options for the worker.
* This allows filtering orchestrations by version using different match strategies.
*
* @param options The versioning options including version, matchStrategy, and failureStrategy.
* @returns This builder instance.
*/
versioning(options: VersioningOptions): DurableTaskAzureManagedWorkerBuilder {
this._versioning = options;
return this;
}

/**
* Builds and returns a configured TaskHubGrpcWorker.
*
Expand All @@ -219,18 +241,17 @@ export class DurableTaskAzureManagedWorkerBuilder {
...this._grpcChannelOptions,
};

// Use the core TaskHubGrpcWorker with custom credentials and metadata generator
// For insecure connections, metadata is passed via the metadataGenerator parameter
// For secure connections, metadata is included in the channel credentials
const worker = new TaskHubGrpcWorker(
// Use the core TaskHubGrpcWorker with options-based constructor
const worker = new TaskHubGrpcWorker({
hostAddress,
combinedOptions,
true,
channelCredentials,
options: combinedOptions,
useTLS: true,
credentials: channelCredentials,
metadataGenerator,
this._logger,
this._shutdownTimeoutMs,
);
logger: this._logger,
shutdownTimeoutMs: this._shutdownTimeoutMs,
versioning: this._versioning,
});

// Register all orchestrators
for (const { name, fn } of this._orchestrators) {
Expand Down
78 changes: 70 additions & 8 deletions packages/durabletask-js/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { OrchestrationStatus, toProtobuf, fromProtobuf } from "../orchestration/
import { TimeoutError } from "../exception/timeout-error";
import { PurgeResult } from "../orchestration/orchestration-purge-result";
import { PurgeInstanceCriteria } from "../orchestration/orchestration-purge-criteria";
import { PurgeInstanceOptions } from "../orchestration/orchestration-purge-options";
import { TerminateInstanceOptions, isTerminateInstanceOptions } from "../orchestration/orchestration-terminate-options";
import { callWithMetadata, MetadataGenerator } from "../utils/grpc-helper.util";
import { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "../orchestration/orchestration-query";
import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page";
Expand Down Expand Up @@ -47,12 +49,19 @@ export interface TaskHubGrpcClientOptions {
metadataGenerator?: MetadataGenerator;
/** Optional logger instance. Defaults to ConsoleLogger. */
logger?: Logger;
/**
* The default version to use when starting new orchestrations without an explicit version.
* If specified, this will be used as the version for orchestrations that don't provide
* their own version in StartOrchestrationOptions.
*/
defaultVersion?: string;
}

export class TaskHubGrpcClient {
private _stub: stubs.TaskHubSidecarServiceClient;
private _metadataGenerator?: MetadataGenerator;
private _logger: Logger;
private _defaultVersion?: string;

/**
* Creates a new TaskHubGrpcClient instance.
Expand Down Expand Up @@ -95,6 +104,7 @@ export class TaskHubGrpcClient {
let resolvedCredentials: grpc.ChannelCredentials | undefined;
let resolvedMetadataGenerator: MetadataGenerator | undefined;
let resolvedLogger: Logger | undefined;
let resolvedDefaultVersion: string | undefined;

if (typeof hostAddressOrOptions === "object" && hostAddressOrOptions !== null) {
// Options object constructor
Expand All @@ -104,6 +114,7 @@ export class TaskHubGrpcClient {
resolvedCredentials = hostAddressOrOptions.credentials;
resolvedMetadataGenerator = hostAddressOrOptions.metadataGenerator;
resolvedLogger = hostAddressOrOptions.logger;
resolvedDefaultVersion = hostAddressOrOptions.defaultVersion;
} else {
// Deprecated positional parameters constructor
resolvedHostAddress = hostAddressOrOptions;
Expand All @@ -117,6 +128,7 @@ export class TaskHubGrpcClient {
this._stub = new GrpcClient(resolvedHostAddress, resolvedOptions, resolvedUseTLS, resolvedCredentials).stub;
this._metadataGenerator = resolvedMetadataGenerator;
this._logger = resolvedLogger ?? new ConsoleLogger();
this._defaultVersion = resolvedDefaultVersion;
}

async stop(): Promise<void> {
Expand Down Expand Up @@ -177,6 +189,13 @@ export class TaskHubGrpcClient {
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
? undefined
: instanceIdOrOptions.tags;
const version =
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
? undefined
: instanceIdOrOptions.version;

// Use provided version, or fall back to client's default version
const effectiveVersion = version ?? this._defaultVersion;

const req = new pb.CreateInstanceRequest();
req.setName(name);
Expand All @@ -191,9 +210,15 @@ export class TaskHubGrpcClient {
req.setInput(i);
req.setScheduledstarttimestamp(ts);

if (effectiveVersion) {
const v = new StringValue();
v.setValue(effectiveVersion);
req.setVersion(v);
}

populateTagsMap(req.getTagsMap(), tags);

this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}`);
this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}${effectiveVersion ? ` (version: ${effectiveVersion})` : ""}`);

const res = await callWithMetadata<pb.CreateInstanceRequest, pb.CreateInstanceResponse>(
this._stub.startInstance.bind(this._stub),
Expand Down Expand Up @@ -364,18 +389,49 @@ export class TaskHubGrpcClient {
* Terminates the orchestrator associated with the provided instance id.
*
* @param {string} instanceId - orchestrator instance id to terminate.
* @param {any} output - The optional output to set for the terminated orchestrator instance.
* @param {any | TerminateInstanceOptions} outputOrOptions - The optional output to set for the terminated orchestrator instance,
* or a TerminateInstanceOptions object created with `terminateOptions()` that can include both
* output and recursive termination settings.
*
* @example
* ```typescript
* // Simple termination with output
* await client.terminateOrchestration(instanceId, { reason: "cancelled" });
*
* // Recursive termination with options (use terminateOptions helper)
* import { terminateOptions } from "@microsoft/durabletask-js";
* await client.terminateOrchestration(instanceId, terminateOptions({
* output: { reason: "cancelled" },
* recursive: true
* }));
* ```
*/
async terminateOrchestration(instanceId: string, output: any = null): Promise<void> {
async terminateOrchestration(
instanceId: string,
outputOrOptions: any | TerminateInstanceOptions = null,
): Promise<void> {
const req = new pb.TerminateRequest();
req.setInstanceid(instanceId);

let output: any = null;
let recursive = false;

// Use type guard to safely detect TerminateInstanceOptions
// This avoids false positives when user output happens to have 'recursive' or 'output' properties
if (isTerminateInstanceOptions(outputOrOptions)) {
output = outputOrOptions.output ?? null;
recursive = outputOrOptions.recursive ?? false;
} else {
output = outputOrOptions;
}

const i = new StringValue();
i.setValue(JSON.stringify(output));

req.setOutput(i);
req.setRecursive(recursive);

this._logger.info(`Terminating '${instanceId}'`);
this._logger.info(`Terminating '${instanceId}'${recursive ? ' (recursive)' : ''}`);

await callWithMetadata<pb.TerminateRequest, pb.TerminateResponse>(
this._stub.terminateInstance.bind(this._stub),
Expand Down Expand Up @@ -537,16 +593,21 @@ export class TaskHubGrpcClient {
*
* @param value - The unique ID of the orchestration instance to purge or orchestration instance filter criteria used
* to determine which instances to purge.
* @param options - Optional options to control the purge behavior, such as recursive purging of sub-orchestrations.
* @returns A Promise that resolves to a {@link PurgeResult} or `undefined` if the purge operation was not successful.
*/
async purgeOrchestration(value: string | PurgeInstanceCriteria): Promise<PurgeResult | undefined> {
async purgeOrchestration(
value: string | PurgeInstanceCriteria,
options?: PurgeInstanceOptions,
): Promise<PurgeResult | undefined> {
let res;
if (typeof value === `string`) {
if (typeof value === "string") {
const instanceId = value;
const req = new pb.PurgeInstancesRequest();
req.setInstanceid(instanceId);
req.setRecursive(options?.recursive ?? false);

this._logger.info(`Purging Instance '${instanceId}'`);
this._logger.info(`Purging Instance '${instanceId}'${options?.recursive ? ' (recursive)' : ''}`);

res = await callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
this._stub.purgeInstances.bind(this._stub),
Expand Down Expand Up @@ -574,9 +635,10 @@ export class TaskHubGrpcClient {
filter.addRuntimestatus(toProtobuf(status));
}
req.setPurgeinstancefilter(filter);
req.setRecursive(options?.recursive ?? false);
const timeout = purgeInstanceCriteria.getTimeout();

this._logger.info("Purging Instance using purging criteria");
this._logger.info(`Purging Instances using purging criteria${options?.recursive ? " (recursive)" : ""}`);

const callPromise = callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
this._stub.purgeInstances.bind(this._stub),
Expand Down
7 changes: 7 additions & 0 deletions packages/durabletask-js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
// Client and Worker
export { TaskHubGrpcClient, TaskHubGrpcClientOptions, MetadataGenerator } from "./client/client";
export { TaskHubGrpcWorker, TaskHubGrpcWorkerOptions } from "./worker/task-hub-grpc-worker";
export { VersioningOptions, VersionMatchStrategy, VersionFailureStrategy } from "./worker/versioning-options";

// Contexts
export { OrchestrationContext } from "./task/context/orchestration-context";
export { ActivityContext } from "./task/context/activity-context";

// Orchestration types and utilities
export { PurgeInstanceCriteria } from "./orchestration/orchestration-purge-criteria";
export { PurgeInstanceOptions } from "./orchestration/orchestration-purge-options";
export { TerminateInstanceOptions, terminateOptions, isTerminateInstanceOptions, TERMINATE_OPTIONS_SYMBOL } from "./orchestration/orchestration-terminate-options";
export { OrchestrationStatus } from "./orchestration/enum/orchestration-status.enum";
export { OrchestrationState } from "./orchestration/orchestration-state";

Expand Down Expand Up @@ -82,3 +85,7 @@ export { ParentOrchestrationInstance } from "./types/parent-orchestration-instan

// Logger
export { Logger, ConsoleLogger, NoOpLogger } from "./types/logger.type";
export { ReplaySafeLogger, ReplayContext } from "./types/replay-safe-logger";

// Versioning utilities
export { compareVersions } from "./utils/versioning.util";
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

/**
* Options for purging orchestration instances.
*/
export interface PurgeInstanceOptions {
/**
* Whether to recursively purge sub-orchestrations as well.
* When true, all child orchestrations spawned by the target orchestration
* will also be purged.
*
* Note: Recursive purging may not be supported by all backend implementations.
*
* @default false
*/
recursive?: boolean;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

/**
* Symbol used to identify TerminateInstanceOptions objects.
* This prevents confusion when user output happens to have 'recursive' or 'output' properties.
*/
export const TERMINATE_OPTIONS_SYMBOL = Symbol.for("durabletask.TerminateInstanceOptions");

/**
* Options for terminating orchestration instances.
*/
export interface TerminateInstanceOptions {
/**
* Internal marker to identify this as a TerminateInstanceOptions object.
* @internal
*/
readonly [TERMINATE_OPTIONS_SYMBOL]?: true;

/**
* Whether to recursively terminate sub-orchestrations as well.
* When true, all child orchestrations spawned by the target orchestration
* will also be terminated.
*
* @default false
*/
recursive?: boolean;

/**
* The optional output to set for the terminated orchestrator instance.
*/
output?: any;
}

/**
* Creates a TerminateInstanceOptions object with proper type identification.
* Use this function instead of creating plain objects to ensure correct behavior.
*
* @param options - The terminate options
* @returns A properly typed TerminateInstanceOptions object
*
* @example
* ```typescript
* // Terminate with recursive option
* await client.terminateOrchestration(instanceId, terminateOptions({ recursive: true }));
*
* // Terminate with output and recursive
* await client.terminateOrchestration(instanceId, terminateOptions({
* output: { reason: "cancelled by user" },
* recursive: true
* }));
* ```
*/
export function terminateOptions(options: Omit<TerminateInstanceOptions, typeof TERMINATE_OPTIONS_SYMBOL>): TerminateInstanceOptions {
return {
...options,
[TERMINATE_OPTIONS_SYMBOL]: true as const,
};
}

/**
* Type guard to check if a value is a TerminateInstanceOptions object.
* @internal
*/
export function isTerminateInstanceOptions(value: unknown): value is TerminateInstanceOptions {
return (
value !== null &&
typeof value === "object" &&
TERMINATE_OPTIONS_SYMBOL in value &&
(value as TerminateInstanceOptions)[TERMINATE_OPTIONS_SYMBOL] === true
);
}
Loading
Loading