1 change: 1 addition & 0 deletions docs/content/docs/meta.json
@@ -7,6 +7,7 @@
"how-it-works",
"observability",
"ai",
"recipes",
"deploying",
"errors",
"api-reference"
363 changes: 363 additions & 0 deletions docs/content/docs/recipes/data-ingestion.mdx
@@ -0,0 +1,363 @@
---
title: Data Ingestion & File Processing
description: Build workflows that process file uploads, ingest events, and handle incoming data with built-in durability.
type: guide
summary: Process files, ingest events, and extract data with retry-safe, durable workflows.
prerequisites:
- /docs/foundations/workflows-and-steps
- /docs/foundations/streaming
related:
- /docs/foundations/errors-and-retries
- /docs/foundations/common-patterns
- /docs/api-reference/workflow/create-webhook
---

Many teams use workflows to process incoming data: file uploads, event streams, emails, and more. Workflows make this durable - if processing fails halfway through a 10,000-row CSV import, the run resumes where it left off rather than restarting from scratch.

This guide covers the most common data ingestion patterns, starting simple and building up to production-ready examples.

## File Upload Processing

The simplest data ingestion pattern receives a file, validates it, processes it, and stores the result. Each operation is a step, so if any stage fails, only that stage retries.

```typescript title="workflows/process-file.ts" lineNumbers
type FileResult = {
  rowCount: number;
  errors: string[];
};

async function validateFile(fileUrl: string) {
  "use step";

  const response = await fetch(fileUrl);
  const content = await response.text();
  const rows = content.split("\n").filter(Boolean);

  if (rows.length === 0) {
    throw new Error("File is empty");
  }

  return { content, rowCount: rows.length };
}

async function transformRows(content: string) {
  "use step";

  const rows = content.split("\n").filter(Boolean);
  const transformed = rows.map((row) => {
    const fields = row.split(",");
    return fields.map((f) => f.trim().toLowerCase()).join(",");
  });

  return transformed;
}

async function storeResults(rows: string[]): Promise<FileResult> { // [!code highlight]
  "use step"; // [!code highlight]

  // Write to your database or storage service
  await fetch("https://api.example.com/import", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ rows }),
  });

  return { rowCount: rows.length, errors: [] };
}

export async function processFileWorkflow(fileUrl: string) { // [!code highlight]
  "use workflow"; // [!code highlight]

  const { content } = await validateFile(fileUrl); // [!code highlight]
  const transformed = await transformRows(content); // [!code highlight]
  const result = await storeResults(transformed); // [!code highlight]

  return result;
}
```

If `storeResults` fails after `validateFile` and `transformRows` succeed, only the store step retries. The validated and transformed data is already recorded in the event log.
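
To kick off this workflow from an upload endpoint, start it with the file's URL. A minimal sketch, assuming the `start()` helper exported from `workflow/api` and a Next.js-style route handler - check the API reference for the exact invocation and return shape:

```typescript
import { start } from "workflow/api";
// Import path is illustrative - point it at your workflow file
import { processFileWorkflow } from "@/workflows/process-file";

export async function POST(request: Request) {
  // Assumes the client sends { fileUrl } pointing at the uploaded file
  const { fileUrl } = await request.json();

  // Start a durable run; processing continues even if this request ends
  const run = await start(processFileWorkflow, [fileUrl]);

  // Assumes the returned run handle exposes a `runId`
  return Response.json({ runId: run.runId });
}
```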

### Streaming Large Files

For files too large to hold in memory, pass streams between steps instead of loading the full content. See the [streaming foundation guide](/docs/foundations/streaming) for more on how streams work.

```typescript title="workflows/process-large-file.ts" lineNumbers
export async function processLargeFileWorkflow(fileUrl: string) {
  "use workflow";

  const rawStream = await downloadFile(fileUrl); // [!code highlight]
  const processed = await transformStream(rawStream); // [!code highlight]
  await uploadResult(processed); // [!code highlight]
}

async function downloadFile(
  url: string
): Promise<ReadableStream<Uint8Array>> {
  "use step";

  const response = await fetch(url);
  return response.body!;
}

async function transformStream(
  input: ReadableStream<Uint8Array>
): Promise<ReadableStream<Uint8Array>> {
  "use step";

  return input.pipeThrough(
    new TransformStream<Uint8Array, Uint8Array>({
      transform(chunk, controller) {
        // Process each chunk without loading the full file
        controller.enqueue(chunk);
      },
    })
  );
}

async function uploadResult(stream: ReadableStream<Uint8Array>) {
  "use step";

  await fetch("https://storage.example.com/upload", {
    method: "POST",
    body: stream,
  });
}
```
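
The transform above passes chunks through unchanged. As a concrete variant, here is a sketch of a step that lowercases text content chunk by chunk using the standard `TextDecoderStream` and `TextEncoderStream` Web APIs (the normalization itself is illustrative):

```typescript
async function lowercaseStream(
  input: ReadableStream<Uint8Array>
): Promise<ReadableStream<Uint8Array>> {
  "use step";

  return input
    .pipeThrough(new TextDecoderStream()) // bytes -> text
    .pipeThrough(
      new TransformStream<string, string>({
        transform(chunk, controller) {
          // Normalize each text chunk without buffering the whole file
          controller.enqueue(chunk.toLowerCase());
        },
      })
    )
    .pipeThrough(new TextEncoderStream()); // text -> bytes
}
```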

## Event Ingestion Pipeline

Use [`createWebhook()`](/docs/api-reference/workflow/create-webhook) to receive events from external systems. The workflow stays alive, waiting for each event and processing it durably.

```typescript title="workflows/ingest-events.ts" lineNumbers
import { createWebhook } from "workflow";

type IngestEvent = {
  source: string;
  type: string;
  payload: Record<string, unknown>;
};

async function enrichEvent(event: IngestEvent) {
  "use step";

  const enriched = {
    ...event,
    receivedAt: new Date().toISOString(),
    normalized: true,
  };

  return enriched;
}

async function writeToStore(event: Record<string, unknown>) {
  "use step";

  await fetch("https://api.example.com/events", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(event),
  });
}

export async function ingestEventsWorkflow(batchSize: number) { // [!code highlight]
  "use workflow"; // [!code highlight]

  for (let i = 0; i < batchSize; i++) { // [!code highlight]
    const webhook = createWebhook<IngestEvent>(); // [!code highlight]
    const request = await webhook; // [!code highlight]
    const event = await request.json(); // [!code highlight]

    const enriched = await enrichEvent(event);
    await writeToStore(enriched);
  }
}
```

Each event is processed as a pair of durable steps. If enrichment succeeds but writing fails, only the write retries - the enriched data is preserved.

<Callout type="info">
For continuous event ingestion without a fixed batch size, consider using [`createHook()`](/docs/api-reference/workflow/create-hook) with a `for await` loop. Hooks allow the workflow to receive multiple events on the same endpoint.
</Callout>
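
A sketch of that continuous variant, reusing the steps above and assuming the hook yields typed payloads as an async iterable (see the `createHook()` API reference for the exact shape):

```typescript
import { createHook } from "workflow";

export async function continuousIngestWorkflow() {
  "use workflow";

  const hook = createHook<IngestEvent>();

  // No fixed batch size: process events for as long as they arrive
  for await (const event of hook) {
    const enriched = await enrichEvent(event);
    await writeToStore(enriched);
  }
}
```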

## Email Processing

Email processing is a natural fit for workflows. Parse the email in one step, extract structured data in another, and route based on content. Each step retries independently if it fails.

```typescript title="workflows/process-email.ts" lineNumbers
type ParsedEmail = {
  from: string;
  subject: string;
  body: string;
  hasAttachment: boolean;
};

async function parseEmail(rawEmail: string): Promise<ParsedEmail> {
  "use step";

  // Parse raw email into structured data
  const lines = rawEmail.split("\n");
  const from = lines.find((l) => l.startsWith("From:"))?.slice(5).trim() ?? "";
  const subject =
    lines.find((l) => l.startsWith("Subject:"))?.slice(8).trim() ?? "";
  const bodyStart = lines.indexOf("") + 1;
  const body = lines.slice(bodyStart).join("\n");

  return {
    from,
    subject,
    body,
    hasAttachment: rawEmail.includes("Content-Disposition: attachment"),
  };
}

async function classifyEmail(
  parsed: ParsedEmail
): Promise<"support" | "billing" | "general"> {
  "use step";

  // Classify based on subject and body content
  const text = `${parsed.subject} ${parsed.body}`.toLowerCase();

  if (text.includes("invoice") || text.includes("payment")) return "billing";
  if (text.includes("help") || text.includes("issue")) return "support";
  return "general";
}

async function routeEmail(parsed: ParsedEmail, category: string) { // [!code highlight]
  "use step"; // [!code highlight]

  await fetch("https://api.example.com/tickets", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      from: parsed.from,
      subject: parsed.subject,
      body: parsed.body,
      category,
    }),
  });
}

export async function processEmailWorkflow(rawEmail: string) { // [!code highlight]
  "use workflow"; // [!code highlight]

  const parsed = await parseEmail(rawEmail); // [!code highlight]
  const category = await classifyEmail(parsed); // [!code highlight]
  await routeEmail(parsed, category); // [!code highlight]

  return { from: parsed.from, category };
}
```
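
To feed real messages into this workflow, you can receive inbound mail on a webhook and hand its address to your email provider. A sketch reusing the steps above - it assumes the webhook object exposes a `url` property, and the provider registration endpoint is hypothetical:

```typescript
import { createWebhook } from "workflow";

export async function inboundEmailWorkflow() {
  "use workflow";

  const webhook = createWebhook();

  // Hand the webhook address to the mail provider (assumes `webhook.url`)
  await registerInboundAddress(webhook.url);

  const request = await webhook;
  const rawEmail = await request.text();

  const parsed = await parseEmail(rawEmail);
  const category = await classifyEmail(parsed);
  await routeEmail(parsed, category);
}

async function registerInboundAddress(url: string) {
  "use step";

  // Hypothetical provider API that forwards inbound mail to `url`
  await fetch("https://api.emailprovider.example.com/routes", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ forwardTo: url }),
  });
}
```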

## Resilient Data Extraction

When extracting data from external APIs, rate limits and transient failures are common. Use [`RetryableError`](/docs/api-reference/workflow/retryable-error) to customize retry delays and [`FatalError`](/docs/api-reference/workflow/fatal-error) to stop retrying on permanent failures.

```typescript title="workflows/extract-prices.ts" lineNumbers
import { FatalError, RetryableError, getStepMetadata } from "workflow";

type PriceData = {
  symbol: string;
  price: number;
  timestamp: string;
};

async function fetchPrice(symbol: string): Promise<PriceData> { // [!code highlight]
  "use step"; // [!code highlight]

  const metadata = getStepMetadata(); // [!code highlight]
  const response = await fetch(
    `https://api.example.com/prices/${symbol}`
  );

  if (response.status === 429) {
    // Rate limited - back off exponentially // [!code highlight]
    throw new RetryableError("Rate limited", { // [!code highlight]
      retryAfter: metadata.attempt ** 2 * 1000, // [!code highlight]
    }); // [!code highlight]
  }

  if (response.status === 404) {
    // Symbol doesn't exist - no point retrying
    throw new FatalError(`Unknown symbol: ${symbol}`); // [!code highlight]
  }

  if (!response.ok) {
    // Server error - default retry behavior
    throw new Error(`API error: ${response.status}`);
  }

  return response.json() as Promise<PriceData>;
}

async function storePrices(prices: PriceData[]) {
  "use step";

  await fetch("https://api.example.com/prices/bulk", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ prices }),
  });
}

export async function extractPricesWorkflow(symbols: string[]) {
  "use workflow";

  const prices: PriceData[] = [];

  for (const symbol of symbols) {
    const price = await fetchPrice(symbol);
    prices.push(price);
  }

  await storePrices(prices);
  return { count: prices.length };
}
```

Each `fetchPrice` call is an independent step. If the API rate-limits the third symbol, the first two are already recorded and won't re-execute. The third step retries with exponential backoff until it succeeds or exhausts its retry budget.

<Callout type="info">
The default retry limit is 3. Set `fetchPrice.maxRetries = 10` after the function declaration to allow more attempts for flaky APIs. See [Errors and Retries](/docs/foundations/errors-and-retries) for the full retry API.
</Callout>
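
For example, to give the flaky pricing API a larger budget, set the limit right after the step's declaration:

```typescript
// Allow up to 10 attempts instead of the default 3
fetchPrice.maxRetries = 10;
```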

## Batch Processing with Controlled Concurrency

When processing a large array of items, you may want to limit how many run concurrently to avoid overwhelming downstream services. Chunk the array and process each batch with `Promise.all`.

```typescript title="workflows/batch-import.ts" lineNumbers
declare function processRecord(record: string): Promise<{ id: string }>; // @setup

export async function batchImportWorkflow(records: string[]) { // [!code highlight]
  "use workflow"; // [!code highlight]

  const batchSize = 5;
  const results: Array<{ id: string }> = [];

  for (let i = 0; i < records.length; i += batchSize) { // [!code highlight]
    const batch = records.slice(i, i + batchSize); // [!code highlight]
    const batchResults = await Promise.all( // [!code highlight]
      batch.map((record) => processRecord(record)) // [!code highlight]
    ); // [!code highlight]
    results.push(...batchResults);
  }

  return { processed: results.length };
}
```

Each batch of 5 items runs in parallel. The next batch starts only after the current one completes. If any step in a batch fails, it retries independently without affecting the other items in the batch.

This pattern works well for database imports, API migrations, and any scenario where you need throughput without overwhelming a target system.
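
The example above declares `processRecord` as setup; a minimal step implementation might look like this (the import endpoint is hypothetical):

```typescript
async function processRecord(record: string): Promise<{ id: string }> {
  "use step";

  // Hypothetical endpoint - swap in your own database or API call
  const response = await fetch("https://api.example.com/records", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ record }),
  });

  const { id } = (await response.json()) as { id: string };
  return { id };
}
```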

## Related Documentation

- [Streaming](/docs/foundations/streaming) - Stream data between steps and to clients
- [Errors and Retries](/docs/foundations/errors-and-retries) - Customize retry behavior with `FatalError` and `RetryableError`
- [Common Patterns](/docs/foundations/common-patterns) - Sequential, parallel, timeout, and composition patterns
- [`createWebhook()` API Reference](/docs/api-reference/workflow/create-webhook) - Receive external events in workflows
- [`getStepMetadata()` API Reference](/docs/api-reference/workflow/get-step-metadata) - Access step metadata including attempt count
- [Serialization](/docs/foundations/serialization) - Understand which types can be passed between functions
17 changes: 17 additions & 0 deletions docs/content/docs/recipes/index.mdx
@@ -0,0 +1,17 @@
---
title: Recipes
description: Production-ready patterns and guides for common workflow use cases.
type: overview
summary: Apply proven patterns to build production workflows for data processing, integrations, and more.
related:
- /docs/foundations/common-patterns
- /docs/foundations/workflows-and-steps
---

These recipes provide complete, production-ready patterns for common workflow use cases. Each guide walks through a real-world scenario with working code examples you can adapt to your own projects.

<Cards>
<Card title="Data Ingestion & File Processing" href="/docs/recipes/data-ingestion">
Build workflows that process file uploads, ingest events, and handle incoming data with built-in durability.
</Card>
</Cards>
4 changes: 4 additions & 0 deletions docs/content/docs/recipes/meta.json
@@ -0,0 +1,4 @@
{
"title": "Recipes",
"pages": ["index", "data-ingestion"]
}