Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/fix-waitforevent-stale-waiter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@cloudflare/workflows-shared": patch
---

Fix `waitForEvent` delivering events to stale waiters after timeout.

When a `step.waitForEvent()` call timed out, its resolver was not removed from the workflow's internal waiters map. This meant the next `step.waitForEvent()` for the same event type would have its incoming event consumed by the dead resolver instead of the live one, causing the workflow to hang indefinitely.
10 changes: 9 additions & 1 deletion packages/workflows-shared/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ export class Context extends RpcTarget {
}
}
const callbacks = this.#engine.waiters.get(options.type) ?? [];
callbacks.push(resolve);
callbacks.push([cacheKey, resolve]);

this.#engine.waiters.set(options.type, callbacks);
});
Expand All @@ -886,6 +886,14 @@ export class Context extends RpcTarget {
: timeoutPromise(ms(options.timeout), true),
pausePromise,
]).catch(async (error) => {
const callbacks = this.#engine.waiters.get(options.type);
if (callbacks) {
const idx = callbacks.findIndex(([key]) => key === cacheKey);
if (idx !== -1) {
callbacks.splice(idx, 1);
}
}

this.#engine.writeLog(
InstanceEvent.WAIT_TIMED_OUT,
cacheKey,
Expand Down
15 changes: 10 additions & 5 deletions packages/workflows-shared/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,12 @@ export class Engine extends DurableObject<Env> {
engineAbortController: AbortController = new AbortController();
pauseController: AbortController = new AbortController();

waiters: Map<string, Array<(event: Event | PromiseLike<Event>) => void>> =
new Map();
waiters: Map<
string,
Array<
[cacheKey: string, resolve: (event: Event | PromiseLike<Event>) => void]
>
> = new Map();
eventMap: Map<string, Array<Event>> = new Map();

constructor(state: DurableObjectState, env: Env) {
Expand Down Expand Up @@ -506,9 +510,10 @@ export class Engine extends DurableObject<Env> {
// Attempt to get the callback and run it
const callbacks = this.waiters.get(event.type);
if (callbacks) {
const callback = callbacks[0];
if (callback) {
callback(event);
const entry = callbacks[0];
if (entry) {
const [, resolve] = entry;
resolve(event);
// Remove it from the list of callbacks
callbacks.shift();
this.waiters.set(event.type, callbacks);
Expand Down
103 changes: 103 additions & 0 deletions packages/workflows-shared/tests/engine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,109 @@ describe("Engine", () => {
}, 500);
});

// Regression test for stale waiters: a `step.waitForEvent()` that timed out
// must not consume an event intended for a later `step.waitForEvent()` of the
// same event type.
it("waitForEvent should not deliver events to timed-out events with the same type", async ({
expect,
}) => {
const instanceId = "WAIT-FOR-EVENT-STALE-WAITER";
// The workflow waits three times, one after another, for the same event type
// ("my-event") with a timeout of 500 on each wait. Each iteration records
// whether the wait resolved (received: true) or threw (received: false).
const engineStub = await runWorkflow(instanceId, async (_, step) => {
const results: Array<{
iteration: number;
received: boolean;
}> = [];

for (let i = 1; i <= 3; i++) {
try {
// Each iteration uses a distinct step name but the same event type.
await step.waitForEvent(`my-event-waiter-${i}`, {
type: "my-event",
timeout: 500,
});
results.push({ iteration: i, received: true });
} catch {
// A rejected wait is recorded rather than rethrown, so the loop continues.
results.push({ iteration: i, received: false });
}
}

// NOTE(review): `results` is returned by the workflow, but the assertions at
// the end of this test only inspect the engine logs, not this return value.
return { results };
});

// 1st waitForEvent iteration - should receive event.
// Poll the logs until at least one WAIT_START entry exists before sending.
await vi.waitUntil(
async () => {
const logs = (await engineStub.readLogs()) as EngineLogs;
return (
logs.logs.filter((val) => val.event === InstanceEvent.WAIT_START)
.length >= 1
);
},
{ timeout: 500 }
);

// Send the event intended for the 1st iteration.
await engineStub.receiveEvent({
type: "my-event",
timestamp: new Date(),
payload: { iteration: 1 },
});

// Poll until a second WAIT_START entry has been logged.
await vi.waitUntil(
async () => {
const logs = (await engineStub.readLogs()) as EngineLogs;
return (
logs.logs.filter((val) => val.event === InstanceEvent.WAIT_START)
.length >= 2
);
},
{ timeout: 500 }
);

// 2nd waitForEvent iteration - should time out (500ms).
// No event is sent here; poll until a WAIT_TIMED_OUT entry is logged.
await vi.waitUntil(
async () => {
const logs = (await engineStub.readLogs()) as EngineLogs;
return logs.logs.some(
(val) => val.event === InstanceEvent.WAIT_TIMED_OUT
);
},
{ timeout: 1000 }
);

// 3rd waitForEvent iteration - should receive event.
// Poll until a third WAIT_START entry has been logged before sending.
await vi.waitUntil(
async () => {
const logs = (await engineStub.readLogs()) as EngineLogs;
return (
logs.logs.filter((val) => val.event === InstanceEvent.WAIT_START)
.length >= 3
);
},
{ timeout: 500 }
);

// This event is sent after the 2nd wait has timed out; it is the one that
// must reach the 3rd wait for the workflow to finish.
await engineStub.receiveEvent({
type: "my-event",
timestamp: new Date(),
payload: { iteration: 3 },
});

// Poll until the logs contain a WORKFLOW_SUCCESS entry.
await vi.waitUntil(
async () => {
const logs = (await engineStub.readLogs()) as EngineLogs;
return logs.logs.some(
(val) => val.event === InstanceEvent.WORKFLOW_SUCCESS
);
},
{ timeout: 1000 }
);

const logs = (await engineStub.readLogs()) as EngineLogs;
// Iterations 1 and 3 received events; iteration 2 timed out
expect(
logs.logs.filter((val) => val.event === InstanceEvent.WAIT_COMPLETE)
).toHaveLength(2);
expect(
logs.logs.filter((val) => val.event === InstanceEvent.WAIT_TIMED_OUT)
).toHaveLength(1);
});

it("should restore state from storage when accountId is undefined", async ({
expect,
}) => {
Expand Down
Loading