diff --git a/.changeset/strong-walls-stand.md b/.changeset/strong-walls-stand.md new file mode 100644 index 0000000000..a845151cc8 --- /dev/null +++ b/.changeset/strong-walls-stand.md @@ -0,0 +1,2 @@ +--- +--- diff --git a/.github/workflows/benchmark-community-world.yml b/.github/workflows/benchmark-community-world.yml index 8cfbeab1bc..5bc3fd7e08 100644 --- a/.github/workflows/benchmark-community-world.yml +++ b/.github/workflows/benchmark-community-world.yml @@ -54,6 +54,7 @@ jobs: env: TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} TURBO_TEAM: ${{ vars.TURBO_TEAM }} + WORKFLOW_PUBLIC_MANIFEST: '1' steps: - name: Checkout Repo diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index 8a28fa14c8..84d6f2e866 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -140,6 +140,7 @@ jobs: path: | node_modules packages/*/dist + packages/core/src/version.ts retention-days: 1 # Phase 2a: Local benchmarks (no postgres) @@ -157,6 +158,7 @@ jobs: env: TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} TURBO_TEAM: ${{ vars.TURBO_TEAM }} + WORKFLOW_PUBLIC_MANIFEST: '1' steps: - uses: actions/checkout@v4 @@ -251,6 +253,7 @@ jobs: env: TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} TURBO_TEAM: ${{ vars.TURBO_TEAM }} + WORKFLOW_PUBLIC_MANIFEST: '1' WORKFLOW_TARGET_WORLD: "@workflow/world-postgres" WORKFLOW_POSTGRES_URL: "postgres://world:world@localhost:5432/world" @@ -386,6 +389,7 @@ jobs: env: DEPLOYMENT_URL: ${{ steps.waitForDeployment.outputs.deployment-url }} APP_NAME: ${{ matrix.app.name }} + VERCEL_DEPLOYMENT_ID: ${{ steps.waitForDeployment.outputs.deployment-id }} WORKFLOW_VERCEL_ENV: ${{ github.ref == 'refs/heads/main' && 'production' || 'preview' }} WORKFLOW_VERCEL_AUTH_TOKEN: ${{ secrets.VERCEL_LABS_TOKEN }} WORKFLOW_VERCEL_TEAM: "team_nO2mCG4W8IxPIeKoSsqwAxxB" diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a06288c869..ef7f5a0362 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml 
@@ -215,6 +215,7 @@ jobs: env: TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} TURBO_TEAM: ${{ vars.TURBO_TEAM }} + WORKFLOW_PUBLIC_MANIFEST: '1' steps: - name: Checkout Repo uses: actions/checkout@v4 @@ -297,6 +298,7 @@ jobs: env: TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} TURBO_TEAM: ${{ vars.TURBO_TEAM }} + WORKFLOW_PUBLIC_MANIFEST: '1' steps: - name: Checkout Repo @@ -360,6 +362,7 @@ jobs: env: TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} TURBO_TEAM: ${{ vars.TURBO_TEAM }} + WORKFLOW_PUBLIC_MANIFEST: '1' steps: - name: Checkout Repo @@ -441,6 +444,7 @@ jobs: env: TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} TURBO_TEAM: ${{ vars.TURBO_TEAM }} + WORKFLOW_PUBLIC_MANIFEST: '1' WORKFLOW_TARGET_WORLD: "@workflow/world-postgres" WORKFLOW_POSTGRES_URL: "postgres://world:world@localhost:5432/world" @@ -507,6 +511,7 @@ jobs: env: TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} TURBO_TEAM: ${{ vars.TURBO_TEAM }} + WORKFLOW_PUBLIC_MANIFEST: '1' steps: - name: Checkout Repo diff --git a/packages/core/e2e/bench.bench.ts b/packages/core/e2e/bench.bench.ts index 487a64ae10..26463c065b 100644 --- a/packages/core/e2e/bench.bench.ts +++ b/packages/core/e2e/bench.bench.ts @@ -1,15 +1,118 @@ -import { withResolvers } from '@workflow/utils'; import fs from 'fs'; import path from 'path'; import { bench, describe } from 'vitest'; -import { dehydrateWorkflowArguments } from '../src/serialization'; -import { getProtectionBypassHeaders } from './utils'; +import type { Run } from '../src/runtime'; +import { start } from '../src/runtime'; +import { + getProtectionBypassHeaders, + getWorkbenchAppPath, + isLocalDeployment, +} from './utils'; const deploymentUrl = process.env.DEPLOYMENT_URL; if (!deploymentUrl) { throw new Error('`DEPLOYMENT_URL` environment variable is not set'); } +console.log('[bench] deploymentUrl:', deploymentUrl); +console.log('[bench] isLocalDeployment:', isLocalDeployment()); +console.log('[bench] WORKFLOW_VERCEL_ENV:', process.env.WORKFLOW_VERCEL_ENV); +console.log('[bench] 
VERCEL_DEPLOYMENT_ID:', process.env.VERCEL_DEPLOYMENT_ID); +console.log( + '[bench] WORKFLOW_TARGET_WORLD:', + process.env.WORKFLOW_TARGET_WORLD +); + +// Configure the World for the bench runner process (same as e2e tests) +if (isLocalDeployment()) { + process.env.WORKFLOW_LOCAL_BASE_URL = deploymentUrl; + const appPath = getWorkbenchAppPath(); + const appName = process.env.APP_NAME!; + const isNextJs = appName.includes('nextjs') || appName.includes('next-'); + const dataDirName = isNextJs ? '.next/workflow-data' : '.workflow-data'; + process.env.WORKFLOW_LOCAL_DATA_DIR = path.join(appPath, dataDirName); + console.log( + '[bench] configured local world, dataDir:', + process.env.WORKFLOW_LOCAL_DATA_DIR + ); +} else if (process.env.WORKFLOW_VERCEL_ENV) { + if (!process.env.VERCEL_DEPLOYMENT_ID) { + throw new Error( + 'VERCEL_DEPLOYMENT_ID is required for Vercel benchmarks but is not set' + ); + } + console.log('[bench] configured for Vercel world'); +} else { + console.log('[bench] no special world configuration'); +} + +// Manifest type and helpers (same as e2e tests) +interface WorkflowManifest { + version: string; + workflows: Record< + string, + Record + >; + steps: Record>; +} + +let cachedManifest: WorkflowManifest | null = null; + +async function fetchManifest(): Promise { + if (cachedManifest) return cachedManifest; + const url = new URL('/.well-known/workflow/v1/manifest.json', deploymentUrl); + console.log('[bench] fetching manifest from:', url.toString()); + const res = await fetch(url, { + headers: getProtectionBypassHeaders(), + signal: AbortSignal.timeout(30_000), + redirect: 'follow', + }); + console.log('[bench] manifest response status:', res.status, 'url:', res.url); + if (!res.ok) { + const text = await res.text(); + throw new Error(`Failed to fetch manifest: ${res.status} ${text}`); + } + cachedManifest = (await res.json()) as WorkflowManifest; + console.log( + '[bench] manifest loaded, workflows:', + Object.keys(cachedManifest.workflows).join(', 
') + ); + return cachedManifest; +} + +async function getWorkflowMetadata( + workflowFile: string, + workflowFn: string +): Promise<{ workflowId: string }> { + const manifest = await fetchManifest(); + for (const [manifestFile, functions] of Object.entries(manifest.workflows)) { + if ( + manifestFile.endsWith(workflowFile) || + workflowFile.endsWith(manifestFile) + ) { + const entry = functions[workflowFn]; + if (entry) return entry; + } + } + const fileWithoutExt = workflowFile.replace(/\.tsx?$/, ''); + for (const [manifestFile, functions] of Object.entries(manifest.workflows)) { + const manifestFileWithoutExt = manifestFile.replace(/\.tsx?$/, ''); + if ( + manifestFileWithoutExt.endsWith(fileWithoutExt) || + fileWithoutExt.endsWith(manifestFileWithoutExt) + ) { + const entry = functions[workflowFn]; + if (entry) return entry; + } + } + throw new Error( + `Workflow "${workflowFn}" not found in manifest for file "${workflowFile}"` + ); +} + +const benchWf = (fn: string) => + getWorkflowMetadata('workflows/97_bench.ts', fn); + // Store workflow execution times for each benchmark const workflowTimings: Record< string, @@ -30,106 +133,42 @@ const bufferedTimings: Map< { run: any; extra?: { firstByteTimeMs?: number; slurpTimeMs?: number } }[] > = new Map(); -async function triggerWorkflow( - workflow: string | { workflowFile: string; workflowFn: string }, - args: any[] -): Promise<{ runId: string }> { - const url = new URL('/api/trigger', deploymentUrl); - const workflowFn = - typeof workflow === 'string' ? workflow : workflow.workflowFn; - const workflowFile = - typeof workflow === 'string' - ? 
'workflows/97_bench.ts' - : workflow.workflowFile; - - url.searchParams.set('workflowFile', workflowFile); - url.searchParams.set('workflowFn', workflowFn); - - const ops: Promise[] = []; - const { promise: runIdPromise, resolve: resolveRunId } = - withResolvers(); - const dehydratedArgs = dehydrateWorkflowArguments(args, ops, runIdPromise); - - const res = await fetch(url, { - method: 'POST', - headers: { - ...getProtectionBypassHeaders(), - 'Content-Type': 'application/octet-stream', - }, - body: dehydratedArgs.buffer as BodyInit, - }); - if (!res.ok) { - throw new Error( - `Failed to trigger workflow: ${res.url} ${ - res.status - }: ${await res.text()}` - ); - } - const run = await res.json(); - resolveRunId(run.runId); - - // Resolve and wait for any stream operations - await Promise.all(ops); - - return run; +/** + * Await run.returnValue with a timeout to prevent benchmarks from hanging. + */ +async function awaitReturnValue( + run: Run, + timeoutMs = 120_000 +): Promise { + const timeout = new Promise((_, reject) => + setTimeout( + () => + reject( + new Error( + `run.returnValue timed out after ${timeoutMs}ms for run ${run.runId}` + ) + ), + timeoutMs + ) + ); + return Promise.race([run.returnValue, timeout]); } -async function getWorkflowReturnValue( - runId: string -): Promise<{ run: any; value: any }> { - const MAX_UNEXPECTED_CONTENT_RETRIES = 3; - let unexpectedContentRetries = 0; - - // We need to poll the GET endpoint until the workflow run is completed. 
- while (true) { - const url = new URL('/api/trigger', deploymentUrl); - url.searchParams.set('runId', runId); - - const res = await fetch(url, { headers: getProtectionBypassHeaders() }); - - if (res.status === 202) { - // Workflow run is still running, so we need to wait and poll again - await new Promise((resolve) => setTimeout(resolve, 100)); - continue; - } - - // Extract run metadata from headers - const run = { - runId, - createdAt: res.headers.get('X-Workflow-Run-Created-At'), - startedAt: res.headers.get('X-Workflow-Run-Started-At'), - completedAt: res.headers.get('X-Workflow-Run-Completed-At'), - }; - - const contentType = res.headers.get('Content-Type'); - - if (contentType?.includes('application/json')) { - return { run, value: await res.json() }; - } - - if (contentType?.includes('application/octet-stream')) { - return { run, value: res.body }; - } - - // Unexpected content type - log details and retry - unexpectedContentRetries++; - const responseText = await res.text().catch(() => ''); - console.warn( - `[bench] Unexpected content type for runId=${runId} (attempt ${unexpectedContentRetries}/${MAX_UNEXPECTED_CONTENT_RETRIES}):\n` + - ` Status: ${res.status}\n` + - ` Content-Type: ${contentType}\n` + - ` Response: ${responseText.slice(0, 500)}${responseText.length > 500 ? '...' : ''}` - ); - - if (unexpectedContentRetries >= MAX_UNEXPECTED_CONTENT_RETRIES) { - throw new Error( - `Unexpected content type after ${MAX_UNEXPECTED_CONTENT_RETRIES} retries: ${contentType} (status=${res.status})` - ); - } - - // Wait before retrying - await new Promise((resolve) => setTimeout(resolve, 500)); - } +/** + * Collect run timing metadata from a completed run. 
+ */ +async function getRunTimings(run: Run) { + const [createdAt, startedAt, completedAt] = await Promise.all([ + run.createdAt, + run.startedAt, + run.completedAt, + ]); + return { + runId: run.runId, + createdAt: createdAt?.toISOString(), + startedAt: startedAt?.toISOString(), + completedAt: completedAt?.toISOString(), + }; } function getTimingOutputPath() { @@ -289,9 +328,16 @@ describe('Workflow Performance Benchmarks', () => { bench( 'workflow with no steps', async () => { - const { runId } = await triggerWorkflow('noStepsWorkflow', [42]); - const { run } = await getWorkflowReturnValue(runId); - stageTiming('workflow with no steps', run); + console.log('[bench] resolving workflow metadata...'); + const wf = await benchWf('noStepsWorkflow'); + console.log('[bench] calling start() with workflowId:', wf.workflowId); + const run = await start(wf, [42]); + console.log('[bench] start() returned, runId:', run.runId); + console.log('[bench] awaiting returnValue...'); + await awaitReturnValue(run); + console.log('[bench] returnValue resolved'); + const timings = await getRunTimings(run); + stageTiming('workflow with no steps', timings); }, { time: 5000, warmupIterations: 1, teardown } ); @@ -299,9 +345,10 @@ describe('Workflow Performance Benchmarks', () => { bench( 'workflow with 1 step', async () => { - const { runId } = await triggerWorkflow('oneStepWorkflow', [100]); - const { run } = await getWorkflowReturnValue(runId); - stageTiming('workflow with 1 step', run); + const run = await start(await benchWf('oneStepWorkflow'), [100]); + await awaitReturnValue(run); + const timings = await getRunTimings(run); + stageTiming('workflow with 1 step', timings); }, { time: 5000, warmupIterations: 1, teardown } ); @@ -324,11 +371,12 @@ describe('Workflow Performance Benchmarks', () => { benchFn( name, async () => { - const { runId } = await triggerWorkflow('sequentialStepsWorkflow', [ + const run = await start(await benchWf('sequentialStepsWorkflow'), [ count, ]); - const { 
run } = await getWorkflowReturnValue(runId); - stageTiming(name, run); + await awaitReturnValue(run); + const timings = await getRunTimings(run); + stageTiming(name, timings); }, { time, iterations: 1, warmupIterations: 0, teardown } ); @@ -337,8 +385,9 @@ describe('Workflow Performance Benchmarks', () => { bench( 'workflow with stream', async () => { - const { runId } = await triggerWorkflow('streamWorkflow', []); - const { run, value } = await getWorkflowReturnValue(runId); + const run = await start(await benchWf('streamWorkflow'), []); + const value = await awaitReturnValue(run); + const timings = await getRunTimings(run); // Consume the entire stream and track: // - firstByteTimeMs: time from workflow start to first byte // - slurpTimeMs: time from first byte to stream completion @@ -350,8 +399,8 @@ describe('Workflow Performance Benchmarks', () => { let firstByteTimestamp: number | undefined; while (true) { const { done } = await reader.read(); - if (isFirstChunk && !done && run.startedAt) { - const startedAt = new Date(run.startedAt).getTime(); + if (isFirstChunk && !done && timings.startedAt) { + const startedAt = new Date(timings.startedAt).getTime(); firstByteTimestamp = Date.now(); firstByteTimeMs = firstByteTimestamp - startedAt; isFirstChunk = false; @@ -364,7 +413,7 @@ describe('Workflow Performance Benchmarks', () => { } } } - stageTiming('workflow with stream', run, { + stageTiming('workflow with stream', timings, { firstByteTimeMs, slurpTimeMs, }); @@ -396,9 +445,10 @@ describe('Workflow Performance Benchmarks', () => { benchFn( name, async () => { - const { runId } = await triggerWorkflow(workflow, [count]); - const { run } = await getWorkflowReturnValue(runId); - stageTiming(name, run); + const run = await start(await benchWf(workflow), [count]); + await awaitReturnValue(run); + const timings = await getRunTimings(run); + stageTiming(name, timings); }, { time, iterations: 1, warmupIterations: 0, teardown } ); diff --git 
a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 086e61b5ee..1a1cda759d 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -1,16 +1,30 @@ -import { withResolvers } from '@workflow/utils'; +import { WorkflowRunFailedError } from '@workflow/errors'; import fs from 'fs'; import path from 'path'; -import { afterAll, assert, describe, expect, test } from 'vitest'; -import { dehydrateWorkflowArguments } from '../src/serialization'; +import { afterAll, assert, beforeAll, describe, expect, test } from 'vitest'; +import type { Run } from '../src/runtime'; +import { getRun, start } from '../src/runtime'; import { cliHealthJson, cliInspectJson, getProtectionBypassHeaders, + getWorkbenchAppPath, hasStepSourceMaps, hasWorkflowSourceMaps, + isLocalDeployment, } from './utils'; +// Manifest type matching the structure from BaseBuilder.createManifest() +interface WorkflowManifest { + version: string; + workflows: Record< + string, + Record + >; + steps: Record>; + classes?: Record>; +} + const deploymentUrl = process.env.DEPLOYMENT_URL; if (!deploymentUrl) { throw new Error('`DEPLOYMENT_URL` environment variable is not set'); @@ -47,14 +61,95 @@ function writeE2EMetadata() { fs.writeFileSync(getE2EMetadataPath(), JSON.stringify(metadata, null, 2)); } -async function triggerWorkflow( +// Cached manifest fetched from the deployment +let cachedManifest: WorkflowManifest | null = null; + +/** + * Fetches the workflow manifest from the deployment URL. + * The manifest is served at /.well-known/workflow/v1/manifest.json by each + * workbench app when WORKFLOW_PUBLIC_MANIFEST=1 is set. 
+ */ +async function fetchManifest(): Promise { + if (cachedManifest) return cachedManifest; + + const url = new URL('/.well-known/workflow/v1/manifest.json', deploymentUrl); + const res = await fetch(url, { + headers: getProtectionBypassHeaders(), + }); + if (!res.ok) { + throw new Error( + `Failed to fetch manifest from ${url}: ${res.status} ${await res.text()}` + ); + } + cachedManifest = (await res.json()) as WorkflowManifest; + return cachedManifest; +} + +/** + * Looks up the workflow metadata from the manifest for a given workflow file and function name. + * Returns an object that can be passed directly to `start()`. + * + * The manifest contains the exact IDs produced by the SWC transform during the build, + * which handles symlink resolution and path normalization correctly. + */ +async function getWorkflowMetadata( + workflowFile: string, + workflowFn: string +): Promise<{ workflowId: string }> { + const manifest = await fetchManifest(); + + // The manifest keys are relative file paths as seen by the builder. + // Due to symlinks, the key may differ from the workflowFile we pass + // (e.g., "example/workflows/99_e2e.ts" vs "workflows/99_e2e.ts"). + // Search all files for the matching function name and workflow file suffix. 
+ for (const [manifestFile, functions] of Object.entries(manifest.workflows)) { + if ( + manifestFile.endsWith(workflowFile) || + workflowFile.endsWith(manifestFile) + ) { + const entry = functions[workflowFn]; + if (entry) { + return entry; + } + } + } + + // If suffix matching didn't find it, try stripping the extension for matching + const fileWithoutExt = workflowFile.replace(/\.tsx?$/, ''); + for (const [manifestFile, functions] of Object.entries(manifest.workflows)) { + const manifestFileWithoutExt = manifestFile.replace(/\.tsx?$/, ''); + if ( + manifestFileWithoutExt.endsWith(fileWithoutExt) || + fileWithoutExt.endsWith(manifestFileWithoutExt) + ) { + const entry = functions[workflowFn]; + if (entry) { + return entry; + } + } + } + + throw new Error( + `Workflow "${workflowFn}" not found in manifest for file "${workflowFile}". ` + + `Available files: ${Object.keys(manifest.workflows).join(', ')}` + ); +} + +/** + * Shorthand for looking up workflow metadata from workflows/99_e2e.ts. + * Usage: `const run = await start(await e2e('addTenWorkflow'), [123]);` + */ +const e2e = (fn: string) => getWorkflowMetadata('workflows/99_e2e.ts', fn); + +/** + * Triggers a workflow via HTTP POST. Used only for Pages Router tests + * that specifically need to validate the HTTP trigger endpoint. + */ +async function startWorkflowViaHttp( workflow: string | { workflowFile: string; workflowFn: string }, args: any[], - options?: { usePagesRouter?: boolean } -): Promise<{ runId: string }> { - const endpoint = options?.usePagesRouter - ? '/api/trigger-pages' - : '/api/trigger'; + endpoint: string +): Promise> { const url = new URL(endpoint, deploymentUrl); const workflowFn = typeof workflow === 'string' ? 
workflow : workflow.workflowFn; @@ -66,18 +161,18 @@ async function triggerWorkflow( url.searchParams.set('workflowFile', workflowFile); url.searchParams.set('workflowFn', workflowFn); - const ops: Promise[] = []; - const { promise: runIdPromise, resolve: resolveRunId } = - withResolvers(); - const dehydratedArgs = dehydrateWorkflowArguments(args, ops, runIdPromise); + // Note: when args is empty, the server may use default args (e.g., [42]). + // This is acceptable for Pages Router tests since the workflows called + // with empty args don't use their arguments. + if (args.length > 0) { + url.searchParams.set('args', args.map(String).join(',')); + } const res = await fetch(url, { method: 'POST', headers: { ...getProtectionBypassHeaders(), - 'Content-Type': 'application/octet-stream', }, - body: dehydratedArgs.buffer as BodyInit, }); if (!res.ok) { throw new Error( @@ -86,56 +181,36 @@ async function triggerWorkflow( }: ${await res.text()}` ); } - const run = await res.json(); - resolveRunId(run.runId); - - // Collect runId for observability links (Vercel world only) - if (process.env.WORKFLOW_VERCEL_ENV) { - const testName = expect.getState().currentTestName || workflowFn; - collectedRunIds.push({ - testName, - runId: run.runId, - timestamp: new Date().toISOString(), - }); - } - - // Resolve and wait for any stream operations - await Promise.all(ops); + const result = await res.json(); + const run = getRun(result.runId); return run; } -async function getWorkflowReturnValue(runId: string) { - // We need to poll the GET endpoint until the workflow run is completed. - // TODO: make this more efficient when we add subscription support. 
- while (true) { - const url = new URL('/api/trigger', deploymentUrl); - url.searchParams.set('runId', runId); - - const res = await fetch(url, { headers: getProtectionBypassHeaders() }); - - if (res.status === 202) { - // Workflow run is still running, so we need to wait and poll again - await new Promise((resolve) => setTimeout(resolve, 5_000)); - continue; - } - const contentType = res.headers.get('Content-Type'); - - if (contentType?.includes('application/json')) { - return await res.json(); - } - - if (contentType?.includes('application/octet-stream')) { - return res.body; - } - - throw new Error(`Unexpected content type: ${contentType}`); - } -} - // NOTE: Temporarily disabling concurrent tests to avoid flakiness. // TODO: Re-enable concurrent tests after conf when we have more time to investigate. describe('e2e', () => { + // Configure the World for the test runner process so that start() and + // run.returnValue can communicate with the same backend as the workbench app. + beforeAll(async () => { + if (isLocalDeployment()) { + // Set base URL so the local queue can reach the running workbench app + process.env.WORKFLOW_LOCAL_BASE_URL = deploymentUrl; + + // Set the data directory to match the workbench app's data directory. + // We must set this explicitly (not discover it) because the data dir + // may not exist yet when the test starts — the app creates it on first use. + // Next.js uses .next/workflow-data, all other frameworks use .workflow-data. + const appPath = getWorkbenchAppPath(); + const appName = process.env.APP_NAME!; + const isNextJs = appName.includes('nextjs') || appName.includes('next-'); + const dataDirName = isNextJs ? '.next/workflow-data' : '.workflow-data'; + process.env.WORKFLOW_LOCAL_DATA_DIR = path.join(appPath, dataDirName); + } + // For Vercel tests: WORKFLOW_VERCEL_AUTH_TOKEN, WORKFLOW_VERCEL_PROJECT, etc. 
are set by CI + // For Postgres tests: WORKFLOW_TARGET_WORLD and WORKFLOW_POSTGRES_URL are set by CI + }); + // Write E2E metadata file with runIds for observability links afterAll(() => { writeE2EMetadata(); @@ -151,8 +226,12 @@ describe('e2e', () => { workflowFn: 'addTenWorkflow', }, ])('addTenWorkflow', { timeout: 60_000 }, async (workflow) => { - const run = await triggerWorkflow(workflow, [123]); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start( + await getWorkflowMetadata(workflow.workflowFile, workflow.workflowFn), + [123] + ); + + const returnValue = await run.returnValue; expect(returnValue).toBe(133); const { json } = await cliInspectJson(`runs ${run.runId} --withData`); @@ -186,55 +265,63 @@ describe('e2e', () => { test.skipIf(shouldSkipReactRenderTest)( 'should work with react rendering in step', async () => { - const run = await triggerWorkflow( - { - workflowFile: 'workflows/8_react_render.tsx', - workflowFn: 'reactWorkflow', - }, + const run = await start( + await getWorkflowMetadata( + 'workflows/8_react_render.tsx', + 'reactWorkflow' + ), [] ); - const returnValue = await getWorkflowReturnValue(run.runId); + + const returnValue = await run.returnValue; expect(returnValue).toBe('
hello world 2
'); } ); test('promiseAllWorkflow', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('promiseAllWorkflow', []); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('promiseAllWorkflow'), []); + const returnValue = await run.returnValue; expect(returnValue).toBe('ABC'); }); test('promiseRaceWorkflow', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('promiseRaceWorkflow', []); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('promiseRaceWorkflow'), []); + const returnValue = await run.returnValue; expect(returnValue).toBe('B'); }); test('promiseAnyWorkflow', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('promiseAnyWorkflow', []); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('promiseAnyWorkflow'), []); + const returnValue = await run.returnValue; expect(returnValue).toBe('B'); }); - test('readableStreamWorkflow', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('readableStreamWorkflow', []); - const returnValue = await getWorkflowReturnValue(run.runId); - expect(returnValue).toBeInstanceOf(ReadableStream); - - const decoder = new TextDecoder(); - let contents = ''; - for await (const chunk of returnValue) { - const text = decoder.decode(chunk, { stream: true }); - contents += text; + // ReadableStream return values use the world's streaming infrastructure which + // requires in-process access. The local world's streamer uses an in-process EventEmitter + // that doesn't work cross-process (test runner ↔ workbench app). 
+ test.skipIf(isLocalDeployment())( + 'readableStreamWorkflow', + { timeout: 60_000 }, + async () => { + const run = await start(await e2e('readableStreamWorkflow'), []); + const returnValue = await run.returnValue; + expect(returnValue).toBeInstanceOf(ReadableStream); + + const decoder = new TextDecoder(); + let contents = ''; + for await (const chunk of returnValue) { + const text = decoder.decode(chunk, { stream: true }); + contents += text; + } + expect(contents).toBe('0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'); } - expect(contents).toBe('0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'); - }); + ); test('hookWorkflow', { timeout: 60_000 }, async () => { const token = Math.random().toString(36).slice(2); const customData = Math.random().toString(36).slice(2); - const run = await triggerWorkflow('hookWorkflow', [token, customData]); + const run = await start(await e2e('hookWorkflow'), [token, customData]); // Wait a few seconds so that the webhook is registered. // TODO: make this more efficient when we add subscription support. 
@@ -282,7 +369,7 @@ describe('e2e', () => { body = await res.json(); expect(body.runId).toBe(run.runId); - const returnValue = await getWorkflowReturnValue(run.runId); + const returnValue = await run.returnValue; expect(returnValue).toBeInstanceOf(Array); expect(returnValue.length).toBe(3); expect(returnValue[0].message).toBe('one'); @@ -301,7 +388,7 @@ describe('e2e', () => { const token2 = Math.random().toString(36).slice(2); const token3 = Math.random().toString(36).slice(2); - const run = await triggerWorkflow('webhookWorkflow', [ + const run = await start(await e2e('webhookWorkflow'), [ token, token2, token3, @@ -359,7 +446,7 @@ describe('e2e', () => { const body3 = await res3.text(); expect(body3).toBe('Hello from webhook!'); - const returnValue = await getWorkflowReturnValue(run.runId); + const returnValue = await run.returnValue; expect(returnValue).toHaveLength(3); expect(returnValue[0].url).toBe( new URL( @@ -405,21 +492,21 @@ describe('e2e', () => { }); test('sleepingWorkflow', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('sleepingWorkflow', []); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('sleepingWorkflow'), []); + const returnValue = await run.returnValue; expect(returnValue.startTime).toBeLessThan(returnValue.endTime); expect(returnValue.endTime - returnValue.startTime).toBeGreaterThan(9999); }); test('nullByteWorkflow', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('nullByteWorkflow', []); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('nullByteWorkflow'), []); + const returnValue = await run.returnValue; expect(returnValue).toBe('null byte \0'); }); test('workflowAndStepMetadataWorkflow', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('workflowAndStepMetadataWorkflow', []); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await 
e2e('workflowAndStepMetadataWorkflow'), []); + const returnValue = await run.returnValue; expect(returnValue).toHaveProperty('workflowMetadata'); expect(returnValue).toHaveProperty('stepMetadata'); @@ -437,14 +524,12 @@ describe('e2e', () => { expect(returnValue.stepMetadata.workflowRunId).toBeUndefined(); // workflow context should have workflowStartedAt and stepMetadata shouldn't - expect(typeof returnValue.workflowMetadata.workflowStartedAt).toBe( - 'string' - ); - expect(typeof returnValue.innerWorkflowMetadata.workflowStartedAt).toBe( - 'string' - ); - expect(returnValue.innerWorkflowMetadata.workflowStartedAt).toBe( - returnValue.workflowMetadata.workflowStartedAt + // Note: workflowStartedAt may be a Date object (when using run.returnValue directly) + // or a string (when serialized through JSON via HTTP) + expect(returnValue.workflowMetadata.workflowStartedAt).toBeDefined(); + expect(returnValue.innerWorkflowMetadata.workflowStartedAt).toBeDefined(); + expect(String(returnValue.innerWorkflowMetadata.workflowStartedAt)).toBe( + String(returnValue.workflowMetadata.workflowStartedAt) ); expect(returnValue.stepMetadata.workflowStartedAt).toBeUndefined(); @@ -466,108 +551,88 @@ describe('e2e', () => { // Attempt should be atleast 1 expect(returnValue.stepMetadata.attempt).toBeGreaterThanOrEqual(1); - // stepStartedAt should be a Date - expect(typeof returnValue.stepMetadata.stepStartedAt).toBe('string'); - }); - - test('outputStreamWorkflow', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('outputStreamWorkflow', []); - const stream = await fetch( - `${deploymentUrl}/api/trigger?runId=${run.runId}&output-stream=1`, - { headers: getProtectionBypassHeaders() } - ); - const namedStream = await fetch( - `${deploymentUrl}/api/trigger?runId=${run.runId}&output-stream=test`, - { headers: getProtectionBypassHeaders() } - ); - const textDecoderStream = new TextDecoderStream(); - stream.body?.pipeThrough(textDecoderStream); - const reader = 
textDecoderStream.readable.getReader(); - - const namedTextDecoderStream = new TextDecoderStream(); - namedStream.body?.pipeThrough(namedTextDecoderStream); - const namedReader = namedTextDecoderStream.readable.getReader(); - - const r1 = await reader.read(); - assert(r1.value); - const chunk1 = JSON.parse(r1.value); - const binaryData = Buffer.from(chunk1.data, 'base64'); - expect(binaryData.toString()).toEqual('Hello, world!'); - - const r1Named = await namedReader.read(); - assert(r1Named.value); - const chunk1Named = JSON.parse(r1Named.value); - const binaryDataNamed = Buffer.from(chunk1Named.data, 'base64'); - expect(binaryDataNamed.toString()).toEqual('Hello, named stream!'); - - const r2 = await reader.read(); - assert(r2.value); - const chunk2 = JSON.parse(r2.value); - expect(chunk2).toEqual({ foo: 'test' }); - - const r2Named = await namedReader.read(); - assert(r2Named.value); - const chunk2Named = JSON.parse(r2Named.value); - expect(chunk2Named).toEqual({ foo: 'bar' }); - - const r3 = await reader.read(); - expect(r3.done).toBe(true); - - const r3Named = await namedReader.read(); - expect(r3Named.done).toBe(true); - - const returnValue = await getWorkflowReturnValue(run.runId); - expect(returnValue).toEqual('done'); + // stepStartedAt should be a Date or date string + expect(returnValue.stepMetadata.stepStartedAt).toBeDefined(); }); - test( - 'outputStreamInsideStepWorkflow - getWritable() called inside step functions', + // Output stream tests use run.getReadable() which requires in-process streaming + // infrastructure. The local world's streamer uses an EventEmitter that doesn't work + // cross-process (test runner ↔ workbench app). 
+ test.skipIf(isLocalDeployment())( + 'outputStreamWorkflow', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('outputStreamInsideStepWorkflow', []); - const stream = await fetch( - `${deploymentUrl}/api/trigger?runId=${run.runId}&output-stream=1`, - { headers: getProtectionBypassHeaders() } - ); - const namedStream = await fetch( - `${deploymentUrl}/api/trigger?runId=${run.runId}&output-stream=step-ns`, - { headers: getProtectionBypassHeaders() } + const run = await start(await e2e('outputStreamWorkflow'), []); + const reader = run.getReadable().getReader(); + const namedReader = run.getReadable({ namespace: 'test' }).getReader(); + + // First chunk from default stream: binary data + const r1 = await reader.read(); + assert(r1.value); + assert(r1.value instanceof Uint8Array); + expect(Buffer.from(r1.value).toString()).toEqual('Hello, world!'); + + // First chunk from named stream: binary data + const r1Named = await namedReader.read(); + assert(r1Named.value); + assert(r1Named.value instanceof Uint8Array); + expect(Buffer.from(r1Named.value).toString()).toEqual( + 'Hello, named stream!' 
); - const textDecoderStream = new TextDecoderStream(); - stream.body?.pipeThrough(textDecoderStream); - const reader = textDecoderStream.readable.getReader(); - const namedTextDecoderStream = new TextDecoderStream(); - namedStream.body?.pipeThrough(namedTextDecoderStream); - const namedReader = namedTextDecoderStream.readable.getReader(); + // Second chunk from default stream: JSON object + const r2 = await reader.read(); + assert(r2.value); + expect(r2.value).toEqual({ foo: 'test' }); + + // Second chunk from named stream: JSON object + const r2Named = await namedReader.read(); + assert(r2Named.value); + expect(r2Named.value).toEqual({ foo: 'bar' }); + + // Streams should be closed + const r3 = await reader.read(); + expect(r3.done).toBe(true); + + const r3Named = await namedReader.read(); + expect(r3Named.done).toBe(true); + + const returnValue = await run.returnValue; + expect(returnValue).toEqual('done'); + } + ); - // First message from default stream + test.skipIf(isLocalDeployment())( + 'outputStreamInsideStepWorkflow - getWritable() called inside step functions', + { timeout: 60_000 }, + async () => { + const run = await start(await e2e('outputStreamInsideStepWorkflow'), []); + const reader = run.getReadable().getReader(); + const namedReader = run.getReadable({ namespace: 'step-ns' }).getReader(); + + // First message from default stream: binary data const r1 = await reader.read(); assert(r1.value); - const chunk1 = JSON.parse(r1.value); - const binaryData1 = Buffer.from(chunk1.data, 'base64'); - expect(binaryData1.toString()).toEqual('Hello from step!'); + assert(r1.value instanceof Uint8Array); + expect(Buffer.from(r1.value).toString()).toEqual('Hello from step!'); - // First message from named stream + // First message from named stream: JSON object const r1Named = await namedReader.read(); assert(r1Named.value); - const chunk1Named = JSON.parse(r1Named.value); - expect(chunk1Named).toEqual({ + expect(r1Named.value).toEqual({ message: 'Hello from named 
stream in step!', }); - // Second message from default stream + // Second message from default stream: binary data const r2 = await reader.read(); assert(r2.value); - const chunk2 = JSON.parse(r2.value); - const binaryData2 = Buffer.from(chunk2.data, 'base64'); - expect(binaryData2.toString()).toEqual('Second message'); + assert(r2.value instanceof Uint8Array); + expect(Buffer.from(r2.value).toString()).toEqual('Second message'); - // Second message from named stream + // Second message from named stream: JSON object const r2Named = await namedReader.read(); assert(r2Named.value); - const chunk2Named = JSON.parse(r2Named.value); - expect(chunk2Named).toEqual({ counter: 42 }); + expect(r2Named.value).toEqual({ counter: 42 }); // Verify streams are closed const r3 = await reader.read(); @@ -576,14 +641,14 @@ describe('e2e', () => { const r3Named = await namedReader.read(); expect(r3Named.done).toBe(true); - const returnValue = await getWorkflowReturnValue(run.runId); + const returnValue = await run.returnValue; expect(returnValue).toEqual('done'); } ); test('fetchWorkflow', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('fetchWorkflow', []); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('fetchWorkflow'), []); + const returnValue = await run.returnValue; expect(returnValue).toMatchObject({ userId: 1, id: 1, @@ -593,8 +658,8 @@ describe('e2e', () => { }); test('promiseRaceStressTestWorkflow', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('promiseRaceStressTestWorkflow', []); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('promiseRaceStressTestWorkflow'), []); + const returnValue = await run.returnValue; // Completion order can vary across worlds and scheduling environments. 
expect([...returnValue].sort((a, b) => a - b)).toEqual([0, 1, 2, 3, 4]); }); @@ -607,22 +672,23 @@ describe('e2e', () => { 'nested function calls preserve message and stack trace', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('errorWorkflowNested', []); - const result = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('errorWorkflowNested'), []); + const error = await run.returnValue.catch((e: unknown) => e); - expect(result.name).toBe('WorkflowRunFailedError'); - expect(result.cause.message).toContain('Nested workflow error'); + expect(WorkflowRunFailedError.is(error)).toBe(true); + assert(WorkflowRunFailedError.is(error)); + expect(error.cause.message).toContain('Nested workflow error'); // Workflow source maps are not properly supported everywhere. Check the definition // of hasWorkflowSourceMaps() to see where they are supported if (hasWorkflowSourceMaps()) { // Stack shows call chain: errorNested1 -> errorNested2 -> errorNested3 - expect(result.cause.stack).toContain('errorNested1'); - expect(result.cause.stack).toContain('errorNested2'); - expect(result.cause.stack).toContain('errorNested3'); - expect(result.cause.stack).toContain('errorWorkflowNested'); - expect(result.cause.stack).toContain('99_e2e.ts'); - expect(result.cause.stack).not.toContain('evalmachine'); + expect(error.cause.stack).toContain('errorNested1'); + expect(error.cause.stack).toContain('errorNested2'); + expect(error.cause.stack).toContain('errorNested3'); + expect(error.cause.stack).toContain('errorWorkflowNested'); + expect(error.cause.stack).toContain('99_e2e.ts'); + expect(error.cause.stack).not.toContain('evalmachine'); } const { json: runData } = await cliInspectJson(`runs ${run.runId}`); @@ -634,22 +700,23 @@ describe('e2e', () => { 'cross-file imports preserve message and stack trace', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('errorWorkflowCrossFile', []); - const result = await 
getWorkflowReturnValue(run.runId); + const run = await start(await e2e('errorWorkflowCrossFile'), []); + const error = await run.returnValue.catch((e: unknown) => e); - expect(result.name).toBe('WorkflowRunFailedError'); - expect(result.cause.message).toContain( + expect(WorkflowRunFailedError.is(error)).toBe(true); + assert(WorkflowRunFailedError.is(error)); + expect(error.cause.message).toContain( 'Error from imported helper module' ); // Workflow source maps are not properly supported everywhere. Check the definition // of hasWorkflowSourceMaps() to see where they are supported if (hasWorkflowSourceMaps()) { - expect(result.cause.stack).toContain('throwError'); - expect(result.cause.stack).toContain('callThrower'); - expect(result.cause.stack).toContain('errorWorkflowCrossFile'); - expect(result.cause.stack).toContain('helpers.ts'); - expect(result.cause.stack).not.toContain('evalmachine'); + expect(error.cause.stack).toContain('throwError'); + expect(error.cause.stack).toContain('callThrower'); + expect(error.cause.stack).toContain('errorWorkflowCrossFile'); + expect(error.cause.stack).toContain('helpers.ts'); + expect(error.cause.stack).not.toContain('evalmachine'); } const { json: runData } = await cliInspectJson(`runs ${run.runId}`); @@ -663,8 +730,8 @@ describe('e2e', () => { 'basic step error preserves message and stack trace', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('errorStepBasic', []); - const result = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('errorStepBasic'), []); + const result = await run.returnValue; // Workflow catches the error and returns it expect(result.caught).toBe(true); @@ -713,8 +780,8 @@ describe('e2e', () => { 'cross-file step error preserves message and function names in stack', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('errorStepCrossFile', []); - const result = await getWorkflowReturnValue(run.runId); + const run = await start(await 
e2e('errorStepCrossFile'), []); + const result = await run.returnValue; // Workflow catches the error and returns message + stack expect(result.caught).toBe(true); @@ -768,8 +835,8 @@ describe('e2e', () => { 'regular Error retries until success', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('errorRetrySuccess', []); - const result = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('errorRetrySuccess'), []); + const result = await run.returnValue; expect(result.finalAttempt).toBe(3); @@ -788,11 +855,12 @@ describe('e2e', () => { 'FatalError fails immediately without retries', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('errorRetryFatal', []); - const result = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('errorRetryFatal'), []); + const error = await run.returnValue.catch((e: unknown) => e); - expect(result.name).toBe('WorkflowRunFailedError'); - expect(result.cause.message).toContain('Fatal step error'); + expect(WorkflowRunFailedError.is(error)).toBe(true); + assert(WorkflowRunFailedError.is(error)); + expect(error.cause.message).toContain('Fatal step error'); const { json: steps } = await cliInspectJson( `steps --runId ${run.runId}` @@ -809,8 +877,8 @@ describe('e2e', () => { 'RetryableError respects custom retryAfter delay', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('errorRetryCustomDelay', []); - const result = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('errorRetryCustomDelay'), []); + const result = await run.returnValue; expect(result.attempt).toBe(2); expect(result.duration).toBeGreaterThan(10_000); @@ -818,8 +886,8 @@ describe('e2e', () => { ); test('maxRetries=0 disables retries', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('errorRetryDisabled', []); - const result = await getWorkflowReturnValue(run.runId); + const run = await start(await 
e2e('errorRetryDisabled'), []); + const result = await run.returnValue; expect(result.failed).toBe(true); expect(result.attempt).toBe(1); @@ -831,8 +899,8 @@ describe('e2e', () => { 'FatalError can be caught and detected with FatalError.is()', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('errorFatalCatchable', []); - const result = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('errorFatalCatchable'), []); + const result = await run.returnValue; expect(result.caught).toBe(true); expect(result.isFatal).toBe(true); @@ -884,7 +952,7 @@ describe('e2e', () => { const customData = Math.random().toString(36).slice(2); // Start first workflow - const run1 = await triggerWorkflow('hookCleanupTestWorkflow', [ + const run1 = await start(await e2e('hookCleanupTestWorkflow'), [ token, customData, ]); @@ -908,7 +976,7 @@ describe('e2e', () => { expect(body.runId).toBe(run1.runId); // Get first workflow result - const run1Result = await getWorkflowReturnValue(run1.runId); + const run1Result = await run1.returnValue; expect(run1Result).toMatchObject({ message: 'test-message-1', customData, @@ -916,7 +984,7 @@ describe('e2e', () => { }); // Now verify token can be reused for a second workflow - const run2 = await triggerWorkflow('hookCleanupTestWorkflow', [ + const run2 = await start(await e2e('hookCleanupTestWorkflow'), [ token, customData, ]); @@ -939,7 +1007,7 @@ describe('e2e', () => { expect(body.runId).toBe(run2.runId); // Get second workflow result - const run2Result = await getWorkflowReturnValue(run2.runId); + const run2Result = await run2.returnValue; expect(run2Result).toMatchObject({ message: 'test-message-2', customData, @@ -963,7 +1031,7 @@ describe('e2e', () => { const customData = Math.random().toString(36).slice(2); // Start first workflow - it will create a hook and wait for a payload - const run1 = await triggerWorkflow('hookCleanupTestWorkflow', [ + const run1 = await start(await 
e2e('hookCleanupTestWorkflow'), [ token, customData, ]); @@ -973,15 +1041,16 @@ describe('e2e', () => { // Start second workflow with the SAME token while first is still running // This should fail because the hook token is already in use - const run2 = await triggerWorkflow('hookCleanupTestWorkflow', [ + const run2 = await start(await e2e('hookCleanupTestWorkflow'), [ token, customData, ]); // The second workflow should fail with a hook token conflict error - const run2Result = await getWorkflowReturnValue(run2.runId); - expect(run2Result.name).toBe('WorkflowRunFailedError'); - expect(run2Result.cause.message).toContain( + const run2Error = await run2.returnValue.catch((e: unknown) => e); + expect(WorkflowRunFailedError.is(run2Error)).toBe(true); + assert(WorkflowRunFailedError.is(run2Error)); + expect(run2Error.cause.message).toContain( 'already in use by another workflow' ); @@ -1002,7 +1071,7 @@ describe('e2e', () => { expect(res.status).toBe(200); // Verify workflow 1 completed successfully - const run1Result = await getWorkflowReturnValue(run1.runId); + const run1Result = await run1.returnValue; expect(run1Result).toMatchObject({ message: 'test-concurrent', customData, @@ -1020,8 +1089,8 @@ describe('e2e', () => { async () => { // This workflow passes a step function reference to another step // The receiving step calls the passed function and returns the result - const run = await triggerWorkflow('stepFunctionPassingWorkflow', []); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('stepFunctionPassingWorkflow'), []); + const returnValue = await run.returnValue; // doubleNumber(10) = 20, then multiply by 2 = 40 expect(returnValue).toBe(40); @@ -1053,8 +1122,8 @@ describe('e2e', () => { // This workflow creates a nested step function with closure variables, // then passes it to another step which invokes it. // The closure variables should be serialized and preserved across the call. 
- const run = await triggerWorkflow('stepFunctionWithClosureWorkflow', []); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('stepFunctionWithClosureWorkflow'), []); + const returnValue = await run.returnValue; // Expected: "Wrapped: Result: 21" // - calculate(7) uses closure vars: prefix="Result: ", multiplier=3 @@ -1077,8 +1146,8 @@ describe('e2e', () => { async () => { // This workflow uses a nested step function that references closure variables // from the parent workflow scope (multiplier, prefix, baseValue) - const run = await triggerWorkflow('closureVariableWorkflow', [7]); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('closureVariableWorkflow'), [7]); + const returnValue = await run.returnValue; // Expected: baseValue (7) * multiplier (3) = 21, prefixed with "Result: " expect(returnValue).toBe('Result: 21'); @@ -1092,10 +1161,10 @@ describe('e2e', () => { // This workflow spawns another workflow using start() inside a step function // This is the recommended pattern for spawning workflows from within workflows const inputValue = 42; - const run = await triggerWorkflow('spawnWorkflowFromStepWorkflow', [ + const run = await start(await e2e('spawnWorkflowFromStepWorkflow'), [ inputValue, ]); - const returnValue = await getWorkflowReturnValue(run.runId); + const returnValue = await run.returnValue; // Verify the parent workflow completed expect(returnValue).toHaveProperty('parentInput'); @@ -1251,8 +1320,8 @@ describe('e2e', () => { async () => { // This workflow uses a step that calls a helper function imported via @repo/* path alias // which resolves to a file outside the workbench directory (../../lib/steps/paths-alias-test.ts) - const run = await triggerWorkflow('pathsAliasWorkflow', []); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('pathsAliasWorkflow'), []); + const returnValue = await 
run.returnValue; // The step should return the helper's identifier string expect(returnValue).toBe('pathsAliasHelper'); @@ -1276,14 +1345,9 @@ describe('e2e', () => { // Calculator.calculate(5, 3) should: // 1. MathService.add(5, 3) = 8 // 2. MathService.multiply(8, 2) = 16 - const run = await triggerWorkflow( - { - workflowFile: 'workflows/99_e2e.ts', - workflowFn: 'Calculator.calculate', - }, - [5, 3] - ); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('Calculator.calculate'), [5, 3]); + + const returnValue = await run.returnValue; expect(returnValue).toBe(16); @@ -1304,14 +1368,9 @@ describe('e2e', () => { // 1. AllInOneService.double(10) = 20 // 2. AllInOneService.triple(10) = 30 // 3. return 20 + 30 = 50 - const run = await triggerWorkflow( - { - workflowFile: 'workflows/99_e2e.ts', - workflowFn: 'AllInOneService.processNumber', - }, - [10] - ); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('AllInOneService.processNumber'), [10]); + + const returnValue = await run.returnValue; expect(returnValue).toBe(50); @@ -1332,14 +1391,12 @@ describe('e2e', () => { // - ChainableService.multiplyByClassValue(5) uses `this.multiplier` (10) -> 5 * 10 = 50 // - ChainableService.doubleAndMultiply(5) uses `this.multiplier` (10) -> 5 * 2 * 10 = 100 // - sum = 50 + 100 = 150 - const run = await triggerWorkflow( - { - workflowFile: 'workflows/99_e2e.ts', - workflowFn: 'ChainableService.processWithThis', - }, + const run = await start( + await e2e('ChainableService.processWithThis'), [5] ); - const returnValue = await getWorkflowReturnValue(run.runId); + + const returnValue = await run.returnValue; expect(returnValue).toEqual({ multiplied: 50, // 5 * 10 @@ -1369,8 +1426,8 @@ describe('e2e', () => { // 2. multiplyByFactor.apply({ factor: 3 }, [20]) = 60 // 3. 
multiplyByFactor.call({ factor: 5 }, 60) = 300 // Total: 10 * 2 * 3 * 5 = 300 - const run = await triggerWorkflow('thisSerializationWorkflow', [10]); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('thisSerializationWorkflow'), [10]); + const returnValue = await run.returnValue; expect(returnValue).toBe(300); @@ -1396,8 +1453,8 @@ describe('e2e', () => { // 2. transformPoint(point, 2) -> Point(6, 8) // 3. transformPoint(scaled, 3) -> Point(18, 24) // 4. sumPoints([Point(1,2), Point(3,4), Point(5,6)]) -> Point(9, 12) - const run = await triggerWorkflow('customSerializationWorkflow', [3, 4]); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('customSerializationWorkflow'), [3, 4]); + const returnValue = await run.returnValue; expect(returnValue).toEqual({ original: { x: 3, y: 4 }, @@ -1434,8 +1491,8 @@ describe('e2e', () => { // 3. counter.multiply(3) -> 5 * 3 = 15 // 4. counter.describe('test counter') -> { label: 'test counter', value: 5 } // 5. Create Counter(100), call counter2.add(50) -> 100 + 50 = 150 - const run = await triggerWorkflow('instanceMethodStepWorkflow', [5]); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('instanceMethodStepWorkflow'), [5]); + const returnValue = await run.returnValue; expect(returnValue).toEqual({ initialValue: 5, @@ -1504,8 +1561,8 @@ describe('e2e', () => { // The critical part is step 2: the workflow code never imports Vector, // so without cross-context registration it wouldn't know how to deserialize it. 
- const run = await triggerWorkflow('crossContextSerdeWorkflow', []); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await start(await e2e('crossContextSerdeWorkflow'), []); + const returnValue = await run.returnValue; // Verify all the vector operations worked correctly expect(returnValue).toEqual({ @@ -1537,15 +1594,15 @@ describe('e2e', () => { describe.skipIf(!isNextJsApp)('pages router', () => { test('addTenWorkflow via pages router', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow( + const run = await startWorkflowViaHttp( { workflowFile: 'workflows/99_e2e.ts', workflowFn: 'addTenWorkflow', }, [123], - { usePagesRouter: true } + '/api/trigger-pages' ); - const returnValue = await getWorkflowReturnValue(run.runId); + const returnValue = await run.returnValue; expect(returnValue).toBe(133); }); @@ -1553,19 +1610,23 @@ describe('e2e', () => { 'promiseAllWorkflow via pages router', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('promiseAllWorkflow', [], { - usePagesRouter: true, - }); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await startWorkflowViaHttp( + 'promiseAllWorkflow', + [], + '/api/trigger-pages' + ); + const returnValue = await run.returnValue; expect(returnValue).toBe('ABC'); } ); test('sleepingWorkflow via pages router', { timeout: 60_000 }, async () => { - const run = await triggerWorkflow('sleepingWorkflow', [], { - usePagesRouter: true, - }); - const returnValue = await getWorkflowReturnValue(run.runId); + const run = await startWorkflowViaHttp( + 'sleepingWorkflow', + [], + '/api/trigger-pages' + ); + const returnValue = await run.returnValue; expect(returnValue.startTime).toBeLessThan(returnValue.endTime); expect(returnValue.endTime - returnValue.startTime).toBeGreaterThan(9999); }); diff --git a/packages/nitro/src/index.ts b/packages/nitro/src/index.ts index baf8441937..8590c7e5d2 100644 --- a/packages/nitro/src/index.ts +++ 
b/packages/nitro/src/index.ts @@ -1,3 +1,4 @@ +import { mkdirSync, readFileSync, writeFileSync } from 'node:fs'; import { workflowTransformPlugin } from '@workflow/rollup'; import type { Nitro, NitroModule, RollupConfig } from 'nitro/types'; import { join } from 'pathe'; @@ -65,6 +66,16 @@ export default { nitro.hooks.hook('build:before', async () => { await builder.build(); + + // For prod: write the manifest handler file with inlined content + // now that the builder has generated the manifest. Rollup will + // bundle this file into the compiled output. + if ( + !nitro.options.dev && + process.env.WORKFLOW_PUBLIC_MANIFEST === '1' + ) { + writeManifestHandler(nitro); + } }); // Allows for HMR - but skip the first dev:reload since build:before already ran @@ -98,6 +109,20 @@ export default { // Expose manifest as a public HTTP route when WORKFLOW_PUBLIC_MANIFEST=1 if (process.env.WORKFLOW_PUBLIC_MANIFEST === '1') { + // Prod only: the manifest does not yet exist at this point in the + // build (the builder generates it later, in build:before). + // Write a placeholder handler file so rollup can resolve the path + // during prod compilation. It will be overwritten with the real + // manifest content by writeManifestHandler() in build:before.
+ if (!nitro.options.dev) { + const dir = join(nitro.options.buildDir, 'workflow'); + mkdirSync(dir, { recursive: true }); + const handlerPath = join(dir, 'manifest-handler.mjs'); + writeFileSync( + handlerPath, + 'export default async () => new Response("Manifest not found", { status: 404 });\n' + ); + } addManifestHandler(nitro); } } @@ -133,44 +158,96 @@ function addVirtualHandler(nitro: Nitro, route: string, buildPath: string) { } } +const MANIFEST_VIRTUAL_ID = '#workflow/manifest-handler'; + function addManifestHandler(nitro: Nitro) { const route = '/.well-known/workflow/v1/manifest.json'; - const virtualId = '#workflow/manifest-handler'; const manifestPath = join(nitro.options.buildDir, 'workflow/manifest.json'); - - nitro.options.handlers.push({ route, handler: virtualId }); - - if (!nitro.routing) { - // Nitro v2 (legacy) - nitro.options.virtual[virtualId] = /* js */ ` - import { fromWebHandler } from "h3"; - import { readFileSync } from "node:fs"; - function GET() { - try { - const manifest = readFileSync(${JSON.stringify(manifestPath)}, "utf-8"); - return new Response(manifest, { - headers: { "content-type": "application/json" }, - }); - } catch { - return new Response("Manifest not found", { status: 404 }); + const handlerPath = join( + nitro.options.buildDir, + 'workflow/manifest-handler.mjs' + ); + + if (nitro.options.dev) { + // Dev mode: use a virtual handler that reads the manifest from disk at + // request time. The absolute path is valid because we're on the build machine. + nitro.options.handlers.push({ route, handler: MANIFEST_VIRTUAL_ID }); + nitro.options.virtual[MANIFEST_VIRTUAL_ID] = !nitro.routing + ? 
/* js */ ` + import { fromWebHandler } from "h3"; + import { readFileSync } from "node:fs"; + function GET() { + try { + const manifest = readFileSync(${JSON.stringify(manifestPath)}, "utf-8"); + return new Response(manifest, { + headers: { "content-type": "application/json" }, + }); + } catch { + return new Response("Manifest not found", { status: 404 }); + } } - } - export default fromWebHandler(GET); - `; + export default fromWebHandler(GET); + ` + : /* js */ ` + import { readFileSync } from "node:fs"; + export default async () => { + try { + const manifest = readFileSync(${JSON.stringify(manifestPath)}, "utf-8"); + return new Response(manifest, { + headers: { "content-type": "application/json" }, + }); + } catch { + return new Response("Manifest not found", { status: 404 }); + } + }; + `; } else { - // Nitro v3+ - nitro.options.virtual[virtualId] = /* js */ ` - import { readFileSync } from "node:fs"; - export default async () => { - try { - const manifest = readFileSync(${JSON.stringify(manifestPath)}, "utf-8"); - return new Response(manifest, { - headers: { "content-type": "application/json" }, - }); - } catch { - return new Response("Manifest not found", { status: 404 }); - } - }; - `; + // Prod mode: register a physical handler file that will be written by + // writeManifestHandler() after the builder generates the manifest. + // This file is bundled by rollup into the compiled output. + nitro.options.handlers.push({ route, handler: handlerPath }); + } +} + +/** + * Writes a physical manifest handler file with the manifest content inlined. + * Must be called after the builder generates the manifest (during build:before) + * and before Nitro compiles the bundle with rollup. 
+ */ +function writeManifestHandler(nitro: Nitro) { + const manifestPath = join(nitro.options.buildDir, 'workflow/manifest.json'); + const handlerPath = join( + nitro.options.buildDir, + 'workflow/manifest-handler.mjs' + ); + const dir = join(nitro.options.buildDir, 'workflow'); + mkdirSync(dir, { recursive: true }); + + try { + const manifestContent = readFileSync(manifestPath, 'utf-8'); + JSON.parse(manifestContent); // validate + + const handlerCode = !nitro.routing + ? `import { fromWebHandler } from "h3"; +const manifest = ${JSON.stringify(manifestContent)}; +export default fromWebHandler(() => new Response(manifest, { + headers: { "content-type": "application/json" }, +})); +` + : `const manifest = ${JSON.stringify(manifestContent)}; +export default async () => new Response(manifest, { + headers: { "content-type": "application/json" }, +}); +`; + writeFileSync(handlerPath, handlerCode); + } catch { + // Write a 404 fallback handler + const fallback = !nitro.routing + ? `import { fromWebHandler } from "h3"; +export default fromWebHandler(() => new Response("Manifest not found", { status: 404 })); +` + : `export default async () => new Response("Manifest not found", { status: 404 }); +`; + writeFileSync(handlerPath, fallback); } } diff --git a/scripts/create-test-matrix.mjs b/scripts/create-test-matrix.mjs index ba6dfcb2bb..e2cae982ec 100644 --- a/scripts/create-test-matrix.mjs +++ b/scripts/create-test-matrix.mjs @@ -34,7 +34,7 @@ const DEV_TEST_CONFIGS = { vite: { generatedStepPath: 'node_modules/.nitro/workflow/steps.mjs', generatedWorkflowPath: 'node_modules/.nitro/workflow/workflows.mjs', - apiFilePath: 'routes/api/trigger.post.ts', + apiFilePath: 'routes/api/chat.post.ts', apiFileImportPath: '../..', }, hono: { diff --git a/turbo.json b/turbo.json index 38d6e6089d..56d6764952 100644 --- a/turbo.json +++ b/turbo.json @@ -4,6 +4,7 @@ "tasks": { "build": { "dependsOn": ["^build"], + "env": ["WORKFLOW_PUBLIC_MANIFEST"], "inputs": [ "$TURBO_DEFAULT$", 
"$TURBO_ROOT$/packages/tsconfig/base.json", diff --git a/workbench/astro/src/pages/api/trigger.ts b/workbench/astro/src/pages/api/trigger.ts deleted file mode 100644 index 38fae99c69..0000000000 --- a/workbench/astro/src/pages/api/trigger.ts +++ /dev/null @@ -1,164 +0,0 @@ -import type { APIRoute } from 'astro'; -import { getRun, start } from 'workflow/api'; -import { - WorkflowRunFailedError, - WorkflowRunNotCompletedError, -} from 'workflow/internal/errors'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; -import { allWorkflows } from '../../lib/_workflows'; - -export async function POST({ request }: { request: Request }) { - const url = new URL(request.url); - const workflowFile = - url.searchParams.get('workflowFile') || 'workflows/99_e2e.ts'; - if (!workflowFile) { - return new Response('No workflowFile query parameter provided', { - status: 400, - }); - } - const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; - if (!workflows) { - return new Response(`Workflow file "${workflowFile}" not found`, { - status: 400, - }); - } - - const workflowFn = url.searchParams.get('workflowFn') || 'simple'; - if (!workflowFn) { - return new Response('No workflow query parameter provided', { - status: 400, - }); - } - - // Handle static method lookups (e.g., "Calculator.calculate") - let workflow: unknown; - if (workflowFn.includes('.')) { - const [className, methodName] = workflowFn.split('.'); - const cls = workflows[className as keyof typeof workflows]; - if (cls && typeof cls === 'function') { - workflow = (cls as Record)[methodName]; - } - } else { - workflow = workflows[workflowFn as keyof typeof workflows]; - } - if (!workflow) { - return new Response(`Workflow "${workflowFn}" not found`, { status: 400 }); - } - - let args: any[] = []; - - // Args from query string - const argsParam = url.searchParams.get('args'); - if (argsParam) { - args = argsParam.split(',').map((arg) => { - const num = parseFloat(arg); - return 
Number.isNaN(num) ? arg.trim() : num; - }); - } else { - // Args from body (binary serialized data) - const buffer = await request.arrayBuffer(); - if (buffer.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(buffer), globalThis); - } else { - args = [42]; - } - } - console.log(`Starting "${workflowFn}" workflow with args: ${args}`); - - try { - const run = await start(workflow as any, args as any); - console.log('Run:', run); - return Response.json(run); - } catch (err) { - console.error(`Failed to start!!`, err); - throw err; - } -} - -export const GET: APIRoute = async ({ request }: { request: Request }) => { - const url = new URL(request.url); - const runId = url.searchParams.get('runId'); - if (!runId) { - return new Response('No runId provided', { status: 400 }); - } - - const outputStreamParam = url.searchParams.get('output-stream'); - if (outputStreamParam) { - const namespace = outputStreamParam === '1' ? undefined : outputStreamParam; - const run = getRun(runId); - const stream = run.getReadable({ - namespace, - }); - // Add JSON framing to the stream, wrapping binary data in base64 - const streamWithFraming = new TransformStream({ - transform(chunk, controller) { - const data = - chunk instanceof Uint8Array - ? { data: Buffer.from(chunk).toString('base64') } - : chunk; - controller.enqueue(`${JSON.stringify(data)}\n`); - }, - }); - return new Response(stream.pipeThrough(streamWithFraming), { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }); - } - - try { - const run = getRun(runId); - const returnValue = await run.returnValue; - console.log('Return value:', returnValue); - return returnValue instanceof ReadableStream - ? 
new Response(returnValue, { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }) - : Response.json(returnValue); - } catch (error) { - if (error instanceof Error) { - if (WorkflowRunNotCompletedError.is(error)) { - return Response.json( - { - ...error, - name: error.name, - message: error.message, - }, - { status: 202 } - ); - } - - if (WorkflowRunFailedError.is(error)) { - const cause = error.cause; - return Response.json( - { - ...error, - name: error.name, - message: error.message, - cause: { - message: cause.message, - stack: cause.stack, - code: cause.code, - }, - }, - { status: 400 } - ); - } - } - - console.error( - 'Unexpected error while getting workflow return value:', - error - ); - return Response.json( - { - error: 'Internal server error', - }, - { status: 500 } - ); - } -}; - -export const prerender = false; diff --git a/workbench/example/api/trigger.ts b/workbench/example/api/trigger.ts deleted file mode 100644 index 8aa1c33fd2..0000000000 --- a/workbench/example/api/trigger.ts +++ /dev/null @@ -1,150 +0,0 @@ -import { getRun, start } from 'workflow/api'; -import { - WorkflowRunFailedError, - WorkflowRunNotCompletedError, -} from 'workflow/internal/errors'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; -import workflowManifest from '../manifest.js'; - -export async function POST(req: Request) { - const url = new URL(req.url); - const workflowFile = - url.searchParams.get('workflowFile') || 'workflows/99_e2e.ts'; - const workflowFn = url.searchParams.get('workflowFn') || 'simple'; - - let args: any[] = []; - - // Args from query string - const argsParam = url.searchParams.get('args'); - if (argsParam) { - args = argsParam.split(',').map((arg) => { - const num = parseFloat(arg); - return Number.isNaN(num) ? 
arg.trim() : num; - }); - } else { - // Args from body (binary serialized data) - const buffer = await req.arrayBuffer(); - if (buffer.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(buffer), globalThis); - } else { - args = [42]; - } - } - console.log( - `Starting "${workflowFile}/${workflowFn}" workflow with args: ${args}` - ); - - try { - const workflowFileItems = - workflowManifest.workflows[ - workflowFile as keyof typeof workflowManifest.workflows - ]; - const run = await start( - workflowFileItems[workflowFn as keyof typeof workflowFileItems], - args - ); - console.log('Run:', run.runId); - return Response.json(run); - } catch (err) { - console.error(`Failed to start!!`, err); - throw err; - } -} - -export async function GET(req: Request) { - const url = new URL(req.url); - const runId = url.searchParams.get('runId'); - if (!runId) { - return new Response('No runId provided', { status: 400 }); - } - - const outputStreamParam = url.searchParams.get('output-stream'); - if (outputStreamParam) { - const namespace = outputStreamParam === '1' ? undefined : outputStreamParam; - const run = getRun(runId); - const stream = run.getReadable({ - namespace, - }); - // Add JSON framing to the stream, wrapping binary data in base64 - const streamWithFraming = new TransformStream({ - transform(chunk, controller) { - const data = - chunk instanceof Uint8Array - ? 
{ data: Buffer.from(chunk).toString('base64') } - : chunk; - controller.enqueue(`${JSON.stringify(data)}\n`); - }, - }); - return new Response(stream.pipeThrough(streamWithFraming), { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }); - } - - try { - const run = getRun(runId); - const returnValue = await run.returnValue; - console.log('Return value:', returnValue); - - // Include run metadata in headers - const [createdAt, startedAt, completedAt] = await Promise.all([ - run.createdAt, - run.startedAt, - run.completedAt, - ]); - const headers: HeadersInit = - returnValue instanceof ReadableStream - ? { 'Content-Type': 'application/octet-stream' } - : {}; - - headers['X-Workflow-Run-Created-At'] = createdAt?.toISOString() || ''; - headers['X-Workflow-Run-Started-At'] = startedAt?.toISOString() || ''; - headers['X-Workflow-Run-Completed-At'] = completedAt?.toISOString() || ''; - - return returnValue instanceof ReadableStream - ? new Response(returnValue, { headers }) - : Response.json(returnValue, { headers }); - } catch (error) { - if (error instanceof Error) { - if (WorkflowRunNotCompletedError.is(error)) { - return Response.json( - { - ...error, - name: error.name, - message: error.message, - }, - { status: 202 } - ); - } - - if (WorkflowRunFailedError.is(error)) { - const cause = error.cause; - return Response.json( - { - ...error, - name: error.name, - message: error.message, - cause: { - message: cause.message, - stack: cause.stack, - code: cause.code, - }, - }, - { status: 400 } - ); - } - } - - console.error( - 'Unexpected error while getting workflow return value:', - error - ); - return Response.json( - { - error: 'Internal server error', - }, - { status: 500 } - ); - } -} diff --git a/workbench/express/src/index.ts b/workbench/express/src/index.ts index d29fcd53b9..39099a0bc1 100644 --- a/workbench/express/src/index.ts +++ b/workbench/express/src/index.ts @@ -1,12 +1,8 @@ import express from 'express'; -import { getHookByToken, getRun, 
resumeHook, start } from 'workflow/api'; -import { - WorkflowRunFailedError, - WorkflowRunNotCompletedError, -} from 'workflow/internal/errors'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; +import { getHookByToken, resumeHook } from 'workflow/api'; import { getWorld, healthCheck } from 'workflow/runtime'; -import { allWorkflows } from '../_workflows.js'; +// Side-effect import to keep _workflows in Nitro's dependency graph for HMR +import '../_workflows.js'; const app = express(); @@ -38,182 +34,6 @@ app.post('/api/hook', async (req, res) => { return res.json(hook); }); -app.post('/api/trigger', async (req, res) => { - const workflowFile = - (req.query.workflowFile as string) || 'workflows/99_e2e.ts'; - if (!workflowFile) { - return res.status(400).send('No workflowFile query parameter provided'); - } - const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; - if (!workflows) { - return res.status(400).send(`Workflow file "${workflowFile}" not found`); - } - - const workflowFn = (req.query.workflowFn as string) || 'simple'; - if (!workflowFn) { - return res.status(400).send('No workflow query parameter provided'); - } - - // Handle static method lookups (e.g., "Calculator.calculate") - let workflow: unknown; - if (workflowFn.includes('.')) { - const [className, methodName] = workflowFn.split('.'); - const cls = workflows[className as keyof typeof workflows]; - if (cls && typeof cls === 'function') { - workflow = (cls as Record)[methodName]; - } - } else { - workflow = workflows[workflowFn as keyof typeof workflows]; - } - if (!workflow) { - return res.status(400).send('Workflow not found'); - } - - let args: any[] = []; - - // Args from query string - const argsParam = req.query.args as string; - if (argsParam) { - args = argsParam.split(',').map((arg) => { - const num = parseFloat(arg); - return Number.isNaN(num) ? 
arg.trim() : num; - }); - } else { - // Args from body (binary serialized data) - const body = req.body; - if (Buffer.isBuffer(body) && body.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(body), globalThis); - } else { - args = [42]; - } - } - console.log(`Starting "${workflowFn}" workflow with args: ${args}`); - - try { - const run = await start(workflow as any, args as any); - console.log('Run:', run); - return res.json(run); - } catch (err) { - console.error(`Failed to start!!`, err); - throw err; - } -}); - -app.get('/api/trigger', async (req, res) => { - const runId = req.query.runId as string | undefined; - if (!runId) { - return res.status(400).send('No runId provided'); - } - - const outputStreamParam = req.query['output-stream'] as string | undefined; - if (outputStreamParam) { - const namespace = outputStreamParam === '1' ? undefined : outputStreamParam; - const run = getRun(runId); - const stream = run.getReadable({ - namespace, - }); - - // Set headers - res.setHeader('Content-Type', 'application/octet-stream'); - - // Read from the stream and write to Express response - const reader = stream.getReader(); - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - // Add JSON framing to each chunk, wrapping binary data in base64 - const data = - value instanceof Uint8Array - ? 
{ data: Buffer.from(value).toString('base64') } - : value; - res.write(`${JSON.stringify(data)}\n`); - } - res.end(); - } catch (error) { - console.error('Error streaming data:', error); - res.end(); - } - return; - } - - try { - const run = getRun(runId); - const returnValue = await run.returnValue; - console.log('Return value:', returnValue); - - // Include run metadata in headers - const [createdAt, startedAt, completedAt] = await Promise.all([ - run.createdAt, - run.startedAt, - run.completedAt, - ]); - res.setHeader('X-Workflow-Run-Created-At', createdAt?.toISOString() || ''); - res.setHeader('X-Workflow-Run-Started-At', startedAt?.toISOString() || ''); - res.setHeader( - 'X-Workflow-Run-Completed-At', - completedAt?.toISOString() || '' - ); - - if (returnValue instanceof ReadableStream) { - // Set headers for streaming response - res.setHeader('Content-Type', 'application/octet-stream'); - - // Read from the stream and write to Express response - const reader = returnValue.getReader(); - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - res.write(value); - } - res.end(); - } catch (streamError) { - console.error('Error streaming return value:', streamError); - res.end(); - } - return; - } - - return res.json(returnValue); - } catch (error) { - if (error instanceof Error) { - if (WorkflowRunNotCompletedError.is(error)) { - return res.status(202).json({ - ...error, - name: error.name, - message: error.message, - }); - } - - if (WorkflowRunFailedError.is(error)) { - const cause = error.cause; - return res.status(400).json({ - ...error, - name: error.name, - message: error.message, - cause: { - message: cause.message, - stack: cause.stack, - code: cause.code, - }, - }); - } - } - - console.error( - 'Unexpected error while getting workflow return value:', - error - ); - return res.status(500).json({ - error: 'Internal server error', - }); - } -}); - app.post('/api/test-health-check', async (req, res) => { // This route 
tests the queue-based health check functionality try { diff --git a/workbench/fastify/src/index.ts b/workbench/fastify/src/index.ts index e2c15da35b..983b0f3c0a 100644 --- a/workbench/fastify/src/index.ts +++ b/workbench/fastify/src/index.ts @@ -1,14 +1,10 @@ import { readFile } from 'node:fs/promises'; import { resolve } from 'node:path'; import Fastify from 'fastify'; -import { getHookByToken, getRun, resumeHook, start } from 'workflow/api'; -import { - WorkflowRunFailedError, - WorkflowRunNotCompletedError, -} from 'workflow/internal/errors'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; +import { getHookByToken, resumeHook } from 'workflow/api'; import { getWorld, healthCheck } from 'workflow/runtime'; -import { allWorkflows } from '../_workflows.js'; +// Side-effect import to keep _workflows in Nitro's dependency graph for HMR +import '../_workflows.js'; type JsonResult = { ok: true; value: any } | { ok: false; error: Error }; const parseJson = (text: string): JsonResult => { @@ -76,197 +72,6 @@ server.post('/api/hook', async (req: any, reply) => { return hook; }); -server.post('/api/trigger', async (req: any, reply) => { - const workflowFile = - (req.query.workflowFile as string) || 'workflows/99_e2e.ts'; - if (!workflowFile) { - return reply.code(400).send('No workflowFile query parameter provided'); - } - const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; - if (!workflows) { - return reply.code(400).send(`Workflow file "${workflowFile}" not found`); - } - - const workflowFn = (req.query.workflowFn as string) || 'simple'; - if (!workflowFn) { - return reply.code(400).send('No workflow query parameter provided'); - } - - // Handle static method lookups (e.g., "Calculator.calculate") - let workflow: unknown; - if (workflowFn.includes('.')) { - const [className, methodName] = workflowFn.split('.'); - const cls = workflows[className as keyof typeof workflows]; - if (cls && typeof cls === 'function') { - 
workflow = (cls as Record)[methodName]; - } - } else { - workflow = workflows[workflowFn as keyof typeof workflows]; - } - if (!workflow) { - return reply.code(400).send('Workflow not found'); - } - - let args: any[] = []; - - // Args from query string - const argsParam = req.query.args as string; - if (argsParam) { - args = argsParam.split(',').map((arg) => { - const num = parseFloat(arg); - return Number.isNaN(num) ? arg.trim() : num; - }); - } else { - // Args from body (binary serialized data) - const body = req.body; - if (Buffer.isBuffer(body) && body.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(body), globalThis); - } else { - args = [42]; - } - } - console.log(`Starting "${workflowFn}" workflow with args: ${args}`); - - try { - const run = await start(workflow as any, args as any); - console.log('Run:', run); - return run; - } catch (err) { - console.error(`Failed to start!!`, err); - throw err; - } -}); - -server.get('/api/trigger', async (req: any, reply) => { - const runId = req.query.runId as string | undefined; - if (!runId) { - return reply.code(400).send('No runId provided'); - } - - const outputStreamParam = req.query['output-stream'] as string | undefined; - - try { - const run = getRun(runId); - - if (outputStreamParam) { - const namespace = - outputStreamParam === '1' ? 
undefined : outputStreamParam; - const stream = run.getReadable({ namespace }); - const reader = stream.getReader(); - - const toFramedChunk = (value: unknown) => { - if (typeof value === 'string') { - return { data: Buffer.from(value).toString('base64') }; - } - if (value instanceof ArrayBuffer) { - return { data: Buffer.from(value).toString('base64') }; - } - if (ArrayBuffer.isView(value)) { - const view = value as ArrayBufferView; - const buf = Buffer.from( - view.buffer, - view.byteOffset, - view.byteLength - ); - return { data: buf.toString('base64') }; - } - return value; - }; - - reply.type('application/octet-stream'); - // Fastify runs on Node and doesn’t send Web ReadableStreams directly - // read from the Web reader and write framed chunks to the raw response - try { - let chunkCount = 0; - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - chunkCount += 1; - - const framed = toFramedChunk(value); - reply.raw.write(`${JSON.stringify(framed)}\n`); - } - reply.raw.end(); - } catch (error) { - console.error('Error streaming data:', error); - reply.raw.end(); - } finally { - reader.releaseLock(); - } - return; - } - - const returnValue = await run.returnValue; - console.log('Return value:', returnValue); - - if (returnValue instanceof ReadableStream) { - const reader = returnValue.getReader(); - // reply.type() doesn't apply when we write directly to reply.raw - reply.raw.setHeader('Content-Type', 'application/octet-stream'); - - // Workflow returns a Web ReadableStream; stream it by pulling from - // its reader and writing to reply.raw so Fastify can flush it to the client - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - reply.raw.write(value); - } - reply.raw.end(); - } catch (streamError) { - console.error('Error streaming return value:', streamError); - reply.raw.end(); - } finally { - reader.releaseLock(); - } - return; - } - - // Fastify sends strings as text/plain by default 
- const payload = - typeof returnValue === 'string' || - typeof returnValue === 'number' || - typeof returnValue === 'boolean' - ? JSON.stringify(returnValue) - : returnValue; - return reply.type('application/json').send(payload); - } catch (error) { - if (error instanceof Error) { - if (WorkflowRunNotCompletedError.is(error)) { - return reply.code(202).send({ - ...error, - name: error.name, - message: error.message, - }); - } - - if (WorkflowRunFailedError.is(error)) { - const cause = error.cause; - return reply.code(400).send({ - ...error, - name: error.name, - message: error.message, - cause: { - message: cause.message, - stack: cause.stack, - code: cause.code, - }, - }); - } - } - - console.error( - 'Unexpected error while getting workflow return value:', - error - ); - - return reply.code(500).send({ - error: 'Internal server error', - }); - } -}); - server.post('/api/test-health-check', async (req: any, reply) => { // This route tests the queue-based health check functionality try { diff --git a/workbench/hono/src/index.ts b/workbench/hono/src/index.ts index 02232a07e4..f0c83c91e4 100644 --- a/workbench/hono/src/index.ts +++ b/workbench/hono/src/index.ts @@ -1,182 +1,11 @@ import { Hono } from 'hono'; -import { getHookByToken, getRun, resumeHook, start } from 'workflow/api'; -import { - WorkflowRunFailedError, - WorkflowRunNotCompletedError, -} from 'workflow/internal/errors'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; +import { getHookByToken, resumeHook } from 'workflow/api'; import { getWorld, healthCheck } from 'workflow/runtime'; -import { allWorkflows } from '../_workflows.js'; +// Side-effect import to keep _workflows in Nitro's dependency graph for HMR +import '../_workflows.js'; const app = new Hono(); -app.post('/api/trigger', async ({ req }) => { - const url = new URL(req.url); - - const workflowFile = - url.searchParams.get('workflowFile') || 'workflows/99_e2e.ts'; - if (!workflowFile) { - return new Response('No 
workflowFile query parameter provided', { - status: 400, - }); - } - const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; - if (!workflows) { - return new Response(`Workflow file "${workflowFile}" not found`, { - status: 400, - }); - } - - const workflowFn = url.searchParams.get('workflowFn') || 'simple'; - if (!workflowFn) { - return new Response('No workflow query parameter provided', { - status: 400, - }); - } - - // Handle static method lookups (e.g., "Calculator.calculate") - let workflow: unknown; - if (workflowFn.includes('.')) { - const [className, methodName] = workflowFn.split('.'); - const cls = workflows[className as keyof typeof workflows]; - if (cls && typeof cls === 'function') { - workflow = (cls as Record)[methodName]; - } - } else { - workflow = workflows[workflowFn as keyof typeof workflows]; - } - if (!workflow) { - return new Response(`Workflow "${workflowFn}" not found`, { status: 400 }); - } - - let args: any[] = []; - - // Args from query string - const argsParam = url.searchParams.get('args'); - if (argsParam) { - args = argsParam.split(',').map((arg) => { - const num = parseFloat(arg); - return Number.isNaN(num) ? 
arg.trim() : num; - }); - } else { - // Args from body (binary serialized data) - const buffer = await req.arrayBuffer(); - if (buffer.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(buffer), globalThis); - } else { - args = [42]; - } - } - console.log(`Starting "${workflowFn}" workflow with args: ${args}`); - - try { - const run = await start(workflow as any, args as any); - console.log('Run:', run); - return Response.json(run); - } catch (err) { - console.error(`Failed to start!!`, err); - throw err; - } -}); - -app.get('/api/trigger', async ({ req }) => { - const url = new URL(req.url); - const runId = url.searchParams.get('runId'); - if (!runId) { - return new Response('No runId provided', { status: 400 }); - } - - const outputStreamParam = url.searchParams.get('output-stream'); - if (outputStreamParam) { - const namespace = outputStreamParam === '1' ? undefined : outputStreamParam; - const run = getRun(runId); - const stream = run.getReadable({ - namespace, - }); - // Add JSON framing to the stream, wrapping binary data in base64 - const streamWithFraming = new TransformStream({ - transform(chunk, controller) { - const data = - chunk instanceof Uint8Array - ? { data: Buffer.from(chunk).toString('base64') } - : chunk; - controller.enqueue(`${JSON.stringify(data)}\n`); - }, - }); - return new Response(stream.pipeThrough(streamWithFraming), { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }); - } - - try { - const run = getRun(runId); - const returnValue = await run.returnValue; - console.log('Return value:', returnValue); - - // Include run metadata in headers - const [createdAt, startedAt, completedAt] = await Promise.all([ - run.createdAt, - run.startedAt, - run.completedAt, - ]); - const headers: HeadersInit = - returnValue instanceof ReadableStream - ? 
{ 'Content-Type': 'application/octet-stream' } - : {}; - - headers['X-Workflow-Run-Created-At'] = createdAt?.toISOString() || ''; - headers['X-Workflow-Run-Started-At'] = startedAt?.toISOString() || ''; - headers['X-Workflow-Run-Completed-At'] = completedAt?.toISOString() || ''; - - return returnValue instanceof ReadableStream - ? new Response(returnValue, { headers }) - : Response.json(returnValue, { headers }); - } catch (error) { - if (error instanceof Error) { - if (WorkflowRunNotCompletedError.is(error)) { - return Response.json( - { - ...error, - name: error.name, - message: error.message, - }, - { status: 202 } - ); - } - - if (WorkflowRunFailedError.is(error)) { - const cause = error.cause; - return Response.json( - { - ...error, - name: error.name, - message: error.message, - cause: { - message: cause.message, - stack: cause.stack, - code: cause.code, - }, - }, - { status: 400 } - ); - } - } - - console.error( - 'Unexpected error while getting workflow return value:', - error - ); - return Response.json( - { - error: 'Internal server error', - }, - { status: 500 } - ); - } -}); - app.post('/api/hook', async ({ req }) => { const { token, data } = await req.json(); diff --git a/workbench/nest/src/app.controller.ts b/workbench/nest/src/app.controller.ts index b48ad3012b..200e881168 100644 --- a/workbench/nest/src/app.controller.ts +++ b/workbench/nest/src/app.controller.ts @@ -1,23 +1,15 @@ import { Body, Controller, - Get, HttpCode, HttpException, HttpStatus, Post, - Query, Res, } from '@nestjs/common'; import type { Response } from 'express'; -import { getHookByToken, getRun, resumeHook, start } from 'workflow/api'; -import { - WorkflowRunFailedError, - WorkflowRunNotCompletedError, -} from 'workflow/internal/errors'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; +import { getHookByToken, resumeHook } from 'workflow/api'; import { getWorld, healthCheck } from 'workflow/runtime'; -import { allWorkflows } from './_workflows.js'; 
@Controller('api') export class AppController { @@ -49,200 +41,6 @@ export class AppController { return res.status(HttpStatus.OK).json(hook); } - @Post('trigger') - async startWorkflowRun( - @Query('workflowFile') workflowFile: string = 'workflows/99_e2e.ts', - @Query('workflowFn') workflowFn: string = 'simple', - @Query('args') argsParam: string | undefined, - @Body() bodyData: any - ) { - if (!workflowFile) { - throw new HttpException( - 'No workflowFile query parameter provided', - HttpStatus.BAD_REQUEST - ); - } - const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; - if (!workflows) { - throw new HttpException( - `Workflow file "${workflowFile}" not found`, - HttpStatus.BAD_REQUEST - ); - } - - if (!workflowFn) { - throw new HttpException( - 'No workflow query parameter provided', - HttpStatus.BAD_REQUEST - ); - } - - // Handle static method lookups (e.g., "Calculator.calculate") - let workflow: unknown; - if (workflowFn.includes('.')) { - const [className, methodName] = workflowFn.split('.'); - const cls = workflows[className as keyof typeof workflows]; - if (cls && typeof cls === 'function') { - workflow = (cls as Record)[methodName]; - } - } else { - workflow = workflows[workflowFn as keyof typeof workflows]; - } - if (!workflow) { - throw new HttpException( - `Workflow "${workflowFn}" not found`, - HttpStatus.BAD_REQUEST - ); - } - - let args: any[] = []; - - // Args from query string - if (argsParam) { - args = argsParam.split(',').map((arg) => { - const num = parseFloat(arg); - return Number.isNaN(num) ? 
arg.trim() : num; - }); - } else if (Buffer.isBuffer(bodyData) && bodyData.byteLength > 0) { - // Body is binary serialized data (application/octet-stream) - args = hydrateWorkflowArguments(new Uint8Array(bodyData), globalThis); - } else { - args = [42]; - } - console.log( - `Starting "${workflowFn}" workflow with args: ${JSON.stringify(args)}` - ); - - try { - const run = await start(workflow as any, args as any); - console.log('Run:', run); - return run; - } catch (err) { - console.error(`Failed to start!!`, err); - throw err; - } - } - - @Get('trigger') - async getWorkflowRunResult( - @Query('runId') runId: string | undefined, - @Query('output-stream') outputStreamParam: string | undefined, - @Res() res: Response - ) { - if (!runId) { - throw new HttpException('No runId provided', HttpStatus.BAD_REQUEST); - } - - if (outputStreamParam) { - const namespace = - outputStreamParam === '1' ? undefined : outputStreamParam; - const run = getRun(runId); - const stream = run.getReadable({ - namespace, - }); - - res.setHeader('Content-Type', 'application/octet-stream'); - const reader = stream.getReader(); - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - // Add JSON framing to each chunk, wrapping binary data in base64 - const data = - value instanceof Uint8Array - ? 
{ data: Buffer.from(value).toString('base64') } - : value; - res.write(`${JSON.stringify(data)}\n`); - } - res.end(); - } catch (error) { - console.error('Error streaming data:', error); - res.end(); - } - return; - } - - try { - const run = getRun(runId); - const returnValue = await run.returnValue; - console.log('Return value:', returnValue); - - // Include run metadata in headers - const [createdAt, startedAt, completedAt] = await Promise.all([ - run.createdAt, - run.startedAt, - run.completedAt, - ]); - - res.setHeader( - 'X-Workflow-Run-Created-At', - createdAt?.toISOString() || '' - ); - res.setHeader( - 'X-Workflow-Run-Started-At', - startedAt?.toISOString() || '' - ); - res.setHeader( - 'X-Workflow-Run-Completed-At', - completedAt?.toISOString() || '' - ); - - if (returnValue instanceof ReadableStream) { - res.setHeader('Content-Type', 'application/octet-stream'); - const reader = returnValue.getReader(); - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - res.write(value); - } - res.end(); - } catch (streamError) { - console.error('Error streaming return value:', streamError); - res.end(); - } - return; - } - - return res.json(returnValue); - } catch (error) { - if (error instanceof Error) { - if (WorkflowRunNotCompletedError.is(error)) { - return res.status(HttpStatus.ACCEPTED).json({ - ...error, - name: error.name, - message: error.message, - }); - } - - if (WorkflowRunFailedError.is(error)) { - const cause = error.cause as any; - return res.status(HttpStatus.BAD_REQUEST).json({ - ...error, - name: error.name, - message: error.message, - cause: { - message: cause.message, - stack: cause.stack, - code: cause.code, - }, - }); - } - } - - console.error( - 'Unexpected error while getting workflow return value:', - error - ); - return res.status(HttpStatus.INTERNAL_SERVER_ERROR).json({ - error: 'Internal server error', - }); - } - } - @Post('test-health-check') @HttpCode(HttpStatus.OK) async testHealthCheck(@Body() 
body: { endpoint?: string; timeout?: number }) { diff --git a/workbench/nextjs-turbopack/app/api/trigger/route.ts b/workbench/nextjs-turbopack/app/api/trigger/route.ts deleted file mode 100644 index ba319d6e32..0000000000 --- a/workbench/nextjs-turbopack/app/api/trigger/route.ts +++ /dev/null @@ -1,173 +0,0 @@ -import { getRun, start } from 'workflow/api'; -import { - WorkflowRunFailedError, - WorkflowRunNotCompletedError, -} from 'workflow/internal/errors'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; -import { allWorkflows } from '@/_workflows'; - -export async function POST(req: Request) { - const url = new URL(req.url); - const workflowFile = - url.searchParams.get('workflowFile') || 'workflows/99_e2e.ts'; - if (!workflowFile) { - return new Response('No workflowFile query parameter provided', { - status: 400, - }); - } - const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; - if (!workflows) { - return new Response(`Workflow file "${workflowFile}" not found`, { - status: 400, - }); - } - - const workflowFn = url.searchParams.get('workflowFn') || 'simple'; - if (!workflowFn) { - return new Response('No workflow query parameter provided', { - status: 400, - }); - } - - // Handle static method lookups (e.g., "Calculator.calculate") - let workflow: unknown; - if (workflowFn.includes('.')) { - const [className, methodName] = workflowFn.split('.'); - const cls = workflows[className as keyof typeof workflows]; - if (cls && typeof cls === 'function') { - workflow = (cls as Record)[methodName]; - } - } else { - workflow = workflows[workflowFn as keyof typeof workflows]; - } - if (!workflow) { - return new Response(`Workflow "${workflowFn}" not found`, { status: 400 }); - } - - let args: any[] = []; - - // Args from query string - const argsParam = url.searchParams.get('args'); - if (argsParam) { - args = argsParam.split(',').map((arg) => { - const num = parseFloat(arg); - return Number.isNaN(num) ? 
arg.trim() : num; - }); - } else { - // Args from body (binary serialized data) - const buffer = await req.arrayBuffer(); - if (buffer.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(buffer), globalThis); - } else { - args = [42]; - } - } - console.log(`Starting "${workflowFn}" workflow with args: ${args}`); - - try { - const run = await start(workflow as any, args as any); - console.log('Run', run.runId); - return Response.json(run); - } catch (err) { - console.error(`Failed to start!!`, err); - throw err; - } -} - -export async function GET(req: Request) { - const url = new URL(req.url); - const runId = url.searchParams.get('runId'); - if (!runId) { - return new Response('No runId provided', { status: 400 }); - } - - const outputStreamParam = url.searchParams.get('output-stream'); - if (outputStreamParam) { - const namespace = outputStreamParam === '1' ? undefined : outputStreamParam; - const run = getRun(runId); - const stream = run.getReadable({ - namespace, - }); - // Add JSON framing to the stream, wrapping binary data in base64 - const streamWithFraming = new TransformStream({ - transform(chunk, controller) { - const data = - chunk instanceof Uint8Array - ? { data: Buffer.from(chunk).toString('base64') } - : chunk; - controller.enqueue(`${JSON.stringify(data)}\n`); - }, - }); - return new Response(stream.pipeThrough(streamWithFraming), { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }); - } - - try { - const run = getRun(runId); - const returnValue = await run.returnValue; - console.log('Return value:', returnValue); - - // Include run metadata in headers - const [createdAt, startedAt, completedAt] = await Promise.all([ - run.createdAt, - run.startedAt, - run.completedAt, - ]); - const headers: HeadersInit = - returnValue instanceof ReadableStream - ? 
{ 'Content-Type': 'application/octet-stream' } - : {}; - - headers['X-Workflow-Run-Created-At'] = createdAt?.toISOString() || ''; - headers['X-Workflow-Run-Started-At'] = startedAt?.toISOString() || ''; - headers['X-Workflow-Run-Completed-At'] = completedAt?.toISOString() || ''; - - return returnValue instanceof ReadableStream - ? new Response(returnValue, { headers }) - : Response.json(returnValue, { headers }); - } catch (error) { - if (error instanceof Error) { - if (WorkflowRunNotCompletedError.is(error)) { - return Response.json( - { - ...error, - name: error.name, - message: error.message, - }, - { status: 202 } - ); - } - - if (WorkflowRunFailedError.is(error)) { - const cause = error.cause; - return Response.json( - { - ...error, - name: error.name, - message: error.message, - cause: { - message: cause.message, - stack: cause.stack, - code: cause.code, - }, - }, - { status: 400 } - ); - } - } - - console.error( - 'Unexpected error while getting workflow return value:', - error - ); - return Response.json( - { - error: 'Internal server error', - }, - { status: 500 } - ); - } -} diff --git a/workbench/nextjs-webpack/app/api/trigger/route.ts b/workbench/nextjs-webpack/app/api/trigger/route.ts deleted file mode 100644 index 9125af48ed..0000000000 --- a/workbench/nextjs-webpack/app/api/trigger/route.ts +++ /dev/null @@ -1,159 +0,0 @@ -import { getRun, start } from 'workflow/api'; -import { - WorkflowRunFailedError, - WorkflowRunNotCompletedError, -} from 'workflow/internal/errors'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; -import { allWorkflows } from '@/_workflows'; - -export async function POST(req: Request) { - const url = new URL(req.url); - const workflowFile = - url.searchParams.get('workflowFile') || 'workflows/99_e2e.ts'; - const workflowFn = url.searchParams.get('workflowFn') || 'simple'; - - console.log('calling workflow', { workflowFile, workflowFn }); - - let args: any[] = []; - - // Args from query string - const 
argsParam = url.searchParams.get('args'); - if (argsParam) { - args = argsParam.split(',').map((arg) => { - const num = parseFloat(arg); - return Number.isNaN(num) ? arg.trim() : num; - }); - } else { - // Args from body (binary serialized data) - const buffer = await req.arrayBuffer(); - if (buffer.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(buffer), globalThis); - } else { - args = [42]; - } - } - console.log( - `Starting "${workflowFile}/${workflowFn}" workflow with args: ${args}` - ); - - try { - const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; - if (!workflows) { - return Response.json( - { error: `Workflow file "${workflowFile}" not found` }, - { status: 404 } - ); - } - - // Handle static method lookups (e.g., "Calculator.calculate") - let workflow: unknown; - if (workflowFn.includes('.')) { - const [className, methodName] = workflowFn.split('.'); - const cls = workflows[className as keyof typeof workflows]; - if (cls && typeof cls === 'function') { - workflow = (cls as Record)[methodName]; - } - } else { - workflow = workflows[workflowFn as keyof typeof workflows]; - } - if (!workflow) { - return Response.json( - { error: `Function "${workflowFn}" not found in ${workflowFile}` }, - { status: 400 } - ); - } - - const run = await start(workflow as any, args); - console.log('Run:', run.runId); - return Response.json(run); - } catch (err) { - console.error(`Failed to start!!`, err); - throw err; - } -} - -export async function GET(req: Request) { - const url = new URL(req.url); - const runId = url.searchParams.get('runId'); - if (!runId) { - return new Response('No runId provided', { status: 400 }); - } - - const outputStreamParam = url.searchParams.get('output-stream'); - if (outputStreamParam) { - const namespace = outputStreamParam === '1' ? 
undefined : outputStreamParam; - const run = getRun(runId); - const stream = run.getReadable({ - namespace, - }); - // Add JSON framing to the stream, wrapping binary data in base64 - const streamWithFraming = new TransformStream({ - transform(chunk, controller) { - const data = - chunk instanceof Uint8Array - ? { data: Buffer.from(chunk).toString('base64') } - : chunk; - controller.enqueue(`${JSON.stringify(data)}\n`); - }, - }); - return new Response(stream.pipeThrough(streamWithFraming), { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }); - } - - try { - const run = getRun(runId); - const returnValue = await run.returnValue; - console.log('Return value:', returnValue); - return returnValue instanceof ReadableStream - ? new Response(returnValue, { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }) - : Response.json(returnValue); - } catch (error) { - if (error instanceof Error) { - if (WorkflowRunNotCompletedError.is(error)) { - return Response.json( - { - ...error, - name: error.name, - message: error.message, - }, - { status: 202 } - ); - } - - if (WorkflowRunFailedError.is(error)) { - const cause = error.cause; - return Response.json( - { - ...error, - name: error.name, - message: error.message, - cause: { - message: cause.message, - stack: cause.stack, - code: cause.code, - }, - }, - { status: 400 } - ); - } - } - - console.error( - 'Unexpected error while getting workflow return value:', - error - ); - return Response.json( - { - error: 'Internal server error', - }, - { status: 500 } - ); - } -} diff --git a/workbench/nitro-v2/server/api/trigger.get.ts b/workbench/nitro-v2/server/api/trigger.get.ts deleted file mode 100644 index 647aeb3ca0..0000000000 --- a/workbench/nitro-v2/server/api/trigger.get.ts +++ /dev/null @@ -1,92 +0,0 @@ -import { defineEventHandler, getRequestURL } from 'h3'; -import { getRun } from 'workflow/api'; -import { - WorkflowRunFailedError, - WorkflowRunNotCompletedError, -} from 
'workflow/internal/errors'; - -export default defineEventHandler(async (event) => { - const url = getRequestURL(event); - const runId = url.searchParams.get('runId'); - if (!runId) { - return new Response('No runId provided', { status: 400 }); - } - - const outputStreamParam = url.searchParams.get('output-stream'); - if (outputStreamParam) { - const namespace = outputStreamParam === '1' ? undefined : outputStreamParam; - const run = getRun(runId); - const stream = run.getReadable({ - namespace, - }); - // Add JSON framing to the stream, wrapping binary data in base64 - const streamWithFraming = new TransformStream({ - transform(chunk, controller) { - const data = - chunk instanceof Uint8Array - ? { data: Buffer.from(chunk).toString('base64') } - : chunk; - controller.enqueue(`${JSON.stringify(data)}\n`); - }, - }); - return new Response(stream.pipeThrough(streamWithFraming), { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }); - } - - try { - const run = getRun(runId); - const returnValue = await run.returnValue; - console.log('Return value:', returnValue); - return returnValue instanceof ReadableStream - ? 
new Response(returnValue, { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }) - : Response.json(returnValue); - } catch (error) { - if (error instanceof Error) { - if (WorkflowRunNotCompletedError.is(error)) { - return Response.json( - { - ...error, - name: error.name, - message: error.message, - }, - { status: 202 } - ); - } - - if (WorkflowRunFailedError.is(error)) { - const cause = error.cause; - return Response.json( - { - ...error, - name: error.name, - message: error.message, - cause: { - message: cause.message, - stack: cause.stack, - code: cause.code, - }, - }, - { status: 400 } - ); - } - } - - console.error( - 'Unexpected error while getting workflow return value:', - error - ); - return Response.json( - { - error: 'Internal server error', - }, - { status: 500 } - ); - } -}); diff --git a/workbench/nitro-v2/server/api/trigger.post.ts b/workbench/nitro-v2/server/api/trigger.post.ts deleted file mode 100644 index 0df2d8346e..0000000000 --- a/workbench/nitro-v2/server/api/trigger.post.ts +++ /dev/null @@ -1,79 +0,0 @@ -import { - defineEventHandler, - getRequestURL, - readRawBody, - toWebRequest, -} from 'h3'; -import { start } from 'workflow/api'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; -import { allWorkflows } from '../../_workflows.js'; - -export default defineEventHandler(async (event) => { - const url = getRequestURL(event); - - const workflowFile = - url.searchParams.get('workflowFile') || 'workflows/99_e2e.ts'; - if (!workflowFile) { - return new Response('No workflowFile query parameter provided', { - status: 400, - }); - } - const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; - if (!workflows) { - return new Response(`Workflow file "${workflowFile}" not found`, { - status: 400, - }); - } - - const workflowFn = url.searchParams.get('workflowFn') || 'simple'; - if (!workflowFn) { - return new Response('No workflow query parameter provided', { - status: 400, - }); - } - - // 
Handle static method lookups (e.g., "Calculator.calculate") - let workflow: unknown; - if (workflowFn.includes('.')) { - const [className, methodName] = workflowFn.split('.'); - const cls = workflows[className as keyof typeof workflows]; - if (cls && typeof cls === 'function') { - workflow = (cls as Record)[methodName]; - } - } else { - workflow = workflows[workflowFn as keyof typeof workflows]; - } - if (!workflow) { - return new Response(`Workflow "${workflowFn}" not found`, { status: 400 }); - } - - let args: any[] = []; - - // Args from query string - const argsParam = url.searchParams.get('args'); - if (argsParam) { - args = argsParam.split(',').map((arg) => { - const num = parseFloat(arg); - return Number.isNaN(num) ? arg.trim() : num; - }); - } else { - // Args from body (binary serialized data) - const req = toWebRequest(event); - const buffer = await req.arrayBuffer(); - if (buffer.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(buffer), globalThis); - } else { - args = [42]; - } - } - console.log(`Starting "${workflowFn}" workflow with args: ${args}`); - - try { - const run = await start(workflow as any, args as any); - console.log('Run:', run.runId); - return Response.json(run); - } catch (err) { - console.error(`Failed to start!!`, err); - throw err; - } -}); diff --git a/workbench/nitro-v3/routes/api/trigger.get.ts b/workbench/nitro-v3/routes/api/trigger.get.ts deleted file mode 100644 index 4487105738..0000000000 --- a/workbench/nitro-v3/routes/api/trigger.get.ts +++ /dev/null @@ -1,102 +0,0 @@ -import { getRun } from 'workflow/api'; -import { - WorkflowRunFailedError, - WorkflowRunNotCompletedError, -} from 'workflow/internal/errors'; - -export default async ({ url }: { req: Request; url: URL }) => { - const runId = url.searchParams.get('runId'); - if (!runId) { - return new Response('No runId provided', { status: 400 }); - } - - const outputStreamParam = url.searchParams.get('output-stream'); - if (outputStreamParam) { - const 
namespace = outputStreamParam === '1' ? undefined : outputStreamParam; - const run = getRun(runId); - const stream = run.getReadable({ - namespace, - }); - // Add JSON framing to the stream, wrapping binary data in base64 - const streamWithFraming = new TransformStream({ - transform(chunk, controller) { - const data = - chunk instanceof Uint8Array - ? { data: Buffer.from(chunk).toString('base64') } - : chunk; - controller.enqueue(`${JSON.stringify(data)}\n`); - }, - }); - return new Response(stream.pipeThrough(streamWithFraming), { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }); - } - - try { - const run = getRun(runId); - const returnValue = await run.returnValue; - console.log('Return value:', returnValue); - - // Include run metadata in headers - const [createdAt, startedAt, completedAt] = await Promise.all([ - run.createdAt, - run.startedAt, - run.completedAt, - ]); - const headers: HeadersInit = - returnValue instanceof ReadableStream - ? { 'Content-Type': 'application/octet-stream' } - : {}; - - headers['X-Workflow-Run-Created-At'] = createdAt?.toISOString() || ''; - headers['X-Workflow-Run-Started-At'] = startedAt?.toISOString() || ''; - headers['X-Workflow-Run-Completed-At'] = completedAt?.toISOString() || ''; - - return returnValue instanceof ReadableStream - ? 
new Response(returnValue, { headers }) - : Response.json(returnValue, { headers }); - } catch (error) { - if (error instanceof Error) { - if (WorkflowRunNotCompletedError.is(error)) { - return Response.json( - { - ...error, - name: error.name, - message: error.message, - }, - { status: 202 } - ); - } - - if (WorkflowRunFailedError.is(error)) { - const cause = error.cause; - return Response.json( - { - ...error, - name: error.name, - message: error.message, - cause: { - message: cause.message, - stack: cause.stack, - code: cause.code, - }, - }, - { status: 400 } - ); - } - } - - console.error( - 'Unexpected error while getting workflow return value:', - error - ); - return Response.json( - { - error: 'Internal server error', - }, - { status: 500 } - ); - } -}; diff --git a/workbench/nitro-v3/routes/api/trigger.post.ts b/workbench/nitro-v3/routes/api/trigger.post.ts deleted file mode 100644 index a156d4d9c6..0000000000 --- a/workbench/nitro-v3/routes/api/trigger.post.ts +++ /dev/null @@ -1,70 +0,0 @@ -import { start } from 'workflow/api'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; -import { allWorkflows } from '../../_workflows.js'; - -export default async ({ req, url }: { req: Request; url: URL }) => { - const workflowFile = - url.searchParams.get('workflowFile') || 'workflows/99_e2e.ts'; - if (!workflowFile) { - return new Response('No workflowFile query parameter provided', { - status: 400, - }); - } - const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; - if (!workflows) { - return new Response(`Workflow file "${workflowFile}" not found`, { - status: 400, - }); - } - - const workflowFn = url.searchParams.get('workflowFn') || 'simple'; - if (!workflowFn) { - return new Response('No workflow query parameter provided', { - status: 400, - }); - } - - // Handle static method lookups (e.g., "Calculator.calculate") - let workflow: unknown; - if (workflowFn.includes('.')) { - const [className, methodName] = 
workflowFn.split('.'); - const cls = workflows[className as keyof typeof workflows]; - if (cls && typeof cls === 'function') { - workflow = (cls as Record)[methodName]; - } - } else { - workflow = workflows[workflowFn as keyof typeof workflows]; - } - if (!workflow) { - return new Response(`Workflow "${workflowFn}" not found`, { status: 400 }); - } - - let args: any[] = []; - - // Args from query string - const argsParam = url.searchParams.get('args'); - if (argsParam) { - args = argsParam.split(',').map((arg) => { - const num = parseFloat(arg); - return Number.isNaN(num) ? arg.trim() : num; - }); - } else { - // Args from body (binary serialized data) - const buffer = await req.arrayBuffer(); - if (buffer.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(buffer), globalThis); - } else { - args = [42]; - } - } - console.log(`Starting "${workflowFn}" workflow with args: ${args}`); - - try { - const run = await start(workflow as any, args as any); - console.log('Run:', run.runId); - return Response.json(run); - } catch (err) { - console.error(`Failed to start!!`, err); - throw err; - } -}; diff --git a/workbench/nuxt/server/api/trigger.get.ts b/workbench/nuxt/server/api/trigger.get.ts deleted file mode 100644 index 647aeb3ca0..0000000000 --- a/workbench/nuxt/server/api/trigger.get.ts +++ /dev/null @@ -1,92 +0,0 @@ -import { defineEventHandler, getRequestURL } from 'h3'; -import { getRun } from 'workflow/api'; -import { - WorkflowRunFailedError, - WorkflowRunNotCompletedError, -} from 'workflow/internal/errors'; - -export default defineEventHandler(async (event) => { - const url = getRequestURL(event); - const runId = url.searchParams.get('runId'); - if (!runId) { - return new Response('No runId provided', { status: 400 }); - } - - const outputStreamParam = url.searchParams.get('output-stream'); - if (outputStreamParam) { - const namespace = outputStreamParam === '1' ? 
undefined : outputStreamParam; - const run = getRun(runId); - const stream = run.getReadable({ - namespace, - }); - // Add JSON framing to the stream, wrapping binary data in base64 - const streamWithFraming = new TransformStream({ - transform(chunk, controller) { - const data = - chunk instanceof Uint8Array - ? { data: Buffer.from(chunk).toString('base64') } - : chunk; - controller.enqueue(`${JSON.stringify(data)}\n`); - }, - }); - return new Response(stream.pipeThrough(streamWithFraming), { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }); - } - - try { - const run = getRun(runId); - const returnValue = await run.returnValue; - console.log('Return value:', returnValue); - return returnValue instanceof ReadableStream - ? new Response(returnValue, { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }) - : Response.json(returnValue); - } catch (error) { - if (error instanceof Error) { - if (WorkflowRunNotCompletedError.is(error)) { - return Response.json( - { - ...error, - name: error.name, - message: error.message, - }, - { status: 202 } - ); - } - - if (WorkflowRunFailedError.is(error)) { - const cause = error.cause; - return Response.json( - { - ...error, - name: error.name, - message: error.message, - cause: { - message: cause.message, - stack: cause.stack, - code: cause.code, - }, - }, - { status: 400 } - ); - } - } - - console.error( - 'Unexpected error while getting workflow return value:', - error - ); - return Response.json( - { - error: 'Internal server error', - }, - { status: 500 } - ); - } -}); diff --git a/workbench/nuxt/server/api/trigger.post.ts b/workbench/nuxt/server/api/trigger.post.ts deleted file mode 100644 index 0df2d8346e..0000000000 --- a/workbench/nuxt/server/api/trigger.post.ts +++ /dev/null @@ -1,79 +0,0 @@ -import { - defineEventHandler, - getRequestURL, - readRawBody, - toWebRequest, -} from 'h3'; -import { start } from 'workflow/api'; -import { hydrateWorkflowArguments } from 
'workflow/internal/serialization'; -import { allWorkflows } from '../../_workflows.js'; - -export default defineEventHandler(async (event) => { - const url = getRequestURL(event); - - const workflowFile = - url.searchParams.get('workflowFile') || 'workflows/99_e2e.ts'; - if (!workflowFile) { - return new Response('No workflowFile query parameter provided', { - status: 400, - }); - } - const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; - if (!workflows) { - return new Response(`Workflow file "${workflowFile}" not found`, { - status: 400, - }); - } - - const workflowFn = url.searchParams.get('workflowFn') || 'simple'; - if (!workflowFn) { - return new Response('No workflow query parameter provided', { - status: 400, - }); - } - - // Handle static method lookups (e.g., "Calculator.calculate") - let workflow: unknown; - if (workflowFn.includes('.')) { - const [className, methodName] = workflowFn.split('.'); - const cls = workflows[className as keyof typeof workflows]; - if (cls && typeof cls === 'function') { - workflow = (cls as Record)[methodName]; - } - } else { - workflow = workflows[workflowFn as keyof typeof workflows]; - } - if (!workflow) { - return new Response(`Workflow "${workflowFn}" not found`, { status: 400 }); - } - - let args: any[] = []; - - // Args from query string - const argsParam = url.searchParams.get('args'); - if (argsParam) { - args = argsParam.split(',').map((arg) => { - const num = parseFloat(arg); - return Number.isNaN(num) ? 
arg.trim() : num; - }); - } else { - // Args from body (binary serialized data) - const req = toWebRequest(event); - const buffer = await req.arrayBuffer(); - if (buffer.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(buffer), globalThis); - } else { - args = [42]; - } - } - console.log(`Starting "${workflowFn}" workflow with args: ${args}`); - - try { - const run = await start(workflow as any, args as any); - console.log('Run:', run.runId); - return Response.json(run); - } catch (err) { - console.error(`Failed to start!!`, err); - throw err; - } -}); diff --git a/workbench/sveltekit/src/routes/api/trigger/+server.ts b/workbench/sveltekit/src/routes/api/trigger/+server.ts deleted file mode 100644 index d3f0086b24..0000000000 --- a/workbench/sveltekit/src/routes/api/trigger/+server.ts +++ /dev/null @@ -1,162 +0,0 @@ -import type { RequestHandler } from '@sveltejs/kit'; -import { getRun, start } from 'workflow/api'; -import { - WorkflowRunFailedError, - WorkflowRunNotCompletedError, -} from 'workflow/internal/errors'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; -import { allWorkflows } from '$lib/_workflows.js'; - -export const POST: RequestHandler = async ({ request }) => { - const url = new URL(request.url); - const workflowFile = - url.searchParams.get('workflowFile') || 'workflows/99_e2e.ts'; - if (!workflowFile) { - return new Response('No workflowFile query parameter provided', { - status: 400, - }); - } - const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; - if (!workflows) { - return new Response(`Workflow file "${workflowFile}" not found`, { - status: 400, - }); - } - - const workflowFn = url.searchParams.get('workflowFn') || 'simple'; - if (!workflowFn) { - return new Response('No workflow query parameter provided', { - status: 400, - }); - } - - // Handle static method lookups (e.g., "Calculator.calculate") - let workflow: unknown; - if (workflowFn.includes('.')) { - const 
[className, methodName] = workflowFn.split('.'); - const cls = workflows[className as keyof typeof workflows]; - if (cls && typeof cls === 'function') { - workflow = (cls as Record)[methodName]; - } - } else { - workflow = workflows[workflowFn as keyof typeof workflows]; - } - if (!workflow) { - return new Response(`Workflow "${workflowFn}" not found`, { status: 400 }); - } - - let args: any[] = []; - - // Args from query string - const argsParam = url.searchParams.get('args'); - if (argsParam) { - args = argsParam.split(',').map((arg) => { - const num = parseFloat(arg); - return Number.isNaN(num) ? arg.trim() : num; - }); - } else { - // Args from body (binary serialized data) - const buffer = await request.arrayBuffer(); - if (buffer.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(buffer), globalThis); - } else { - args = [42]; - } - } - console.log(`Starting "${workflowFn}" workflow with args: ${args}`); - - try { - const run = await start(workflow as any, args as any); - console.log('Run:', run.runId); - return Response.json(run); - } catch (err) { - console.error(`Failed to start!!`, err); - throw err; - } -}; - -export const GET: RequestHandler = async ({ request }) => { - const url = new URL(request.url); - const runId = url.searchParams.get('runId'); - if (!runId) { - return new Response('No runId provided', { status: 400 }); - } - - const outputStreamParam = url.searchParams.get('output-stream'); - if (outputStreamParam) { - const namespace = outputStreamParam === '1' ? undefined : outputStreamParam; - const run = getRun(runId); - const stream = run.getReadable({ - namespace, - }); - // Add JSON framing to the stream, wrapping binary data in base64 - const streamWithFraming = new TransformStream({ - transform(chunk, controller) { - const data = - chunk instanceof Uint8Array - ? 
{ data: Buffer.from(chunk).toString('base64') } - : chunk; - controller.enqueue(`${JSON.stringify(data)}\n`); - }, - }); - return new Response(stream.pipeThrough(streamWithFraming), { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }); - } - - try { - const run = getRun(runId); - const returnValue = await run.returnValue; - console.log('Return value:', returnValue); - return returnValue instanceof ReadableStream - ? new Response(returnValue, { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }) - : Response.json(returnValue); - } catch (error) { - if (error instanceof Error) { - if (WorkflowRunNotCompletedError.is(error)) { - return Response.json( - { - ...error, - name: error.name, - message: error.message, - }, - { status: 202 } - ); - } - - if (WorkflowRunFailedError.is(error)) { - const cause = error.cause; - return Response.json( - { - ...error, - name: error.name, - message: error.message, - cause: { - message: cause.message, - stack: cause.stack, - code: cause.code, - }, - }, - { status: 400 } - ); - } - } - - console.error( - 'Unexpected error while getting workflow return value:', - error - ); - return Response.json( - { - error: 'Internal server error', - }, - { status: 500 } - ); - } -}; diff --git a/workbench/vite/routes/api/trigger.get.ts b/workbench/vite/routes/api/trigger.get.ts deleted file mode 100644 index a7ef468e6e..0000000000 --- a/workbench/vite/routes/api/trigger.get.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { getRun } from 'workflow/api'; -import { - WorkflowRunFailedError, - WorkflowRunNotCompletedError, -} from 'workflow/internal/errors'; - -export default async ({ url }: { req: Request; url: URL }) => { - const runId = url.searchParams.get('runId'); - if (!runId) { - return new Response('No runId provided', { status: 400 }); - } - - const outputStreamParam = url.searchParams.get('output-stream'); - if (outputStreamParam) { - const namespace = outputStreamParam === '1' ? 
undefined : outputStreamParam; - const run = getRun(runId); - const stream = run.getReadable({ - namespace, - }); - // Add JSON framing to the stream, wrapping binary data in base64 - const streamWithFraming = new TransformStream({ - transform(chunk, controller) { - const data = - chunk instanceof Uint8Array - ? { data: Buffer.from(chunk).toString('base64') } - : chunk; - controller.enqueue(`${JSON.stringify(data)}\n`); - }, - }); - return new Response(stream.pipeThrough(streamWithFraming), { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }); - } - - try { - const run = getRun(runId); - const returnValue = await run.returnValue; - console.log('Return value:', returnValue); - return returnValue instanceof ReadableStream - ? new Response(returnValue, { - headers: { - 'Content-Type': 'application/octet-stream', - }, - }) - : Response.json(returnValue); - } catch (error) { - if (error instanceof Error) { - if (WorkflowRunNotCompletedError.is(error)) { - return Response.json( - { - ...error, - name: error.name, - message: error.message, - }, - { status: 202 } - ); - } - - if (WorkflowRunFailedError.is(error)) { - const cause = error.cause; - return Response.json( - { - ...error, - name: error.name, - message: error.message, - cause: { - message: cause.message, - stack: cause.stack, - code: cause.code, - }, - }, - { status: 400 } - ); - } - } - - console.error( - 'Unexpected error while getting workflow return value:', - error - ); - return Response.json( - { - error: 'Internal server error', - }, - { status: 500 } - ); - } -}; diff --git a/workbench/vite/routes/api/trigger.post.ts b/workbench/vite/routes/api/trigger.post.ts deleted file mode 100644 index a156d4d9c6..0000000000 --- a/workbench/vite/routes/api/trigger.post.ts +++ /dev/null @@ -1,70 +0,0 @@ -import { start } from 'workflow/api'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; -import { allWorkflows } from '../../_workflows.js'; - -export default async ({ req, 
url }: { req: Request; url: URL }) => { - const workflowFile = - url.searchParams.get('workflowFile') || 'workflows/99_e2e.ts'; - if (!workflowFile) { - return new Response('No workflowFile query parameter provided', { - status: 400, - }); - } - const workflows = allWorkflows[workflowFile as keyof typeof allWorkflows]; - if (!workflows) { - return new Response(`Workflow file "${workflowFile}" not found`, { - status: 400, - }); - } - - const workflowFn = url.searchParams.get('workflowFn') || 'simple'; - if (!workflowFn) { - return new Response('No workflow query parameter provided', { - status: 400, - }); - } - - // Handle static method lookups (e.g., "Calculator.calculate") - let workflow: unknown; - if (workflowFn.includes('.')) { - const [className, methodName] = workflowFn.split('.'); - const cls = workflows[className as keyof typeof workflows]; - if (cls && typeof cls === 'function') { - workflow = (cls as Record)[methodName]; - } - } else { - workflow = workflows[workflowFn as keyof typeof workflows]; - } - if (!workflow) { - return new Response(`Workflow "${workflowFn}" not found`, { status: 400 }); - } - - let args: any[] = []; - - // Args from query string - const argsParam = url.searchParams.get('args'); - if (argsParam) { - args = argsParam.split(',').map((arg) => { - const num = parseFloat(arg); - return Number.isNaN(num) ? arg.trim() : num; - }); - } else { - // Args from body (binary serialized data) - const buffer = await req.arrayBuffer(); - if (buffer.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(buffer), globalThis); - } else { - args = [42]; - } - } - console.log(`Starting "${workflowFn}" workflow with args: ${args}`); - - try { - const run = await start(workflow as any, args as any); - console.log('Run:', run.runId); - return Response.json(run); - } catch (err) { - console.error(`Failed to start!!`, err); - throw err; - } -};