Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,43 @@ describe('e2e', () => {
}
);

test(
'health check (queue-based) - workflow and step endpoints respond to health check messages',
{ timeout: 60_000 },
async () => {
// NOTE: This tests the queue-based health check using healthCheck() function.
// This approach bypasses Vercel Deployment Protection by sending messages
// through the Queue infrastructure rather than direct HTTP.
const url = new URL('/api/test-health-check', deploymentUrl);

// Test workflow endpoint health check
const workflowRes = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...getProtectionBypassHeaders(),
},
body: JSON.stringify({ endpoint: 'workflow', timeout: 30000 }),
});
expect(workflowRes.status).toBe(200);
const workflowResult = await workflowRes.json();
expect(workflowResult.healthy).toBe(true);

// Test step endpoint health check
const stepRes = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...getProtectionBypassHeaders(),
},
body: JSON.stringify({ endpoint: 'step', timeout: 30000 }),
});
expect(stepRes.status).toBe(200);
const stepResult = await stepRes.json();
expect(stepResult.healthy).toBe(true);
}
);

test(
'pathsAliasWorkflow - TypeScript path aliases resolve correctly',
{ timeout: 60_000 },
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ export function workflowEntrypoint(
'__wkf_workflow_',
async (message_, metadata) => {
// Check if this is a health check message
// NOTE: Health check messages are intentionally unauthenticated for monitoring purposes.
// They only write a simple status response to a stream and do not expose sensitive data.
// The stream name includes a unique correlationId that must be known by the caller.
const healthCheck = parseHealthCheckPayload(message_);
if (healthCheck) {
await handleHealthCheckMessage(healthCheck, 'workflow');
Expand Down
85 changes: 52 additions & 33 deletions packages/core/src/runtime/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import type { Event, ValidQueueName, World } from '@workflow/world';
import type {
Event,
HealthCheckPayload,
ValidQueueName,
World,
} from '@workflow/world';
import {
HEALTH_CHECK_STEP_QUEUE,
HEALTH_CHECK_STREAM_PREFIX,
HEALTH_CHECK_WORKFLOW_QUEUE,
HealthCheckPayloadSchema,
} from '@workflow/world';
import { monotonicFactory } from 'ulid';
import { z } from 'zod';
import * as Attribute from '../telemetry/semantic-conventions.js';
import { getSpanKind, trace } from '../telemetry.js';
import { getWorld } from './world.js';
Expand All @@ -10,24 +20,6 @@ const DEFAULT_HEALTH_CHECK_TIMEOUT = 30_000;

const generateId = monotonicFactory();

/**
* Stream name prefix for health check responses.
* The full stream name is `__health_check__${correlationId}`.
*/
const HEALTH_CHECK_STREAM_PREFIX = '__health_check__';

/**
* Health check payload - used to verify that the queue pipeline
* can deliver messages to workflow/step endpoints.
* This bypasses Deployment Protection on Vercel.
*/
const HealthCheckPayloadSchema = z.object({
__healthCheck: z.literal(true),
correlationId: z.string(),
});

type HealthCheckPayload = z.infer<typeof HealthCheckPayloadSchema>;

/**
* Result of a health check operation.
*/
Expand Down Expand Up @@ -105,15 +97,12 @@ export async function healthCheck(
const streamName = `${HEALTH_CHECK_STREAM_PREFIX}${correlationId}`;

// Determine which queue to use based on endpoint
const queueName: ValidQueueName = `__wkf_${endpoint}___health_check__`;
const queueName: ValidQueueName =
endpoint === 'workflow'
? HEALTH_CHECK_WORKFLOW_QUEUE
: HEALTH_CHECK_STEP_QUEUE;

try {
// Send the health check message through the queue
await world.queue(queueName, {
__healthCheck: true,
correlationId,
});

// Wait for the response with timeout
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(() => {
Expand All @@ -135,17 +124,47 @@ export async function healthCheck(
}

// Parse the response
const responseText = new TextDecoder().decode(
Buffer.concat(chunks.map((c) => Buffer.from(c)))
);
const response = JSON.parse(responseText);
const totalLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0);
const combined = new Uint8Array(totalLength);
let offset = 0;
for (const chunk of chunks) {
combined.set(chunk, offset);
offset += chunk.length;
}
const responseText = new TextDecoder().decode(combined);

let response: unknown;
try {
response = JSON.parse(responseText);
} catch {
throw new Error('Invalid health check response format');
}

// Type guard: ensure response has the expected structure
if (
typeof response !== 'object' ||
response === null ||
!('healthy' in response) ||
typeof (response as { healthy: unknown }).healthy !== 'boolean'
) {
throw new Error('Invalid health check response structure');
}

return {
healthy: response.healthy === true,
healthy: (response as { healthy: boolean }).healthy,
};
};

return await Promise.race([readStreamResponse(), timeoutPromise]);
// Start reading from stream BEFORE sending the queue message to avoid race condition
const responsePromise = readStreamResponse();

// Send the health check message through the queue
await world.queue(queueName, {
__healthCheck: true,
correlationId,
});

return await Promise.race([responsePromise, timeoutPromise]);
} catch (error) {
return {
healthy: false,
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/runtime/step-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const stepHandler = getWorldHandlers().createQueueHandler(
'__wkf_step_',
async (message_, metadata) => {
// Check if this is a health check message
// NOTE: Health check messages are intentionally unauthenticated for monitoring purposes.
// They only write a simple status response to a stream and do not expose sensitive data.
// The stream name includes a unique correlationId that must be known by the caller.
const healthCheck = parseHealthCheckPayload(message_);
if (healthCheck) {
await handleHealthCheckMessage(healthCheck, 'step');
Expand Down
4 changes: 4 additions & 0 deletions packages/world/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ export { HookSchema } from './hooks.js';
export type * from './interfaces.js';
export type * from './queue.js';
export {
HEALTH_CHECK_STEP_QUEUE,
HEALTH_CHECK_STREAM_PREFIX,
HEALTH_CHECK_WORKFLOW_QUEUE,
HealthCheckPayloadSchema,
MessageId,
QueuePayloadSchema,
QueuePrefix,
Expand Down
25 changes: 22 additions & 3 deletions packages/world/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,35 @@ export const StepInvokePayloadSchema = z.object({

export type WorkflowInvokePayload = z.infer<typeof WorkflowInvokePayloadSchema>;
export type StepInvokePayload = z.infer<typeof StepInvokePayloadSchema>;
export type HealthCheckPayload = z.infer<typeof HealthCheckPayloadSchema>;
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type HealthCheckPayload is being defined using z.infer<typeof HealthCheckPayloadSchema> before HealthCheckPayloadSchema is declared. This creates a forward reference issue. Move line 41 to after line 51 (after the schema definition) to fix the ordering.

Copilot uses AI. Check for mistakes.

/**
* Health check payload - internal type used by core to verify queue pipeline.
* Not exported as part of the public API.
* Health check payload - used to verify that the queue pipeline
* can deliver messages to workflow/step endpoints.
* This bypasses Deployment Protection on Vercel.
*/
const HealthCheckPayloadSchema = z.object({
export const HealthCheckPayloadSchema = z.object({
__healthCheck: z.literal(true),
correlationId: z.string(),
});

/**
* Stream name prefix for health check responses.
* The full stream name is `__health_check__${correlationId}`.
* Used by both the core handlers and world implementations.
*/
export const HEALTH_CHECK_STREAM_PREFIX = '__health_check__';

/**
* Queue name for workflow health checks.
*/
export const HEALTH_CHECK_WORKFLOW_QUEUE = '__wkf_workflow_health_check';

/**
* Queue name for step health checks.
*/
export const HEALTH_CHECK_STEP_QUEUE = '__wkf_step_health_check';

export const QueuePayloadSchema = z.union([
WorkflowInvokePayloadSchema,
StepInvokePayloadSchema,
Expand Down
30 changes: 30 additions & 0 deletions workbench/example/api/test-health-check.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// This route tests the queue-based health check functionality

import { getWorld, healthCheck } from 'workflow';

export async function POST(req: Request) {
try {
const body = await req.json();
const { endpoint = 'workflow', timeout = 30000 } = body;

console.log(
`Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms`
);

const world = getWorld();
const result = await healthCheck(world, endpoint, { timeout });

console.log(`Health check result:`, result);

return Response.json(result);
} catch (error) {
console.error('Health check test failed:', error);
return Response.json(
{
healthy: false,
error: error instanceof Error ? error.message : String(error),
},
{ status: 500 }
);
}
}
30 changes: 30 additions & 0 deletions workbench/nextjs-turbopack/app/api/test-health-check/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// This route tests the queue-based health check functionality

import { getWorld, healthCheck } from 'workflow/api';

export async function POST(req: Request) {
try {
const body = await req.json();
const { endpoint = 'workflow', timeout = 30000 } = body;

console.log(
`Testing queue-based health check for endpoint: ${endpoint}, timeout: ${timeout}ms`
);

const world = getWorld();
const result = await healthCheck(world, endpoint, { timeout });

console.log(`Health check result:`, result);

return Response.json(result);
} catch (error) {
console.error('Health check test failed:', error);
return Response.json(
{
healthy: false,
error: error instanceof Error ? error.message : String(error),
},
{ status: 500 }
);
}
}
Loading