diff --git a/.changeset/fix-waitforevent-stale-waiter.md b/.changeset/fix-waitforevent-stale-waiter.md new file mode 100644 index 0000000000..46ed57e9db --- /dev/null +++ b/.changeset/fix-waitforevent-stale-waiter.md @@ -0,0 +1,7 @@ +--- +"@cloudflare/workflows-shared": patch +--- + +Fix `waitForEvent` delivering events to stale waiters after timeout. + +When a `step.waitForEvent()` call timed out, its resolver was not removed from the workflow's internal waiters map. This meant the next `step.waitForEvent()` for the same event type would have its incoming event consumed by the dead resolver instead of the live one, causing the workflow to hang indefinitely. diff --git a/packages/workflows-shared/src/context.ts b/packages/workflows-shared/src/context.ts index f76f82eeaf..a4248e9df9 100644 --- a/packages/workflows-shared/src/context.ts +++ b/packages/workflows-shared/src/context.ts @@ -861,7 +861,7 @@ export class Context extends RpcTarget { } } const callbacks = this.#engine.waiters.get(options.type) ?? []; - callbacks.push(resolve); + callbacks.push([cacheKey, resolve]); this.#engine.waiters.set(options.type, callbacks); }); @@ -886,6 +886,14 @@ export class Context extends RpcTarget { : timeoutPromise(ms(options.timeout), true), pausePromise, ]).catch(async (error) => { + const callbacks = this.#engine.waiters.get(options.type); + if (callbacks) { + const idx = callbacks.findIndex(([key]) => key === cacheKey); + if (idx !== -1) { + callbacks.splice(idx, 1); + } + } + this.#engine.writeLog( InstanceEvent.WAIT_TIMED_OUT, cacheKey, diff --git a/packages/workflows-shared/src/engine.ts b/packages/workflows-shared/src/engine.ts index 40eaa86efc..cb35c0d7b8 100644 --- a/packages/workflows-shared/src/engine.ts +++ b/packages/workflows-shared/src/engine.ts @@ -101,8 +101,12 @@ export class Engine extends DurableObject { engineAbortController: AbortController = new AbortController(); pauseController: AbortController = new AbortController(); - waiters: Map) => void>> = - new Map(); + waiters: Map< + string, + Array< + [cacheKey: string, resolve: (event: Event | PromiseLike) => void] + > + > = new Map(); eventMap: Map> = new Map(); constructor(state: DurableObjectState, env: Env) { @@ -506,9 +510,10 @@ export class Engine extends DurableObject { // Attempt to get the callback and run it const callbacks = this.waiters.get(event.type); if (callbacks) { - const callback = callbacks[0]; - if (callback) { - callback(event); + const entry = callbacks[0]; + if (entry) { + const [, resolve] = entry; + resolve(event); // Remove it from the list of callbacks callbacks.shift(); this.waiters.set(event.type, callbacks); diff --git a/packages/workflows-shared/tests/engine.test.ts b/packages/workflows-shared/tests/engine.test.ts index caec69ecc8..5505dbf80f 100644 --- a/packages/workflows-shared/tests/engine.test.ts +++ b/packages/workflows-shared/tests/engine.test.ts @@ -182,6 +182,109 @@ describe("Engine", () => { }, 500); }); + it("waitForEvent should not deliver events to timed-out events with the same type", async ({ + expect, + }) => { + const instanceId = "WAIT-FOR-EVENT-STALE-WAITER"; + const engineStub = await runWorkflow(instanceId, async (_, step) => { + const results: Array<{ + iteration: number; + received: boolean; + }> = []; + + for (let i = 1; i <= 3; i++) { + try { + await step.waitForEvent(`my-event-waiter-${i}`, { + type: "my-event", + timeout: 500, + }); + results.push({ iteration: i, received: true }); + } catch { + results.push({ iteration: i, received: false }); + } + } + + return { results }; + }); + + // 1st waitForEvent iteration - should receive event + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return ( + logs.logs.filter((val) => val.event === InstanceEvent.WAIT_START) + .length >= 1 + ); + }, + { timeout: 500 } + ); + + await engineStub.receiveEvent({ + type: "my-event", + timestamp: new Date(), + payload: { iteration: 1 }, + }); + + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return ( + logs.logs.filter((val) => val.event === InstanceEvent.WAIT_START) + .length >= 2 + ); + }, + { timeout: 500 } + ); + + // 2nd waitForEvent iteration - should timeout (500ms) + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WAIT_TIMED_OUT + ); + }, + { timeout: 1000 } + ); + + // 3rd waitForEvent iteration - should receive event + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return ( + logs.logs.filter((val) => val.event === InstanceEvent.WAIT_START) + .length >= 3 + ); + }, + { timeout: 500 } + ); + + await engineStub.receiveEvent({ + type: "my-event", + timestamp: new Date(), + payload: { iteration: 3 }, + }); + + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 1000 } + ); + + const logs = (await engineStub.readLogs()) as EngineLogs; + // Iterations 1 and 3 received events; iteration 2 timed out + expect( + logs.logs.filter((val) => val.event === InstanceEvent.WAIT_COMPLETE) + ).toHaveLength(2); + expect( + logs.logs.filter((val) => val.event === InstanceEvent.WAIT_TIMED_OUT) + ).toHaveLength(1); + }); + it("should restore state from storage when accountId is undefined", async ({ expect, }) => {