diff --git a/.changeset/fix-hook-loop-unconsumed-event.md b/.changeset/fix-hook-loop-unconsumed-event.md new file mode 100644 index 0000000000..5708946891 --- /dev/null +++ b/.changeset/fix-hook-loop-unconsumed-event.md @@ -0,0 +1,5 @@ +--- +'@workflow/core': patch +--- + +Fix false-positive unconsumed `step_created` errors when replay resumes a `for await` hook loop and appends more async work after the first promise-queue drain. diff --git a/packages/core/src/events-consumer.ts b/packages/core/src/events-consumer.ts index 1786cf21df..683d67888b 100644 --- a/packages/core/src/events-consumer.ts +++ b/packages/core/src/events-consumer.ts @@ -114,23 +114,32 @@ export class EventsConsumer { // is still unconsumed after the queue drains, it's truly orphaned. if (currentEvent !== null) { const checkVersion = ++this.unconsumedCheckVersion; - this.pendingUnconsumedCheck = this.getPromiseQueue().then(() => { - // Use a delayed setTimeout after the queue drains. The delay must be - // long enough for promise chains to propagate across the VM boundary - // (from resolve() in the host context through to the workflow code - // calling subscribe() in the VM context). Node.js does not guarantee - // that setTimeout(0) fires after all cross-context microtasks settle, - // so we use a small but non-zero delay. Any subscribe() call that - // arrives during this window will cancel the check via version - // invalidation + clearTimeout. - this.pendingUnconsumedTimeout = setTimeout(() => { - this.pendingUnconsumedTimeout = null; - if (this.unconsumedCheckVersion === checkVersion) { - this.pendingUnconsumedCheck = null; - this.onUnconsumedEvent(currentEvent); - } - }, 100); - }); + this.pendingUnconsumedCheck = this.getPromiseQueue() + .then( + // Yield once after the first queue drain so promise chains resumed by + // that drain can run across the VM boundary and append any follow-up + // async work (for example: step_completed resolves -> for-await loop + // resumes -> the next hook payload starts hydrating). + () => new Promise((resolve) => setTimeout(resolve, 0)) + ) + .then(() => this.getPromiseQueue()) + .then(() => { + // Use a delayed setTimeout after the queue drains. The delay must be + // long enough for promise chains to propagate across the VM boundary + // (from resolve() in the host context through to the workflow code + // calling subscribe() in the VM context). Node.js does not guarantee + // that setTimeout(0) fires after all cross-context microtasks settle, + // so we use a small but non-zero delay. Any subscribe() call that + // arrives during this window will cancel the check via version + // invalidation + clearTimeout. + this.pendingUnconsumedTimeout = setTimeout(() => { + this.pendingUnconsumedTimeout = null; + if (this.unconsumedCheckVersion === checkVersion) { + this.pendingUnconsumedCheck = null; + this.onUnconsumedEvent(currentEvent); + } + }, 100); + }); } }; } diff --git a/packages/core/src/workflow.test.ts b/packages/core/src/workflow.test.ts index 65cfe6b20b..eca37555c8 100644 --- a/packages/core/src/workflow.test.ts +++ b/packages/core/src/workflow.test.ts @@ -4145,6 +4145,194 @@ describe('runWorkflow', () => { } }); + it('should not orphan the second step_created in a for-await hook loop when the next payload hydration is delayed', async () => { + const ops: Promise[] = []; + const workflowRunId = 'wrun_123'; + const workflowRun: WorkflowRun = { + runId: workflowRunId, + workflowName: 'workflow', + status: 'running', + input: await dehydrateWorkflowArguments( + [], + workflowRunId, + noEncryptionKey, + ops + ), + createdAt: new Date('2024-01-01T00:00:00.000Z'), + updatedAt: new Date('2024-01-01T00:00:00.000Z'), + startedAt: new Date('2024-01-01T00:00:00.000Z'), + deploymentId: 'test-deployment', + }; + + const payload1 = await dehydrateStepReturnValue( + { type: 'subscribe', id: 1 }, + workflowRunId, + noEncryptionKey, + ops + ); + const payload2 = await dehydrateStepReturnValue( + { type: 'done', done: true }, + workflowRunId, + noEncryptionKey, + ops + ); + const stepResult1 = await dehydrateStepReturnValue( + { processed: true, type: 'subscribe', id: 1 }, + workflowRunId, + noEncryptionKey, + ops + ); + const stepResult2 = await dehydrateStepReturnValue( + { processed: true, type: 'done' }, + workflowRunId, + noEncryptionKey, + ops + ); + + const events: Event[] = [ + { + eventId: 'evnt-run-created', + runId: workflowRunId, + eventType: 'run_created', + createdAt: new Date('2024-01-01T00:00:00.000Z'), + }, + { + eventId: 'evnt-run-started', + runId: workflowRunId, + eventType: 'run_started', + createdAt: new Date('2024-01-01T00:00:00.100Z'), + }, + { + eventId: 'evnt-hook-created', + runId: workflowRunId, + eventType: 'hook_created', + correlationId: 'hook_01HK153X00SP082GGA0AAJC6PJ', + eventData: { token: 'test-token' }, + createdAt: new Date('2024-01-01T00:00:00.200Z'), + }, + { + eventId: 'evnt-wait-created', + runId: workflowRunId, + eventType: 'wait_created', + correlationId: 'wait_01HK153X00SP082GGA0AAJC6PK', + eventData: { resumeAt: new Date('2024-01-02T00:00:00.000Z') }, + createdAt: new Date('2024-01-01T00:00:00.300Z'), + }, + { + eventId: 'evnt-hook-1', + runId: workflowRunId, + eventType: 'hook_received', + correlationId: 'hook_01HK153X00SP082GGA0AAJC6PJ', + eventData: { payload: payload1 }, + createdAt: new Date('2024-01-01T00:00:01.000Z'), + }, + { + eventId: 'evnt-step-1-created', + runId: workflowRunId, + eventType: 'step_created', + correlationId: 'step_01HK153X00SP082GGA0AAJC6PM', + eventData: { stepName: 'processPayload', input: payload1 }, + createdAt: new Date('2024-01-01T00:00:01.100Z'), + }, + { + eventId: 'evnt-step-1-started', + runId: workflowRunId, + eventType: 'step_started', + correlationId: 'step_01HK153X00SP082GGA0AAJC6PM', + createdAt: new Date('2024-01-01T00:00:01.200Z'), + }, + { + eventId: 'evnt-step-1-completed', + runId: workflowRunId, + eventType: 'step_completed', + correlationId: 'step_01HK153X00SP082GGA0AAJC6PM', + eventData: { result: stepResult1 }, + createdAt: new Date('2024-01-01T00:00:01.300Z'), + }, + { + eventId: 'evnt-hook-2', + runId: workflowRunId, + eventType: 'hook_received', + correlationId: 'hook_01HK153X00SP082GGA0AAJC6PJ', + eventData: { payload: payload2 }, + createdAt: new Date('2024-01-01T00:00:02.000Z'), + }, + { + eventId: 'evnt-step-2-created', + runId: workflowRunId, + eventType: 'step_created', + correlationId: 'step_01HK153X00SP082GGA0AAJC6PN', + eventData: { stepName: 'processPayload', input: payload2 }, + createdAt: new Date('2024-01-01T00:00:02.100Z'), + }, + { + eventId: 'evnt-step-2-started', + runId: workflowRunId, + eventType: 'step_started', + correlationId: 'step_01HK153X00SP082GGA0AAJC6PN', + createdAt: new Date('2024-01-01T00:00:02.200Z'), + }, + { + eventId: 'evnt-step-2-completed', + runId: workflowRunId, + eventType: 'step_completed', + correlationId: 'step_01HK153X00SP082GGA0AAJC6PN', + eventData: { result: stepResult2 }, + createdAt: new Date('2024-01-01T00:00:02.300Z'), + }, + ]; + + const serialization = await import('./serialization.js'); + const originalHydrate = serialization.hydrateStepReturnValue; + let callCount = 0; + const spy = vi + .spyOn(serialization, 'hydrateStepReturnValue') + .mockImplementation(async (...args) => { + callCount++; + const delay = [5, 5, 150, 5][callCount - 1] ?? 5; + await new Promise((resolve) => setTimeout(resolve, delay)); + return originalHydrate(...args); + }); + + try { + const result = await runWorkflow( + `const createHook = globalThis[Symbol.for("WORKFLOW_CREATE_HOOK")]; + const sleep = globalThis[Symbol.for("WORKFLOW_SLEEP")]; + const processPayload = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("processPayload"); + async function workflow() { + const hook = createHook({ token: 'test-token' }); + void sleep('1d'); + const results = []; + for await (const payload of hook) { + const processed = await processPayload(payload); + results.push(processed); + if (payload.done) { + break; + } + } + return results; + }${getWorkflowTransformCode('workflow')}`, + workflowRun, + events, + noEncryptionKey + ); + + expect(result).not.toBeInstanceOf(Error); + const hydrated = await hydrateWorkflowReturnValue( + result, + workflowRunId, + noEncryptionKey, + ops + ); + expect(hydrated).toEqual([ + { processed: true, type: 'subscribe', id: 1 }, + { processed: true, type: 'done' }, + ]); + } finally { + spy.mockRestore(); + } + }); + it('should not trigger unconsumed event error for step_created with 3 sequential steps', async () => { // Extended version: 3 sequential steps to increase the chance of // the timing race manifesting. Each step_created immediately follows