Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-hook-loop-unconsumed-event.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/core': patch
---

Fix false-positive unconsumed `step_created` errors when replay resumes a `for await` hook loop and appends more async work after the first promise-queue drain.
43 changes: 26 additions & 17 deletions packages/core/src/events-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,23 +114,32 @@ export class EventsConsumer {
// is still unconsumed after the queue drains, it's truly orphaned.
if (currentEvent !== null) {
const checkVersion = ++this.unconsumedCheckVersion;
this.pendingUnconsumedCheck = this.getPromiseQueue().then(() => {
// Use a delayed setTimeout after the queue drains. The delay must be
// long enough for promise chains to propagate across the VM boundary
// (from resolve() in the host context through to the workflow code
// calling subscribe() in the VM context). Node.js does not guarantee
// that setTimeout(0) fires after all cross-context microtasks settle,
// so we use a small but non-zero delay. Any subscribe() call that
// arrives during this window will cancel the check via version
// invalidation + clearTimeout.
this.pendingUnconsumedTimeout = setTimeout(() => {
this.pendingUnconsumedTimeout = null;
if (this.unconsumedCheckVersion === checkVersion) {
this.pendingUnconsumedCheck = null;
this.onUnconsumedEvent(currentEvent);
}
}, 100);
});
this.pendingUnconsumedCheck = this.getPromiseQueue()
.then(
// Yield once after the first queue drain so promise chains resumed by
// that drain can run across the VM boundary and append any follow-up
// async work (for example: step_completed resolves -> for-await loop
// resumes -> the next hook payload starts hydrating).
() => new Promise<void>((resolve) => setTimeout(resolve, 0))
)
.then(() => this.getPromiseQueue())
.then(() => {
// Use a delayed setTimeout after the queue drains. The delay must be
// long enough for promise chains to propagate across the VM boundary
// (from resolve() in the host context through to the workflow code
// calling subscribe() in the VM context). Node.js does not guarantee
// that setTimeout(0) fires after all cross-context microtasks settle,
// so we use a small but non-zero delay. Any subscribe() call that
// arrives during this window will cancel the check via version
// invalidation + clearTimeout.
this.pendingUnconsumedTimeout = setTimeout(() => {
this.pendingUnconsumedTimeout = null;
if (this.unconsumedCheckVersion === checkVersion) {
this.pendingUnconsumedCheck = null;
this.onUnconsumedEvent(currentEvent);
}
}, 100);
});
}
};
}
188 changes: 188 additions & 0 deletions packages/core/src/workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4145,6 +4145,194 @@ describe('runWorkflow', () => {
}
});

