From a02fbafd32a8e321e5d12eccf81c247a8e12fb4b Mon Sep 17 00:00:00 2001 From: lily Date: Wed, 11 Mar 2026 14:14:13 -0700 Subject: [PATCH] [vitest-pool-workers] add `disableRetryDelays()` to skip retry backoff delays in workflow tests --- .changeset/disable-retry-delays.md | 7 ++ .../workflows/test/unit.test.ts | 34 +++++++++ .../types/cloudflare-test.d.ts | 33 ++++++++ packages/workflows-shared/src/context.ts | 25 +++++- packages/workflows-shared/src/modifier.ts | 19 +++++ .../workflows-shared/tests/context.test.ts | 76 ++++++++++++++++++- 6 files changed, 190 insertions(+), 4 deletions(-) create mode 100644 .changeset/disable-retry-delays.md diff --git a/.changeset/disable-retry-delays.md b/.changeset/disable-retry-delays.md new file mode 100644 index 0000000000..6f38b183ec --- /dev/null +++ b/.changeset/disable-retry-delays.md @@ -0,0 +1,7 @@ +--- +"@cloudflare/vitest-pool-workers": minor +--- + +Add `disableRetryDelays()` to `WorkflowInstanceModifier` to skip retry backoff delays in tests + +When testing Workflows with retry configurations, the backoff delays between retry attempts of a failing `step.do()` caused real wall-clock waiting (e.g., 35 seconds for 3 retries with 5-second exponential backoff), even when step results were fully mocked. The new `disableRetryDelays()` method eliminates these delays while preserving retry behavior — all attempts still execute, just without waiting between them. diff --git a/fixtures/vitest-pool-workers-examples/workflows/test/unit.test.ts b/fixtures/vitest-pool-workers-examples/workflows/test/unit.test.ts index 54c92df723..056c596336 100644 --- a/fixtures/vitest-pool-workers-examples/workflows/test/unit.test.ts +++ b/fixtures/vitest-pool-workers-examples/workflows/test/unit.test.ts @@ -148,6 +148,40 @@ it("should be reviewed, accepted and complete", async ({ expect }) => { expect(output).toEqual({ decision: "approve", status: "moderated" }); }); +it("should disable retry delays when mocking step errors", async ({ + expect, +}) => { + const mockResult = { violationScore: 0 }; + + await using instance = await introspectWorkflowInstance( + env.MODERATOR, + INSTANCE_ID + ); + await instance.modify(async (m) => { + await m.disableSleeps(); + await m.disableRetryDelays(); + await m.mockStepError( + { name: STEP_NAME }, + new Error("Transient failure"), + 2 + ); + await m.mockStepResult({ name: STEP_NAME }, mockResult); + }); + + await env.MODERATOR.create({ + id: INSTANCE_ID, + }); + + expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual( + mockResult + ); + + await expect(instance.waitForStatus(STATUS_COMPLETE)).resolves.not.toThrow(); + + const output = await instance.getOutput(); + expect(output).toEqual({ status: "auto_approved" }); +}); + it("should force human review to timeout and error", async ({ expect }) => { const mockResult = { violationScore: 50 }; diff --git a/packages/vitest-pool-workers/types/cloudflare-test.d.ts b/packages/vitest-pool-workers/types/cloudflare-test.d.ts index 8e63b1e52f..1ce0717183 100644 --- a/packages/vitest-pool-workers/types/cloudflare-test.d.ts +++ b/packages/vitest-pool-workers/types/cloudflare-test.d.ts @@ -327,6 +327,39 @@ declare module "cloudflare:test" { */ disableSleeps(steps?: { name: string; index?: number }[]): Promise; + /** + * Disables retry backoff delays, causing retry attempts of a failing + * `step.do()` to execute immediately without waiting. + * + * By default, when a step fails and has retries configured, the engine + * waits according to the retry config (e.g., exponential backoff). + * This method eliminates those delays while preserving retry behavior + * (all attempts still execute, just without waiting between them). + * + * @example Disable all retry delays: + * ```ts + * await instance.modify(m => { + * m.disableRetryDelays(); + * }); + * ``` + * + * @example Disable retry delays for specific steps: + * ```ts + * await instance.modify(m => { + * m.disableRetryDelays([{ name: "fetch-data" }, { name: "call-api" }]); + * }); + * ``` + * + * @param steps - Optional array of specific steps to disable retry delays for. + * If omitted, **all retry delays** in the Workflow will be disabled. + * A step is an object specifying the step `name` and optional `index` (1-based). + * If multiple steps share the same name, `index` targets a specific one. + * Defaults to the first step found (`index: 1`). + */ + disableRetryDelays( + steps?: { name: string; index?: number }[] + ): Promise; + /** * Mocks the result of a `step.do()`, causing it to return a specified * value instantly without executing the step's actual implementation. diff --git a/packages/workflows-shared/src/context.ts b/packages/workflows-shared/src/context.ts index a4248e9df9..f263b08be3 100644 --- a/packages/workflows-shared/src/context.ts +++ b/packages/workflows-shared/src/context.ts @@ -175,6 +175,7 @@ export class Context extends RpcTarget { const errorKey = `${cacheKey}-error`; const stepNameWithCounter = `${name}-${count}`; const stepStateKey = `${cacheKey}-metadata`; + const retryDelayDisableKey = `${MODIFIER_KEYS.DISABLE_RETRY_DELAY}${valueKey}`; const maybeMap = await this.#state.storage.get([ valueKey, @@ -283,8 +284,18 @@ export class Context extends RpcTarget { ); // complete sleep if it didn't finish for some reason if (retryEntryPQ !== undefined) { + const disableAllRetryDelays = await this.#state.storage.get( + MODIFIER_KEYS.DISABLE_ALL_RETRY_DELAYS + ); + const disableThisRetryDelay = + await this.#state.storage.get(retryDelayDisableKey); + const disableRetryDelay = + disableAllRetryDelays || disableThisRetryDelay; + await this.#engine.timeoutHandler.release(this.#engine); - await scheduler.wait(retryEntryPQ.targetTimestamp - Date.now()); + await scheduler.wait( + disableRetryDelay ? 0 : retryEntryPQ.targetTimestamp - Date.now() + ); await this.#engine.timeoutHandler.acquire(this.#engine); // @ts-expect-error priorityQueue is initiated in init this.#engine.priorityQueue.remove({ @@ -507,12 +518,20 @@ export class Context extends RpcTarget { if (stepState.attemptedCount <= config.retries.limit) { // TODO (WOR-71): Think through if every Error should transition const durationMs = calcRetryDuration(config, stepState); + const disableAllRetryDelays = await this.#state.storage.get( + MODIFIER_KEYS.DISABLE_ALL_RETRY_DELAYS + ); + const disableThisRetryDelay = + await this.#state.storage.get(retryDelayDisableKey); + const disableRetryDelay = + disableAllRetryDelays || disableThisRetryDelay; + const effectiveDuration = disableRetryDelay ? 0 : durationMs; const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`; // @ts-expect-error priorityQueue is initiated in init await this.#engine.priorityQueue.add({ hash: priorityQueueHash, - targetTimestamp: Date.now() + durationMs, + targetTimestamp: Date.now() + effectiveDuration, type: "retry", }); await this.#engine.timeoutHandler.release(this.#engine); @@ -522,7 +541,7 @@ export class Context extends RpcTarget { const retryPauseSignal = this.#engine.pauseController.signal; let pausedDuringRetry = false; await Promise.race([ - scheduler.wait(durationMs), + scheduler.wait(effectiveDuration), new Promise((resolve) => { if (retryPauseSignal.aborted) { resolve(); diff --git a/packages/workflows-shared/src/modifier.ts b/packages/workflows-shared/src/modifier.ts index 04f32ccef2..24c48b0850 100644 --- a/packages/workflows-shared/src/modifier.ts +++ b/packages/workflows-shared/src/modifier.ts @@ -23,6 +23,8 @@ export const MODIFIER_KEYS = { FAILURE_INDEX: "failure-index-", DISABLE_SLEEP: "disable-sleep-", DISABLE_ALL_SLEEPS: "disableAllSleeps", + DISABLE_RETRY_DELAY: "disable-retry-delay-", + DISABLE_ALL_RETRY_DELAYS: "disableAllRetryDelays", } as const; export function isModifierKey(key: string): boolean { @@ -92,6 +94,23 @@ export class WorkflowInstanceModifier extends RpcTarget { } } + async disableRetryDelays(steps?: StepSelector[]): Promise { + if (!steps) { + await this.#state.storage.put( + MODIFIER_KEYS.DISABLE_ALL_RETRY_DELAYS, + true + ); + } else { + for (const step of steps) { + const valueKey = await this.#getStepCacheKey(step); + await this.#state.storage.put( + `${MODIFIER_KEYS.DISABLE_RETRY_DELAY}${valueKey}`, + true + ); + } + } + } + // step.do() flow: It first checks if a result or error is already in the cache and, if so, returns it immediately. // If nothing is in the cache, it checks for remaining attempts and runs the user's code against the defined timeout. // Since `step.do()` performs this initial cache check, directly changing the `valueKey` would cause it to diff --git a/packages/workflows-shared/tests/context.test.ts b/packages/workflows-shared/tests/context.test.ts index 43c7bd31df..03a5aa569f 100644 --- a/packages/workflows-shared/tests/context.test.ts +++ b/packages/workflows-shared/tests/context.test.ts @@ -1,7 +1,9 @@ +import { env, runInDurableObject } from "cloudflare:test"; import { afterEach, describe, it, vi } from "vitest"; import workerdUnsafe from "workerd:unsafe"; import { InstanceEvent } from "../src"; -import { runWorkflow } from "./utils"; +import { MODIFIER_KEYS } from "../src/modifier"; +import { runWorkflow, runWorkflowAndAwait } from "./utils"; import type { EngineLogs } from "../src/engine"; afterEach(async () => { @@ -74,4 +76,76 @@ describe("Context", () => { // Should have received attempts 1, 2, and 3 expect(receivedAttempts).toEqual([1, 2, 3]); }); + + it("should wait for retry delays by default", async ({ expect }) => { + const start = Date.now(); + const engineStub = await runWorkflowAndAwait( + "MOCK-INSTANCE-RETRY-DELAYS", + async (_event, step) => { + const result = await step.do( + "retrying step with delay", + { + retries: { + limit: 1, + delay: "1 second", + backoff: "constant", + }, + }, + async () => { + throw new Error("Always fails"); + } + ); + return result; + } + ); + const elapsed = Date.now() - start; + + const logs = (await engineStub.readLogs()) as EngineLogs; + expect( + logs.logs.filter((val) => val.event === InstanceEvent.ATTEMPT_START) + ).toHaveLength(2); + // Should have waited at least ~1 second for the retry delay + expect(elapsed).toBeGreaterThanOrEqual(900); + }); + + it("should skip retry delays when disableRetryDelays is set", async ({ + expect, + }) => { + const engineId = env.ENGINE.idFromName("MOCK-INSTANCE-DISABLE-RETRY"); + const engineStub = env.ENGINE.get(engineId); + + // Set the disableRetryDelays flag before running the workflow + await runInDurableObject(engineStub, async (_engine, state) => { + await state.storage.put(MODIFIER_KEYS.DISABLE_ALL_RETRY_DELAYS, true); + }); + + const start = Date.now(); + const stub = await runWorkflowAndAwait( + "MOCK-INSTANCE-DISABLE-RETRY", + async (_event, step) => { + const result = await step.do( + "retrying step with delay", + { + retries: { + limit: 2, + delay: "10 seconds", + backoff: "constant", + }, + }, + async () => { + throw new Error("Always fails"); + } + ); + return result; + } + ); + const elapsed = Date.now() - start; + + const logs = (await stub.readLogs()) as EngineLogs; + expect( + logs.logs.filter((val) => val.event === InstanceEvent.ATTEMPT_START) + ).toHaveLength(3); + // Without disableRetryDelays, this would take 20+ seconds (10s + 10s) + expect(elapsed).toBeLessThan(5000); + }); });