Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
6e37181
Merge branch 'main' of github.com:vercel/workflow into karthik/expose…
karthikscale3 Feb 4, 2026
6264999
Refactor web-shared to separate UI components from server concerns
karthikscale3 Feb 4, 2026
f2bf157
add support for streams
karthikscale3 Feb 4, 2026
3126f91
Merge branch 'main' of github.com:vercel/workflow into karthik/expose…
karthikscale3 Feb 4, 2026
2667ebd
Merge branch 'main' of github.com:vercel/workflow into karthik/expose…
karthikscale3 Feb 5, 2026
3c953df
update lock file
karthikscale3 Feb 5, 2026
077c1e6
Merge branch 'main' of github.com:vercel/workflow into karthik/expose…
karthikscale3 Feb 5, 2026
0dd91f4
Migrate helpers from web-shared to core
karthikscale3 Feb 5, 2026
d3c3036
Migrate helpers from web-shared to core
karthikscale3 Feb 5, 2026
55732aa
Migrate helpers from web-shared to core
karthikscale3 Feb 5, 2026
2177860
Migrate helpers from web-shared to core
karthikscale3 Feb 5, 2026
b0953cc
Merge branch 'main' of github.com:vercel/workflow into karthik/expose…
karthikscale3 Feb 6, 2026
1803ee2
Merge branch 'main' of github.com:vercel/workflow into karthik/expose…
karthikscale3 Feb 6, 2026
bdae698
Fix claude's comments
karthikscale3 Feb 6, 2026
deddc66
Fix server actions bug for web
karthikscale3 Feb 6, 2026
7268d48
fix copilot bugs
karthikscale3 Feb 6, 2026
a767477
Merge branch 'main' of github.com:vercel/workflow into karthik/expose…
karthikscale3 Feb 6, 2026
84e167e
Merge branch 'main' of github.com:vercel/workflow into karthik/expose…
karthikscale3 Feb 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/common-mangos-bet.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@workflow/web-shared": patch
"@workflow/core": patch
"@workflow/web": patch
---

