Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/better-peas-buy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@workflow/world": patch
"@workflow/core": patch
"@workflow/world-local": patch
"@workflow/world-postgres": patch
---

Combine initial run fetch, event fetch, and run_started event creation
56 changes: 34 additions & 22 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,42 +191,50 @@ export function workflowEntrypoint(
});

let workflowStartedAt = -1;
let workflowRun = await world.runs.get(runId);
let workflowRun: WorkflowRun | undefined;
// Pre-loaded events from the run_started response.
// When present, we skip the events.list call to reduce TTFB.
let preloadedEvents: Event[] | undefined;

// --- Infrastructure: prepare the run state ---
// Always call run_started directly — this both transitions
// the run to 'running' AND returns the run entity, saving
// a separate runs.get round-trip.
// Contract: events.create('run_started') must be idempotent
// for runs already in 'running' status (return the run
// without error), not just for pending → running transitions.
// Network/server errors propagate to the queue handler for retry.
// WorkflowRuntimeError (data integrity issues) are fatal and
// produce run_failed since retrying won't fix them.
try {
if (workflowRun.status === 'pending') {
// Transition run to 'running' via event (event-sourced architecture)
const result = await world.events.create(
runId,
{
eventType: 'run_started',
specVersion: SPEC_VERSION_CURRENT,
},
{ requestId }
const result = await world.events.create(
runId,
{
eventType: 'run_started',
specVersion: SPEC_VERSION_CURRENT,
},
{ requestId }
);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking observation: Now that runs.get is removed and the runtime always calls events.create(run_started), the behavior changes for runs that are already running. Previously, runs.get would succeed and the if (status === 'pending') guard would skip the run_started creation. Now, every invocation attempts run_started regardless of status.

This works because the world implementations have an early-return for already-running runs — but it's a semantic change worth noting in the PR description. The contract is now: events.create('run_started') must be idempotent for running status (return the run without error), not just for pending → running transitions.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good callout — added a comment documenting this contract: events.create('run_started') must be idempotent for runs already in running status, not just for pending → running transitions.

if (!result.run) {
throw new WorkflowRuntimeError(
`Event creation for 'run_started' did not return the run entity for run "${runId}"`
);
// Use the run entity from the event response (no extra get call needed)
if (!result.run) {
throw new WorkflowRuntimeError(
`Event creation for 'run_started' did not return the run entity for run "${runId}"`
);
}
workflowRun = result.run;
}
workflowRun = result.run;

// If the world returned events, use them to skip
// the initial events.list call and reduce TTFB.
if (result.events && result.events.length > 0) {
preloadedEvents = result.events;
}

// At this point, the workflow is "running" and `startedAt` should
// definitely be set.
if (!workflowRun.startedAt) {
throw new WorkflowRuntimeError(
`Workflow run "${runId}" has no "startedAt" timestamp`
);
}
} catch (err) {
// Run was concurrently completed/failed/cancelled
// between the GET and the run_started event creation
if (EntityConflictError.is(err) || RunExpiredError.is(err)) {
runtimeLogger.info(
'Run already finished during setup, skipping',
Expand Down Expand Up @@ -294,8 +302,12 @@ export function workflowEntrypoint(
return;
}

// Load all events into memory before running
const events = await getAllWorkflowRunEvents(workflowRun.runId);
// Load all events into memory before running.
// If we got pre-loaded events from the run_started response,
// skip the events.list round-trip to reduce TTFB.
const events =
preloadedEvents ??
(await getAllWorkflowRunEvents(workflowRun.runId));

// Check for any elapsed waits and create wait_completed events
const now = Date.now();
Expand Down
39 changes: 38 additions & 1 deletion packages/world-local/src/storage/events-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,15 @@ export function createEventsStorage(
};
}

// Run state transitions are not allowed on terminal runs
// For run_started on terminal runs, use RunExpiredError so the
// runtime knows to exit without retrying.
if (data.eventType === 'run_started') {
throw new RunExpiredError(
`Workflow run "${effectiveRunId}" is already in terminal state "${currentRun.status}"`
);
}

// Other run state transitions are not allowed on terminal runs
if (
runTerminalEvents.includes(data.eventType) ||
data.eventType === 'run_cancelled'
Expand Down Expand Up @@ -280,6 +288,10 @@ export function createEventsStorage(
createdAt: now,
specVersion: effectiveSpecVersion,
};
// Strip eventData from run_started — it belongs on run_created only.
if (data.eventType === 'run_started' && 'eventData' in event) {
delete (event as any).eventData;
}

// Track entity created/updated for EventResult
let run: WorkflowRun | undefined;
Expand Down Expand Up @@ -316,6 +328,15 @@ export function createEventsStorage(
} else if (data.eventType === 'run_started') {
// Reuse currentRun from validation (already read above)
if (currentRun) {
// If already running, return the run without inserting a
// duplicate event. This makes run_started idempotent for
// concurrent invocations. We omit preloaded events here
// because this is a rare race-condition path — the runtime
// falls back to getAllWorkflowRunEvents().
if (currentRun.status === 'running') {
return { run: currentRun };
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking (same concern as postgres): This early return for already-running also returns { run: currentRun } with no event and no events. The runtime will fall back to getAllWorkflowRunEvents() which is fine, but worth a comment explaining why skipping the preloaded events is acceptable here (presumably because this path is a rare race condition).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the comment — same rationale as the postgres path (rare race condition, runtime falls back to full event fetch).

}

run = {
runId: currentRun.runId,
deploymentId: currentRun.deploymentId,
Expand Down Expand Up @@ -832,13 +853,29 @@ export function createEventsStorage(
const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION;
const filteredEvent = stripEventDataRefs(event, resolveData);

// For run_started: include all events so the runtime can skip
// the initial events.list call and reduce TTFB.
let events: Event[] | undefined;
if (data.eventType === 'run_started' && run) {
const allEvents = await paginatedFileSystemQuery({
directory: path.join(basedir, 'events'),
schema: EventSchema,
filePrefix: `${effectiveRunId}-`,
sortOrder: 'asc',
getCreatedAt: getObjectCreatedAt('evnt'),
getId: (e) => e.eventId,
});
events = allEvents.data;
}

// Return EventResult with event and any created/updated entity
return {
event: filteredEvent,
run,
step,
hook,
wait,
events,
};
},

Expand Down
68 changes: 65 additions & 3 deletions packages/world-postgres/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,15 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
};
}

// Run state transitions are not allowed on terminal runs
// For run_started on terminal runs, use RunExpiredError so the
// runtime knows to exit without retrying.
if (data.eventType === 'run_started') {
throw new RunExpiredError(
`Workflow run "${effectiveRunId}" is already in terminal state "${currentRun.status}"`
);
}

// Other run state transitions are not allowed on terminal runs
if (
runTerminalEvents.includes(data.eventType) ||
data.eventType === 'run_cancelled'
Expand Down Expand Up @@ -563,6 +571,23 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {

// Handle run_started event: update run status
if (data.eventType === 'run_started') {
// If the run is already running, return it without inserting a
// duplicate run_started event. This makes run_started idempotent
// for concurrent invocations: replay is deterministic, so letting
// multiple callers proceed with the same run is safe. We skip
// preloaded events here because this is a rare race-condition path
// — the runtime falls back to getAllWorkflowRunEvents().
if (currentRun?.status === 'running') {
const [fullRun] = await drizzle
.select()
.from(Schema.runs)
.where(eq(Schema.runs.runId, effectiveRunId))
.limit(1);
if (fullRun) {
return { run: deserializeRunError(compact(fullRun)) };
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking: When the run is already running, this returns { run } with no events field. The runtime will then fall back to getAllWorkflowRunEvents(), which is fine functionally — but it means re-invocations of an already-running workflow get no TTFB benefit from the preloaded events optimization.

More importantly, this early return happens before the event insertion, so it short-circuits the rest of the create method. That means it also skips the requestId idempotency check. If two concurrent invocations race, the first one transitions pending→running and continues, but the second hits this branch and also continues with a valid { run } result. Is that the intended behavior? Normally a duplicate run_started would be guarded by the DB unique constraint on eventId, but here we skip insertion entirely.

If this is intentional (i.e., it's safe for multiple invocations to proceed since replay is deterministic), a comment explaining that would help. If not, this should at minimum return the preloaded events for consistency, or throw an EntityConflictError to signal the duplicate.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment explaining the idempotency semantics. The early return is intentional — replay is deterministic so concurrent callers proceeding with the same run is safe. Skipping preloaded events here is acceptable since this is a rare race-condition path and the runtime falls back to getAllWorkflowRunEvents().

}

const [runValue] = await drizzle
.update(Schema.runs)
.set({
Expand Down Expand Up @@ -1135,29 +1160,66 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
}
}

// Strip eventData from run_started — it belongs on run_created only.
const storedEventData =
data.eventType === 'run_started'
? undefined
: 'eventData' in data
? data.eventData
: undefined;

const [value] = await drizzle
.insert(events)
.values({
runId: effectiveRunId,
eventId,
correlationId: data.correlationId,
eventType: data.eventType,
eventData: 'eventData' in data ? data.eventData : undefined,
eventData: storedEventData,
specVersion: effectiveSpecVersion,
})
.returning({ createdAt: events.createdAt });
if (!value) {
throw new EntityConflictError(`Event ${eventId} could not be created`);
}
const result = { ...data, ...value, runId: effectiveRunId, eventId };
const result = {
...data,
...value,
runId: effectiveRunId,
eventId,
...(storedEventData !== undefined
? { eventData: storedEventData }
: {}),
};
if (data.eventType === 'run_started') {
delete (result as any).eventData;
}
const parsed = EventSchema.parse(result);
const resolveData = params?.resolveData ?? 'all';

// For run_started: include all events so the runtime can skip
// the initial events.list call and reduce TTFB.
let allEvents: Event[] | undefined;
if (data.eventType === 'run_started' && run) {
const eventRows = await drizzle
.select()
.from(Schema.events)
.where(eq(Schema.events.runId, effectiveRunId))
.orderBy(Schema.events.eventId);
allEvents = eventRows.map((e) => {
e.eventData ||= e.eventDataJson;
const parsed = EventSchema.parse(compact(e));
return stripEventDataRefs(parsed, resolveData);
});
}

return {
event: stripEventDataRefs(parsed, resolveData),
run,
step,
hook,
wait,
events: allEvents,
};
},
async get(
Expand Down
6 changes: 6 additions & 0 deletions packages/world/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@ export interface EventResult {
hook?: import('./hooks.js').Hook;
/** The wait entity (for wait_created/wait_completed events) */
wait?: import('./waits.js').Wait;
/**
* All events up to this point, with data resolved. When populated
* on a run_started response, the runtime uses these to skip the
* initial events.list call and reduce TTFB.
*/
events?: Event[];
}

export interface GetEventParams {
Expand Down
Loading