it('should not orphan the second step_created in a for-await hook loop when the next payload hydration is delayed', async () => {
const ops: Promise<any>[] = [];
const workflowRunId = 'wrun_123';
const workflowRun: WorkflowRun = {
runId: workflowRunId,
workflowName: 'workflow',
status: 'running',
input: await dehydrateWorkflowArguments(
[],
workflowRunId,
noEncryptionKey,
ops
),
createdAt: new Date('2024-01-01T00:00:00.000Z'),
updatedAt: new Date('2024-01-01T00:00:00.000Z'),
startedAt: new Date('2024-01-01T00:00:00.000Z'),
deploymentId: 'test-deployment',
};

const payload1 = await dehydrateStepReturnValue(
{ type: 'subscribe', id: 1 },
workflowRunId,
noEncryptionKey,
ops
);
const payload2 = await dehydrateStepReturnValue(
{ type: 'done', done: true },
workflowRunId,
noEncryptionKey,
ops
);
const stepResult1 = await dehydrateStepReturnValue(
{ processed: true, type: 'subscribe', id: 1 },
workflowRunId,
noEncryptionKey,
ops
);
const stepResult2 = await dehydrateStepReturnValue(
{ processed: true, type: 'done' },
workflowRunId,
noEncryptionKey,
ops
);

const events: Event[] = [
{
eventId: 'evnt-run-created',
runId: workflowRunId,
eventType: 'run_created',
createdAt: new Date('2024-01-01T00:00:00.000Z'),
},
{
eventId: 'evnt-run-started',
runId: workflowRunId,
eventType: 'run_started',
createdAt: new Date('2024-01-01T00:00:00.100Z'),
},
{
eventId: 'evnt-hook-created',
runId: workflowRunId,
eventType: 'hook_created',
correlationId: 'hook_01HK153X00SP082GGA0AAJC6PJ',
eventData: { token: 'test-token' },
createdAt: new Date('2024-01-01T00:00:00.200Z'),
},
{
eventId: 'evnt-wait-created',
runId: workflowRunId,
eventType: 'wait_created',
correlationId: 'wait_01HK153X00SP082GGA0AAJC6PK',
eventData: { resumeAt: new Date('2024-01-02T00:00:00.000Z') },
createdAt: new Date('2024-01-01T00:00:00.300Z'),
},
{
eventId: 'evnt-hook-1',
runId: workflowRunId,
eventType: 'hook_received',
correlationId: 'hook_01HK153X00SP082GGA0AAJC6PJ',
eventData: { payload: payload1 },
createdAt: new Date('2024-01-01T00:00:01.000Z'),
},
{
eventId: 'evnt-step-1-created',
runId: workflowRunId,
eventType: 'step_created',
correlationId: 'step_01HK153X00SP082GGA0AAJC6PM',
eventData: { stepName: 'processPayload', input: payload1 },
createdAt: new Date('2024-01-01T00:00:01.100Z'),
},
{
eventId: 'evnt-step-1-started',
runId: workflowRunId,
eventType: 'step_started',
correlationId: 'step_01HK153X00SP082GGA0AAJC6PM',
createdAt: new Date('2024-01-01T00:00:01.200Z'),
},
{
eventId: 'evnt-step-1-completed',
runId: workflowRunId,
eventType: 'step_completed',
correlationId: 'step_01HK153X00SP082GGA0AAJC6PM',
eventData: { result: stepResult1 },
createdAt: new Date('2024-01-01T00:00:01.300Z'),
},
{
eventId: 'evnt-hook-2',
runId: workflowRunId,
eventType: 'hook_received',
correlationId: 'hook_01HK153X00SP082GGA0AAJC6PJ',
eventData: { payload: payload2 },
createdAt: new Date('2024-01-01T00:00:02.000Z'),
},
{
eventId: 'evnt-step-2-created',
runId: workflowRunId,
eventType: 'step_created',
correlationId: 'step_01HK153X00SP082GGA0AAJC6PN',
eventData: { stepName: 'processPayload', input: payload2 },
createdAt: new Date('2024-01-01T00:00:02.100Z'),
},
{
eventId: 'evnt-step-2-started',
runId: workflowRunId,
eventType: 'step_started',
correlationId: 'step_01HK153X00SP082GGA0AAJC6PN',
createdAt: new Date('2024-01-01T00:00:02.200Z'),
},
{
eventId: 'evnt-step-2-completed',
runId: workflowRunId,
eventType: 'step_completed',
correlationId: 'step_01HK153X00SP082GGA0AAJC6PN',
eventData: { result: stepResult2 },
createdAt: new Date('2024-01-01T00:00:02.300Z'),
},
];

const serialization = await import('./serialization.js');
const originalHydrate = serialization.hydrateStepReturnValue;
let callCount = 0;
const spy = vi
.spyOn(serialization, 'hydrateStepReturnValue')
.mockImplementation(async (...args) => {
callCount++;
const delay = [5, 5, 150, 5][callCount - 1] ?? 5;
await new Promise((resolve) => setTimeout(resolve, delay));
return originalHydrate(...args);
});

try {
const result = await runWorkflow(
`const createHook = globalThis[Symbol.for("WORKFLOW_CREATE_HOOK")];
const sleep = globalThis[Symbol.for("WORKFLOW_SLEEP")];
const processPayload = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("processPayload");
async function workflow() {
const hook = createHook({ token: 'test-token' });
void sleep('1d');
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

void sleep('1d') creates a wait invocation that is never completed in the provided event log (only wait_created exists). This will leave a pending wait in invocationsQueue and triggers warnPendingQueueItems() on workflow completion, making the test noisy and potentially flaky if warnings are treated as failures. Consider removing the un-awaited sleep, or add a matching wait_completed event (and/or await the sleep) so the queue is clean at the end of the run.

Suggested change
void sleep('1d');

Copilot uses AI. Check for mistakes.
const results = [];
for await (const payload of hook) {
const processed = await processPayload(payload);
results.push(processed);
if (payload.done) {
break;
}
}
return results;
}${getWorkflowTransformCode('workflow')}`,
workflowRun,
events,
noEncryptionKey
);

expect(result).not.toBeInstanceOf(Error);
const hydrated = await hydrateWorkflowReturnValue(
result,
workflowRunId,
noEncryptionKey,
ops
);
expect(hydrated).toEqual([
{ processed: true, type: 'subscribe', id: 1 },
{ processed: true, type: 'done' },
]);
} finally {
spy.mockRestore();
}
});

it('should not trigger unconsumed event error for step_created with 3 sequential steps', async () => {
// Extended version: 3 sequential steps to increase the chance of
// the timing race manifesting. Each step_created immediately follows
Expand Down
Loading