Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2652769
fix(core): chain unconsumed event check onto promiseQueue to prevent …
TooTallNate Mar 4, 2026
8b468ed
fix: chain hydrateWorkflowArguments onto promiseQueue to prevent fals…
TooTallNate Mar 4, 2026
8ac1292
fix: use setTimeout(0) macrotask for unconsumed check to ensure VM pr…
TooTallNate Mar 4, 2026
afe7ee6
fix: increase unconsumed event check delay to 100ms for cross-VM prom…
TooTallNate Mar 4, 2026
82f0c90
Add browser-compatible AES-GCM to core and HKDF key derivation to wor…
TooTallNate Feb 6, 2026
5966a61
update changeset
TooTallNate Feb 14, 2026
b73b39e
Move HKDF key derivation server-side: API returns per-run derived key
TooTallNate Feb 15, 2026
ccc1173
Refactor encrypt/decrypt to accept CryptoKey, export importKey for ca…
TooTallNate Feb 18, 2026
46f6e72
Overload getEncryptionKeyForRun: accept context for start(), fetch Wo…
TooTallNate Feb 19, 2026
4c7241c
Split changeset into per-package descriptions for world, world-vercel…
TooTallNate Feb 19, 2026
6966eb8
Remove unnecessary Uint8Array.from() wrapper around Buffer.from()
TooTallNate Feb 19, 2026
90c06d4
Use zod to parse Vercel API response
TooTallNate Feb 19, 2026
458418a
Wire encryption into serialization layer
TooTallNate Feb 6, 2026
07d4f88
Wire AES-GCM encryption into serialization layer
TooTallNate Feb 10, 2026
f44a55a
update changeset
TooTallNate Feb 14, 2026
ed7663f
Add encryption unit tests: primitives, maybeEncrypt/maybeDecrypt, isE…
TooTallNate Feb 18, 2026
2cfa4f9
Accept CryptoKey in encrypt/decrypt, export importKey for callers to …
TooTallNate Feb 18, 2026
cff0001
Fix review comments: cache stream encryption key, remove redundant ca…
TooTallNate Feb 18, 2026
8673a8d
Trying to clean up some type non-sense
TooTallNate Feb 18, 2026
ec40d9c
fix: restore world-vercel files to main versions
TooTallNate Mar 3, 2026
854cafe
fix: add type cast for hydrateStepReturnValue return in hook.ts
TooTallNate Mar 3, 2026
38bebbe
fix: address review feedback on encryption PR
TooTallNate Mar 3, 2026
5938837
Revert more unnecessary changes
TooTallNate Mar 3, 2026
d6ffeb8
cleanup: remove unused runId param, deduplicate processFrames, add le…
TooTallNate Mar 3, 2026
68c1d98
feat: wire cryptoKey through stream serialize/deserialize pipeline
TooTallNate Mar 3, 2026
904df2b
fix: make cryptoKey required-but-nullable to prevent silent omission,…
TooTallNate Mar 4, 2026
09dd6fe
fix: keep Run#getReadable() sync, resolve encryption key lazily in st…
TooTallNate Mar 4, 2026
ba03041
.
TooTallNate Mar 4, 2026
bc01397
fix: address review feedback from PR #1251
TooTallNate Mar 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/e2e-encryption.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/core": patch
---

