Backport: fix false-positive unconsumed event in hook loop replay#1780
Backport: fix false-positive unconsumed event in hook loop replay#1780TooTallNate wants to merge 2 commits intostablefrom
Conversation
🦋 Changeset detectedLatest commit: f2f9dbc The changes in this PR will be included in the next version bump. This PR includes changesets to release 18 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
🧪 E2E Test Results❌ Some tests failed Summary
❌ Failed Tests▲ Vercel Production (10 failed)astro (2 failed):
example (2 failed):
express (2 failed):
nitro (2 failed):
nuxt (1 failed):
sveltekit (1 failed):
📦 Local Production (1 failed)vite-stable (1 failed):
🌍 Community Worlds (68 failed)mongodb-dev (1 failed):
redis-dev (1 failed):
turso-dev (1 failed):
turso (65 failed):
Details by Category❌ ▲ Vercel Production
✅ 💻 Local Development
❌ 📦 Local Production
✅ 🐘 Local Postgres
✅ 🪟 Windows
❌ 🌍 Community Worlds
✅ 📋 Other
❌ Some E2E test jobs failed:
Check the workflow run for details. |
VaguelySerious
left a comment
There was a problem hiding this comment.
LGTM assuming tests pass. Note that I've been seeing tons of flakes in stable lately, though
There was a problem hiding this comment.
Pull request overview
Backports a runtime replay fix to prevent false-positive “unconsumed event” errors (notably step_created) when a for await hook loop resumes and schedules additional async hydration work after an initial promise-queue drain.
Changes:
- Update
EventsConsumer’s deferred unconsumed-event check to re-check the latest promise queue after yielding once. - Add a
runWorkflowregression test covering delayed hydration in afor awaithook loop with sequential steps. - Add a patch changeset for
@workflow/core.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| packages/core/src/events-consumer.ts | Adjusts unconsumed-event detection timing to reduce replay race false positives. |
| packages/core/src/workflow.test.ts | Adds a regression test for for await hook-loop replay ordering with delayed hydration. |
| .changeset/fix-hook-loop-unconsumed-event.md | Declares a patch release note for the backported fix. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| try { | ||
| const result = await runWorkflow( | ||
| `const createHook = globalThis[Symbol.for("WORKFLOW_CREATE_HOOK")]; | ||
| const sleep = globalThis[Symbol.for("WORKFLOW_SLEEP")]; | ||
| const processPayload = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("processPayload"); | ||
| async function workflow() { | ||
| const hook = createHook({ token: 'test-token' }); | ||
| void sleep('1d'); | ||
| const results = []; | ||
| for await (const payload of hook) { | ||
| const processed = await processPayload(payload); | ||
| results.push(processed); | ||
| if (payload.done) { | ||
| break; | ||
| } | ||
| } | ||
| return results; | ||
| }${getWorkflowTransformCode('workflow')}`, | ||
| workflowRun, | ||
| events, | ||
| noEncryptionKey | ||
| ); | ||
|
|
||
| expect(result).not.toBeInstanceOf(Error); | ||
| const hydrated = await hydrateWorkflowReturnValue( | ||
| result, | ||
| workflowRunId, | ||
| noEncryptionKey, | ||
| ops | ||
| ); | ||
| expect(hydrated).toEqual([ | ||
| { processed: true, type: 'subscribe', id: 1 }, | ||
| { processed: true, type: 'done' }, | ||
| ]); |
There was a problem hiding this comment.
This test intentionally calls sleep('1d') without awaiting it. runWorkflow() will warn on completion when there are pending operations (including waits), which will emit a console.warn via runtimeLogger.warn and can create noisy/flaky test output. Consider either removing the sleep/wait_created from this scenario, or explicitly spying on console.warn (as done in the existing “pending queue warnings” tests) to assert/suppress the warning.
| try { | |
| const result = await runWorkflow( | |
| `const createHook = globalThis[Symbol.for("WORKFLOW_CREATE_HOOK")]; | |
| const sleep = globalThis[Symbol.for("WORKFLOW_SLEEP")]; | |
| const processPayload = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("processPayload"); | |
| async function workflow() { | |
| const hook = createHook({ token: 'test-token' }); | |
| void sleep('1d'); | |
| const results = []; | |
| for await (const payload of hook) { | |
| const processed = await processPayload(payload); | |
| results.push(processed); | |
| if (payload.done) { | |
| break; | |
| } | |
| } | |
| return results; | |
| }${getWorkflowTransformCode('workflow')}`, | |
| workflowRun, | |
| events, | |
| noEncryptionKey | |
| ); | |
| expect(result).not.toBeInstanceOf(Error); | |
| const hydrated = await hydrateWorkflowReturnValue( | |
| result, | |
| workflowRunId, | |
| noEncryptionKey, | |
| ops | |
| ); | |
| expect(hydrated).toEqual([ | |
| { processed: true, type: 'subscribe', id: 1 }, | |
| { processed: true, type: 'done' }, | |
| ]); | |
| const consoleWarnSpy = vi | |
| .spyOn(console, 'warn') | |
| .mockImplementation(() => {}); | |
| try { | |
| try { | |
| const result = await runWorkflow( | |
| `const createHook = globalThis[Symbol.for("WORKFLOW_CREATE_HOOK")]; | |
| const sleep = globalThis[Symbol.for("WORKFLOW_SLEEP")]; | |
| const processPayload = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("processPayload"); | |
| async function workflow() { | |
| const hook = createHook({ token: 'test-token' }); | |
| void sleep('1d'); | |
| const results = []; | |
| for await (const payload of hook) { | |
| const processed = await processPayload(payload); | |
| results.push(processed); | |
| if (payload.done) { | |
| break; | |
| } | |
| } | |
| return results; | |
| }${getWorkflowTransformCode('workflow')}`, | |
| workflowRun, | |
| events, | |
| noEncryptionKey | |
| ); | |
| expect(result).not.toBeInstanceOf(Error); | |
| const hydrated = await hydrateWorkflowReturnValue( | |
| result, | |
| workflowRunId, | |
| noEncryptionKey, | |
| ops | |
| ); | |
| expect(hydrated).toEqual([ | |
| { processed: true, type: 'subscribe', id: 1 }, | |
| { processed: true, type: 'done' }, | |
| ]); | |
| } finally { | |
| consoleWarnSpy.mockRestore(); | |
| } |
| let callCount = 0; | ||
| const spy = vi | ||
| .spyOn(serialization, 'hydrateStepReturnValue') | ||
| .mockImplementation(async (...args) => { | ||
| callCount++; | ||
| const delay = [5, 5, 150, 5][callCount - 1] ?? 5; |
There was a problem hiding this comment.
The mocked hydration delay is driven by callCount, which makes the test brittle: any future change that adds/removes a hydrateStepReturnValue call (or changes call order) will silently alter which hydration is delayed and may stop exercising the intended race. Prefer keying the delay off something stable in the hydration input (e.g., the specific payload/step result marker) so the test keeps simulating the same timing scenario as implementation details evolve.
| let callCount = 0; | |
| const spy = vi | |
| .spyOn(serialization, 'hydrateStepReturnValue') | |
| .mockImplementation(async (...args) => { | |
| callCount++; | |
| const delay = [5, 5, 150, 5][callCount - 1] ?? 5; | |
| const shouldDelayHydration = (value: unknown): boolean => { | |
| if (!types.isProxy(value) && (typeof value !== 'object' || value === null)) { | |
| return false; | |
| } | |
| return ( | |
| 'type' in value && | |
| (value as { type?: unknown }).type === 'done' | |
| ); | |
| }; | |
| const spy = vi | |
| .spyOn(serialization, 'hydrateStepReturnValue') | |
| .mockImplementation(async (...args) => { | |
| const delay = shouldDelayHydration(args[0]) ? 150 : 5; |
| @@ -0,0 +1,5 @@ | |||
| --- | |||
| '@workflow/core': patch | |||
There was a problem hiding this comment.
Changeset frontmatter in this repo uses double quotes for package names (e.g. "@workflow/core": patch). This file uses single quotes, which may break consistency and tooling expectations. Please switch to double quotes here.
| '@workflow/core': patch | |
| "@workflow/core": patch |
| this.pendingUnconsumedTimeout = setTimeout(() => { | ||
| this.pendingUnconsumedTimeout = null; | ||
| if (this.unconsumedCheckVersion === checkVersion) { | ||
| this.pendingUnconsumedCheck = null; | ||
| this.onUnconsumedEvent(currentEvent); | ||
| } | ||
| }, 100); |
There was a problem hiding this comment.
The unconsumed-event check can be canceled via subscribe() (version invalidation), but this code still schedules the setTimeout even if checkVersion is already stale at the time this .then() runs. That leaves a stray timer that can't be cleared and can add unnecessary work under churn. Consider checking this.unconsumedCheckVersion === checkVersion before assigning pendingUnconsumedTimeout (and/or before the second queue wait) so canceled checks don’t schedule timers at all.
NOTE Do not merge, this is for v0 testing. Instead we should merge #1778 and then use the backport label
Summary
EventsConsumerfix that re-checks the latest promise queue before reporting an unconsumed event during replayrunWorkflowregression forfor awaithook loops where the next payload hydration is appended after a prior step resolves@workflow/coreTesting