Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/disable-retry-delays.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@cloudflare/vitest-pool-workers": minor
---

Add `disableRetryDelays()` to `WorkflowInstanceModifier` to skip retry backoff delays in tests

When testing Workflows with retry configurations, the backoff delays between retry attempts of a failing `step.do()` caused real wall-clock waiting (e.g., 35 seconds for 3 retries with 5-second exponential backoff), even when step results were fully mocked. The new `disableRetryDelays()` method eliminates these delays while preserving retry behavior — all attempts still execute, just without waiting between them.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,40 @@ it("should be reviewed, accepted and complete", async ({ expect }) => {
expect(output).toEqual({ decision: "approve", status: "moderated" });
});

it("should disable retry delays when mocking step errors", async ({
expect,
}) => {
const mockResult = { violationScore: 0 };

await using instance = await introspectWorkflowInstance(
env.MODERATOR,
INSTANCE_ID
);
await instance.modify(async (m) => {
await m.disableSleeps();
await m.disableRetryDelays();
await m.mockStepError(
{ name: STEP_NAME },
new Error("Transient failure"),
2
);
await m.mockStepResult({ name: STEP_NAME }, mockResult);
});

await env.MODERATOR.create({
id: INSTANCE_ID,
});

expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual(
mockResult
);

await expect(instance.waitForStatus(STATUS_COMPLETE)).resolves.not.toThrow();

const output = await instance.getOutput();
expect(output).toEqual({ status: "auto_approved" });
});