Wire AES-GCM encryption into serialization layer with stream support
26 changes: 22 additions & 4 deletions packages/cli/src/lib/inspect/output.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { importKey } from '@workflow/core/encryption';
import {
type EncryptionKeyParam,
getDeserializeStream,
getExternalRevivers,
} from '@workflow/core/serialization';
Expand Down Expand Up @@ -780,16 +782,32 @@ export const showStream = async (
streamId: string,
opts: InspectCLIOptions = {}
) => {
if (opts.runId || opts.stepId) {
if (opts.stepId) {
logger.warn(
'Filtering by run-id or step-id is not supported when showing a stream, ignoring filter.'
'Filtering by step-id is not supported when showing a stream, ignoring filter.'
);
}
const rawStream = await world.readFromStream(streamId);

// Resolve the encryption key if a runId is provided (needed for encrypted streams).
Comment thread
TooTallNate marked this conversation as resolved.
// The key is passed as a promise so stream construction is synchronous —
// it will be resolved lazily on the first encrypted frame.
let encryptionKey: EncryptionKeyParam;
if (opts.runId) {
encryptionKey = (async () => {
const rawKey = await world.getEncryptionKeyForRun?.(opts.runId!);
return rawKey ? await importKey(rawKey) : undefined;
})();
}

// Deserialize the stream to get JavaScript objects
const revivers = getExternalRevivers(globalThis, [], '');
const transform = getDeserializeStream(revivers);
const revivers = getExternalRevivers(
globalThis,
[],
opts.runId ?? '',
encryptionKey
);
const transform = getDeserializeStream(revivers, encryptionKey);
const stream = rawStream.pipeThrough(transform);

logger.info('Streaming to stdout, press CTRL+C to abort.');
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/async-deserialization-ordering.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
globalThis: context.globalThis,
eventsConsumer: new EventsConsumer(events, {
onUnconsumedEvent: () => {},
getPromiseQueue: () => Promise.resolve(),
}),
invocationsQueue: new Map(),
generateUlid: () => ulid(workflowStartedAt),
Expand Down
46 changes: 33 additions & 13 deletions packages/core/src/events-consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ function createMockEvent(overrides: Partial<Event> = {}): Event {
}

// Default options for tests that don't care about onUnconsumedEvent
const defaultOptions = { onUnconsumedEvent: vi.fn() };
const defaultOptions = {
onUnconsumedEvent: vi.fn(),
getPromiseQueue: () => Promise.resolve(),
};

// Helper function to wait for next tick
function waitForNextTick(): Promise<void> {
return new Promise((resolve) => process.nextTick(resolve));
}

// Helper to wait for setTimeout(0) macrotask (used by deferred unconsumed event check)
function waitForMacrotask(): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, 0));
// Helper to wait for the unconsumed event check (promiseQueue .then() + setTimeout(100ms))
function waitForUnconsumedCheck(): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, 150));
}