Added subpatch exports for runtime modules to allow direct imports in core. Refactored web-shared to be a thin package that exported UI components and world-actions. Updated web package to consume the UI components and world-actions from web-shared.
41 changes: 35 additions & 6 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
"main": "dist/index.js",
"files": [
"dist",
"docs/**/*"
"docs/**/*",
"runtime.js",
"runtime.d.ts"
],
"directories": {
"doc": "./docs"
Expand All @@ -26,11 +28,38 @@
"workflow": "./dist/workflow/index.js",
"default": "./dist/index.js"
},
"./runtime": "./dist/runtime.js",
"./private": "./dist/private.js",
"./class-serialization": "./dist/class-serialization.js",
"./builtins": "./dist/builtins.js",
"./serialization": "./dist/serialization.js",
"./runtime": {
"types": "./dist/runtime.d.ts",
"default": "./dist/runtime.js"
},
"./runtime/start": {
"types": "./dist/runtime/start.d.ts",
"default": "./dist/runtime/start.js"
},
"./runtime/helpers": {
"types": "./dist/runtime/helpers.d.ts",
"default": "./dist/runtime/helpers.js"
},
"./runtime/resume-hook": {
"types": "./dist/runtime/resume-hook.d.ts",
"default": "./dist/runtime/resume-hook.js"
},
"./private": {
"types": "./dist/private.d.ts",
"default": "./dist/private.js"
},
"./class-serialization": {
"types": "./dist/class-serialization.d.ts",
"default": "./dist/class-serialization.js"
},
"./builtins": {
"types": "./dist/builtins.d.ts",
"default": "./dist/builtins.js"
},
"./serialization": {
"types": "./dist/serialization.d.ts",
"default": "./dist/serialization.js"
},
"./observability": {
"types": "./dist/observability.d.ts",
"node": "./dist/observability.js",
Expand Down
1 change: 1 addition & 0 deletions packages/core/runtime.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './dist/runtime.d.ts';
1 change: 1 addition & 0 deletions packages/core/runtime.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './dist/runtime.js';
12 changes: 12 additions & 0 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ export {
Run,
type WorkflowReadableStreamOptions,
} from './runtime/run.js';
export {
cancelRun,
listStreams,
readStream,
recreateRunFromExisting,
reenqueueRun,
type ReadStreamOptions,
type RecreateRunOptions,
type StopSleepOptions,
type StopSleepResult,
wakeUpRun,
} from './runtime/runs.js';
export { type StartOptions, start } from './runtime/start.js';
export { stepEntrypoint } from './runtime/step-handler.js';
export {
Expand Down
20 changes: 20 additions & 0 deletions packages/core/src/runtime/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,26 @@ import { getWorld } from './world.js';
/** Default timeout for health checks in milliseconds */
const DEFAULT_HEALTH_CHECK_TIMEOUT = 30_000;

/**
* Pattern for safe workflow names. Only allows alphanumeric characters,
* underscores, hyphens, dots, and forward slashes (for namespaced workflows).
*/
const SAFE_WORKFLOW_NAME_PATTERN = /^[a-zA-Z0-9_\-.\/]+$/;

/**
* Validates a workflow name and returns the corresponding queue name.
* Ensures the workflow name only contains safe characters before
* interpolating it into the queue name string.
*/
export function getWorkflowQueueName(workflowName: string): ValidQueueName {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

if (!SAFE_WORKFLOW_NAME_PATTERN.test(workflowName)) {
throw new Error(
`Invalid workflow name "${workflowName}": must only contain alphanumeric characters, underscores, hyphens, dots, or forward slashes`
);
}
return `__wkf_workflow_${workflowName}` as ValidQueueName;
}

const generateId = monotonicFactory();

/**
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/runtime/resume-hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { WEBHOOK_RESPONSE_WRITABLE } from '../symbols.js';
import * as Attribute from '../telemetry/semantic-conventions.js';
import { getSpanContextForTraceCarrier, trace } from '../telemetry.js';
import { waitedUntil } from '../util.js';
import { getWorkflowQueueName } from './helpers.js';
import { getWorld } from './world.js';

/**
Expand Down Expand Up @@ -130,7 +131,7 @@ export async function resumeHook<T = any>(
// Re-trigger the workflow against the deployment ID associated
// with the workflow run that the hook belongs to
await world.queue(
`__wkf_workflow_${workflowRun.workflowName}`,
getWorkflowQueueName(workflowRun.workflowName),
{
runId: hook.runId,
// attach the trace carrier from the workflow run
Expand Down
255 changes: 255 additions & 0 deletions packages/core/src/runtime/runs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
import { hydrateWorkflowArguments } from '../serialization.js';
import {
type Event,
isLegacySpecVersion,
SPEC_VERSION_LEGACY,
type World,
} from '@workflow/world';
import { getWorkflowQueueName } from './helpers.js';
import { start } from './start.js';

export interface RecreateRunOptions {
deploymentId?: string;
specVersion?: number;
}

export interface StopSleepResult {
/** Number of pending sleeps that were stopped */
stoppedCount: number;
}

export interface ReadStreamOptions {
/**
* The index to start reading from. Defaults to 0.
*/
startIndex?: number;
}

export interface StopSleepOptions {
/**
* Optional list of specific correlation IDs to target.
* If provided, only these sleep calls will be interrupted.
* If not provided, all pending sleep calls will be interrupted.
*/
correlationIds?: string[];
}

const normalizeWorkflowArgs = (args: unknown): unknown[] => {
return Array.isArray(args) ? args : [args];
};

/**
* Start a new workflow run based on an existing run.
*/
export async function recreateRunFromExisting(
Comment thread
karthikscale3 marked this conversation as resolved.
world: World,
runId: string,
options: RecreateRunOptions = {}
): Promise<string> {
try {
const run = await world.runs.get(runId, { resolveData: 'all' });
const workflowArgs = normalizeWorkflowArgs(
hydrateWorkflowArguments(run.input, globalThis)
);
const specVersion =
options.specVersion ?? run.specVersion ?? SPEC_VERSION_LEGACY;
const deploymentId = options.deploymentId ?? run.deploymentId;

const newRun = await start(
{ workflowId: run.workflowName },
workflowArgs as unknown[],
{
deploymentId,
world,
specVersion,
}
);
return newRun.runId;
} catch (err) {
throw new Error(
`Failed to recreate run from ${runId}: ${err instanceof Error ? err.message : String(err)}`,
{ cause: err }
);
}
}

/**
* Cancel a workflow run.
*/
export async function cancelRun(world: World, runId: string): Promise<void> {
try {
const run = await world.runs.get(runId, { resolveData: 'none' });
const specVersion = run.specVersion ?? SPEC_VERSION_LEGACY;
const compatMode = isLegacySpecVersion(specVersion);
const eventData = {
eventType: 'run_cancelled' as const,
specVersion,
};
await world.events.create(runId, eventData, { v1Compat: compatMode });
} catch (err) {
throw new Error(
`Failed to cancel run ${runId}: ${err instanceof Error ? err.message : String(err)}`,
{ cause: err }
);
}
}

/**
* Re-enqueue a workflow run.
*/
export async function reenqueueRun(world: World, runId: string): Promise<void> {
try {
const run = await world.runs.get(runId, { resolveData: 'none' });
await world.queue(
getWorkflowQueueName(run.workflowName),
{
runId,
},
{
deploymentId: run.deploymentId,
}
);
} catch (err) {
throw new Error(
`Failed to re-enqueue run ${runId}: ${err instanceof Error ? err.message : String(err)}`,
{ cause: err }
);
}
}

/**
* Wake up a workflow run by interrupting pending sleep() calls.
*/
export async function wakeUpRun(
world: World,
runId: string,
options?: StopSleepOptions
): Promise<StopSleepResult> {
try {
const run = await world.runs.get(runId, { resolveData: 'none' });
const compatMode = isLegacySpecVersion(run.specVersion);

// Paginate through all events to ensure we don't miss any sleeps
// in long-running workflows with more than 1000 events.
const allEvents: Event[] = [];
let cursor: string | null = null;
do {
const eventsResult = await world.events.list({
runId,
pagination: { limit: 1000, ...(cursor ? { cursor } : {}) },
resolveData: 'none',
});
allEvents.push(...eventsResult.data);
cursor = eventsResult.hasMore ? eventsResult.cursor : null;
} while (cursor);

const waitCreatedEvents = allEvents.filter(
(event: Event) => event.eventType === 'wait_created'
);
const waitCompletedCorrelationIds = new Set(
allEvents
.filter((event: Event) => event.eventType === 'wait_completed')
.map((event: Event) => event.correlationId)
);

let pendingWaits = waitCreatedEvents.filter(
(event: Event) => !waitCompletedCorrelationIds.has(event.correlationId)
);

if (options?.correlationIds && options.correlationIds.length > 0) {
const targetCorrelationIds = new Set(options.correlationIds);
pendingWaits = pendingWaits.filter(
(event: Event) =>
event.correlationId && targetCorrelationIds.has(event.correlationId)
);
}

const errors: Error[] = [];
let stoppedCount = 0;

for (const waitEvent of pendingWaits) {
if (!waitEvent.correlationId) continue;
const eventData = compatMode
? {
eventType: 'wait_completed' as const,
correlationId: waitEvent.correlationId,
}
: {
eventType: 'wait_completed' as const,
correlationId: waitEvent.correlationId,
specVersion: run.specVersion,
};
try {
await world.events.create(runId, eventData, { v1Compat: compatMode });
stoppedCount++;
} catch (err) {
errors.push(err instanceof Error ? err : new Error(String(err)));
}
}

if (stoppedCount > 0) {
await world.queue(
getWorkflowQueueName(run.workflowName),
{
runId,
},
{
deploymentId: run.deploymentId,
}
);
}

if (errors.length > 0) {
throw new AggregateError(
errors,
`Failed to complete ${errors.length}/${pendingWaits.length} pending wait(s) for run ${runId}`
);
}

return { stoppedCount };
} catch (err) {
if (err instanceof AggregateError) {
throw err;
}
throw new Error(
`Failed to wake up run ${runId}: ${err instanceof Error ? err.message : String(err)}`,
{ cause: err }
);
}
}

/**
* Read from a stream by stream ID.
* Returns a ReadableStream of Uint8Array chunks.
*/
export async function readStream(
world: World,
streamId: string,
options?: ReadStreamOptions
): Promise<ReadableStream<Uint8Array>> {
try {
return await world.readFromStream(streamId, options?.startIndex);
} catch (err) {
throw new Error(
`Failed to read stream ${streamId}: ${err instanceof Error ? err.message : String(err)}`,
{ cause: err }
);
}
}

/**
* List all stream IDs for a workflow run.
*/
export async function listStreams(
world: World,
runId: string
): Promise<string[]> {
try {
return await world.listStreamsByRunId(runId);
} catch (err) {
throw new Error(
`Failed to list streams for run ${runId}: ${err instanceof Error ? err.message : String(err)}`,
{ cause: err }
);
}
}
Loading
Loading