it("should force human review to timeout and error", async ({ expect }) => {
const mockResult = { violationScore: 50 };

Expand Down
33 changes: 33 additions & 0 deletions packages/vitest-pool-workers/types/cloudflare-test.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,39 @@ declare module "cloudflare:test" {
*/
disableSleeps(steps?: { name: string; index?: number }[]): Promise<void>;

/**
 * Disables retry backoff delays, causing retry attempts of a failing
 * `step.do()` to execute immediately without waiting.
 *
 * By default, when a step fails and has retries configured, the engine
 * waits according to the retry config (e.g., exponential backoff).
 * This method eliminates those delays while preserving retry behavior
 * (all attempts still execute, just without waiting between them).
 *
 * @example Disable all retry delays:
 * ```ts
 * await instance.modify(m => {
 * 	m.disableRetryDelays();
 * });
 * ```
 *
 * @example Disable retry delays for specific steps:
 * ```ts
 * await instance.modify(m => {
 * 	m.disableRetryDelays([{ name: "fetch-data" }, { name: "call-api" }]);
 * });
 * ```
 *
 * @param steps - Optional array of specific steps to disable retry delays for.
 * If omitted, **all retry delays** in the Workflow will be disabled.
 * A step is an object specifying the step `name` and optional `index` (1-based).
 * If multiple steps share the same name, `index` targets a specific one.
 * Defaults to the first step found (`index: 1`).
 */
disableRetryDelays(
	steps?: { name: string; index?: number }[]
): Promise<void>;

/**
* Mocks the result of a `step.do()`, causing it to return a specified
* value instantly without executing the step's actual implementation.
Expand Down
25 changes: 22 additions & 3 deletions packages/workflows-shared/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ export class Context extends RpcTarget {
const errorKey = `${cacheKey}-error`;
const stepNameWithCounter = `${name}-${count}`;
const stepStateKey = `${cacheKey}-metadata`;
const retryDelayDisableKey = `${MODIFIER_KEYS.DISABLE_RETRY_DELAY}${valueKey}`;

const maybeMap = await this.#state.storage.get([
valueKey,
Expand Down Expand Up @@ -283,8 +284,18 @@ export class Context extends RpcTarget {
);
// complete sleep if it didn't finish for some reason
if (retryEntryPQ !== undefined) {
const disableAllRetryDelays = await this.#state.storage.get(
MODIFIER_KEYS.DISABLE_ALL_RETRY_DELAYS
);
const disableThisRetryDelay =
await this.#state.storage.get(retryDelayDisableKey);
const disableRetryDelay =
disableAllRetryDelays || disableThisRetryDelay;

await this.#engine.timeoutHandler.release(this.#engine);
await scheduler.wait(retryEntryPQ.targetTimestamp - Date.now());
await scheduler.wait(
disableRetryDelay ? 0 : retryEntryPQ.targetTimestamp - Date.now()
);
await this.#engine.timeoutHandler.acquire(this.#engine);
// @ts-expect-error priorityQueue is initiated in init
this.#engine.priorityQueue.remove({
Expand Down Expand Up @@ -507,12 +518,20 @@ export class Context extends RpcTarget {
if (stepState.attemptedCount <= config.retries.limit) {
// TODO (WOR-71): Think through if every Error should transition
const durationMs = calcRetryDuration(config, stepState);
const disableAllRetryDelays = await this.#state.storage.get(
MODIFIER_KEYS.DISABLE_ALL_RETRY_DELAYS
);
const disableThisRetryDelay =
await this.#state.storage.get(retryDelayDisableKey);
const disableRetryDelay =
disableAllRetryDelays || disableThisRetryDelay;
const effectiveDuration = disableRetryDelay ? 0 : durationMs;

const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`;
// @ts-expect-error priorityQueue is initiated in init
await this.#engine.priorityQueue.add({
hash: priorityQueueHash,
targetTimestamp: Date.now() + durationMs,
targetTimestamp: Date.now() + effectiveDuration,
type: "retry",
});
await this.#engine.timeoutHandler.release(this.#engine);
Expand All @@ -522,7 +541,7 @@ export class Context extends RpcTarget {
const retryPauseSignal = this.#engine.pauseController.signal;
let pausedDuringRetry = false;
await Promise.race([
scheduler.wait(durationMs),
scheduler.wait(effectiveDuration),
new Promise<void>((resolve) => {
if (retryPauseSignal.aborted) {
resolve();
Expand Down
19 changes: 19 additions & 0 deletions packages/workflows-shared/src/modifier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ export const MODIFIER_KEYS = {
FAILURE_INDEX: "failure-index-",
DISABLE_SLEEP: "disable-sleep-",
DISABLE_ALL_SLEEPS: "disableAllSleeps",
DISABLE_RETRY_DELAY: "disable-retry-delay-",
DISABLE_ALL_RETRY_DELAYS: "disableAllRetryDelays",
} as const;

export function isModifierKey(key: string): boolean {
Expand Down Expand Up @@ -92,6 +94,23 @@ export class WorkflowInstanceModifier extends RpcTarget {
}
}

async disableRetryDelays(steps?: StepSelector[]): Promise<void> {
	// No selector supplied: a single global flag disables every retry delay.
	if (!steps) {
		await this.#state.storage.put(
			MODIFIER_KEYS.DISABLE_ALL_RETRY_DELAYS,
			true
		);
		return;
	}
	// Otherwise flag each selected step individually, keyed by its cache key.
	for (const selector of steps) {
		const cacheKey = await this.#getStepCacheKey(selector);
		const storageKey = `${MODIFIER_KEYS.DISABLE_RETRY_DELAY}${cacheKey}`;
		await this.#state.storage.put(storageKey, true);
	}
}
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

// step.do() flow: It first checks if a result or error is already in the cache and, if so, returns it immediately.
// If nothing is in the cache, it checks for remaining attempts and runs the user's code against the defined timeout.
// Since `step.do()` performs this initial cache check, directly changing the `valueKey` would cause it to
Expand Down
76 changes: 75 additions & 1 deletion packages/workflows-shared/tests/context.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { env, runInDurableObject } from "cloudflare:test";
import { afterEach, describe, it, vi } from "vitest";
import workerdUnsafe from "workerd:unsafe";
import { InstanceEvent } from "../src";
import { runWorkflow } from "./utils";
import { MODIFIER_KEYS } from "../src/modifier";
import { runWorkflow, runWorkflowAndAwait } from "./utils";
import type { EngineLogs } from "../src/engine";

afterEach(async () => {
Expand Down Expand Up @@ -74,4 +76,76 @@ describe("Context", () => {
// Should have received attempts 1, 2, and 3
expect(receivedAttempts).toEqual([1, 2, 3]);
});

it("should wait for retry delays by default", async ({ expect }) => {
const start = Date.now();
const engineStub = await runWorkflowAndAwait(
"MOCK-INSTANCE-RETRY-DELAYS",
async (_event, step) => {
const result = await step.do(
"retrying step with delay",
{
retries: {
limit: 1,
delay: "1 second",
backoff: "constant",
},
},
async () => {
throw new Error("Always fails");
}
);
return result;
}
);
const elapsed = Date.now() - start;

const logs = (await engineStub.readLogs()) as EngineLogs;
expect(
logs.logs.filter((val) => val.event === InstanceEvent.ATTEMPT_START)
).toHaveLength(2);
// Should have waited at least ~1 second for the retry delay
expect(elapsed).toBeGreaterThanOrEqual(900);
});

it("should skip retry delays when disableRetryDelays is set", async ({
expect,
}) => {
const engineId = env.ENGINE.idFromName("MOCK-INSTANCE-DISABLE-RETRY");
const engineStub = env.ENGINE.get(engineId);

// Set the disableRetryDelays flag before running the workflow
await runInDurableObject(engineStub, async (_engine, state) => {
await state.storage.put(MODIFIER_KEYS.DISABLE_ALL_RETRY_DELAYS, true);
});

const start = Date.now();
const stub = await runWorkflowAndAwait(
"MOCK-INSTANCE-DISABLE-RETRY",
async (_event, step) => {
const result = await step.do(
"retrying step with delay",
{
retries: {
limit: 2,
delay: "10 seconds",
backoff: "constant",
},
},
async () => {
throw new Error("Always fails");
}
);
return result;
}
);
const elapsed = Date.now() - start;

const logs = (await stub.readLogs()) as EngineLogs;
expect(
logs.logs.filter((val) => val.event === InstanceEvent.ATTEMPT_START)
).toHaveLength(3);
// Without disableRetryDelays, this would take 20+ seconds (10s + 10s)
expect(elapsed).toBeLessThan(5000);
});
});
Loading