describe('EventsConsumer', () => {
Expand Down Expand Up @@ -159,7 +162,10 @@ describe('EventsConsumer', () => {
it('should process all callbacks when none return true and call onUnconsumedEvent', async () => {
const event = createMockEvent();
const onUnconsumedEvent = vi.fn();
const consumer = new EventsConsumer([event], { onUnconsumedEvent });
const consumer = new EventsConsumer([event], {
onUnconsumedEvent,
getPromiseQueue: () => Promise.resolve(),
});
const callback1 = vi
.fn()
.mockReturnValue(EventConsumerResult.NotConsumed);
Expand All @@ -181,8 +187,9 @@ describe('EventsConsumer', () => {
expect(consumer.eventIndex).toBe(0);
expect(consumer.callbacks).toEqual([callback1, callback2, callback3]);

// onUnconsumedEvent is deferred via setTimeout to allow new subscribes
await waitForMacrotask();
// onUnconsumedEvent is deferred via promise queue .then() + setTimeout(0)
await waitForNextTick();
await waitForUnconsumedCheck();
expect(onUnconsumedEvent).toHaveBeenCalledWith(event);
});

Expand Down Expand Up @@ -352,24 +359,33 @@ describe('EventsConsumer', () => {
it('should call onUnconsumedEvent when a non-null event is not consumed by any callback', async () => {
const event = createMockEvent();
const onUnconsumedEvent = vi.fn();
const consumer = new EventsConsumer([event], { onUnconsumedEvent });
const consumer = new EventsConsumer([event], {
onUnconsumedEvent,
getPromiseQueue: () => Promise.resolve(),
});
const callback = vi.fn().mockReturnValue(EventConsumerResult.NotConsumed);

consumer.subscribe(callback);
// Wait for: nextTick(consume) → .then() microtask → setTimeout(0) macrotask
await waitForNextTick();
await waitForNextTick();
await waitForMacrotask();
await waitForUnconsumedCheck();

expect(onUnconsumedEvent).toHaveBeenCalledWith(event);
});

it('should NOT call onUnconsumedEvent for null event (end-of-events)', async () => {
const onUnconsumedEvent = vi.fn();
const consumer = new EventsConsumer([], { onUnconsumedEvent });
const consumer = new EventsConsumer([], {
onUnconsumedEvent,
getPromiseQueue: () => Promise.resolve(),
});
const callback = vi.fn().mockReturnValue(EventConsumerResult.NotConsumed);

consumer.subscribe(callback);
await waitForNextTick();
await waitForMacrotask();
await waitForNextTick();
await waitForUnconsumedCheck();

expect(callback).toHaveBeenCalledWith(null);
expect(onUnconsumedEvent).not.toHaveBeenCalled();
Expand All @@ -378,7 +394,10 @@ describe('EventsConsumer', () => {
it('should cancel pending unconsumed check when a new callback subscribes', async () => {
const event = createMockEvent();
const onUnconsumedEvent = vi.fn();
const consumer = new EventsConsumer([event], { onUnconsumedEvent });
const consumer = new EventsConsumer([event], {
onUnconsumedEvent,
getPromiseQueue: () => Promise.resolve(),
});
const callback1 = vi
.fn()
.mockReturnValue(EventConsumerResult.NotConsumed);
Expand All @@ -390,7 +409,8 @@ describe('EventsConsumer', () => {
const callback2 = vi.fn().mockReturnValue(EventConsumerResult.Finished);
consumer.subscribe(callback2);
await waitForNextTick();
await waitForMacrotask();
await waitForNextTick();
await waitForUnconsumedCheck();

// The new callback consumed the event, so onUnconsumedEvent should NOT be called
expect(onUnconsumedEvent).not.toHaveBeenCalled();
Expand Down
63 changes: 46 additions & 17 deletions packages/core/src/events-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,36 @@ type EventConsumerCallback = (event: Event | null) => EventConsumerResult;
export interface EventsConsumerOptions {
/**
* Callback invoked when a non-null event cannot be consumed by any registered
* callback, indicating an orphaned or invalid event in the event log. Called
* asynchronously after a macrotask delay to allow pending callback
* subscriptions to settle first.
* callback, indicating an orphaned or invalid event in the event log. The
* check is deferred until after the promise queue has drained, ensuring that
* any pending async work (e.g., deserialization/decryption) completes and
* downstream subscribe() calls have a chance to cancel the check first.
*/
onUnconsumedEvent: (event: Event) => void;
/**
* Returns the current promise queue. The unconsumed event check is chained
* onto this queue so it only fires after all pending async work (e.g.,
* deserialization) has completed. This prevents false positives when async
* deserialization delays the resolve() that triggers the next subscribe().
*/
getPromiseQueue: () => Promise<void>;
}

export class EventsConsumer {
eventIndex: number;
readonly events: Event[] = [];
readonly callbacks: EventConsumerCallback[] = [];
private onUnconsumedEvent: (event: Event) => void;
private pendingUnconsumedCheck: ReturnType<typeof setTimeout> | null = null;
private getPromiseQueue: () => Promise<void>;
private pendingUnconsumedCheck: Promise<void> | null = null;
private pendingUnconsumedTimeout: ReturnType<typeof setTimeout> | null = null;
private unconsumedCheckVersion = 0;

constructor(events: Event[], options: EventsConsumerOptions) {
this.events = events;
this.eventIndex = 0;
this.onUnconsumedEvent = options.onUnconsumedEvent;
this.getPromiseQueue = options.getPromiseQueue;
}

/**
Expand All @@ -52,10 +64,16 @@ export class EventsConsumer {
*/
subscribe(fn: EventConsumerCallback) {
this.callbacks.push(fn);
// Cancel any pending unconsumed check since a new callback may consume the event
// Cancel any pending unconsumed check since a new callback may consume the event.
// Incrementing the version causes any in-flight promise chain check to no-op.
// Also clear the pending setTimeout if it hasn't fired yet.
if (this.pendingUnconsumedCheck !== null) {
clearTimeout(this.pendingUnconsumedCheck);
this.unconsumedCheckVersion++;
this.pendingUnconsumedCheck = null;
if (this.pendingUnconsumedTimeout !== null) {
clearTimeout(this.pendingUnconsumedTimeout);
this.pendingUnconsumedTimeout = null;
}
}
process.nextTick(this.consume);
}
Expand Down Expand Up @@ -90,18 +108,29 @@ export class EventsConsumer {

// If we reach here, all callbacks returned NotConsumed.
// If the current event is non-null (a real event, not end-of-events),
// schedule a deferred check. We use setTimeout (macrotask) so that any
// pending process.nextTick microtasks (e.g., new subscribes from the
// workflow code) can complete first. If the event is still unconsumed
// when the timeout fires, it's truly orphaned.
// schedule a deferred check. We chain onto the promiseQueue so that any
// pending async work (e.g., deserialization/decryption that triggers
// resolve() → user code → subscribe()) completes first. If the event
// is still unconsumed after the queue drains, it's truly orphaned.
if (currentEvent !== null) {
const unconsumedIndex = this.eventIndex;
this.pendingUnconsumedCheck = setTimeout(() => {
this.pendingUnconsumedCheck = null;
if (this.eventIndex === unconsumedIndex) {
this.onUnconsumedEvent(currentEvent);
}
}, 0);
const checkVersion = ++this.unconsumedCheckVersion;
this.pendingUnconsumedCheck = this.getPromiseQueue().then(() => {
// Use a delayed setTimeout after the queue drains. The delay must be
// long enough for promise chains to propagate across the VM boundary
// (from resolve() in the host context through to the workflow code
// calling subscribe() in the VM context). Node.js does not guarantee
// that setTimeout(0) fires after all cross-context microtasks settle,
// so we use a small but non-zero delay. Any subscribe() call that
// arrives during this window will cancel the check via version
// invalidation + clearTimeout.
this.pendingUnconsumedTimeout = setTimeout(() => {
this.pendingUnconsumedTimeout = null;
if (this.unconsumedCheckVersion === checkVersion) {
this.pendingUnconsumedCheck = null;
this.onUnconsumedEvent(currentEvent);
}
}, 100);
});
}
};
}
39 changes: 35 additions & 4 deletions packages/core/src/runtime/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
type WorkflowRunStatus,
type World,
} from '@workflow/world';
import { importKey } from '../encryption.js';
import { type CryptoKey, importKey } from '../encryption.js';
import {
getExternalRevivers,
hydrateWorkflowReturnValue,
Expand Down Expand Up @@ -63,11 +63,35 @@ export class Run<TResult> {
*/
private world: World;

/**
* Cached encryption key resolution. Resolved once on first use and
* reused for returnValue, getReadable(), etc.
* @internal
*/
private encryptionKeyPromise: Promise<CryptoKey | undefined> | null = null;

constructor(runId: string) {
this.runId = runId;
this.world = getWorld();
}

/**
* Resolves and caches the encryption key for this run.
* The key is the same for the lifetime of a run, so it only needs
* to be resolved once.
* @internal
*/
private getEncryptionKey(): Promise<CryptoKey | undefined> {
if (!this.encryptionKeyPromise) {
this.encryptionKeyPromise = (async () => {
const run = await this.world.runs.get(this.runId);
const rawKey = await this.world.getEncryptionKeyForRun?.(run);
return rawKey ? await importKey(rawKey) : undefined;
})();
}
return this.encryptionKeyPromise;
}

/**
* Interrupts pending `sleep()` calls, resuming the workflow early.
*
Expand Down Expand Up @@ -153,7 +177,15 @@ export class Run<TResult> {
): ReadableStream<R> {
const { ops = [], global = globalThis, startIndex, namespace } = options;
const name = getWorkflowRunStreamId(this.runId, namespace);
return getExternalRevivers(global, ops, this.runId).ReadableStream({
// Pass the key as a promise — it will be resolved lazily inside
// the first async transform() call of the deserialize stream.
const encryptionKey = this.getEncryptionKey();
return getExternalRevivers(
global,
ops,
this.runId,
encryptionKey
).ReadableStream({
name,
startIndex,
}) as ReadableStream<R>;
Expand All @@ -170,8 +202,7 @@ export class Run<TResult> {
const run = await this.world.runs.get(this.runId);

if (run.status === 'completed') {
const rawKey = await this.world.getEncryptionKeyForRun?.(run);
const encryptionKey = rawKey ? await importKey(rawKey) : undefined;
const encryptionKey = await this.getEncryptionKey();
return await hydrateWorkflowReturnValue(
run.output,
this.runId,
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/runtime/step-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ const stepHandler = getWorldHandlers().createQueueHandler(
},
ops,
closureVars: hydratedInput.closureVars,
encryptionKey,
},
() => stepFn.apply(thisVal, args)
);
Expand Down
Loading
Loading