Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/little-cycles-stay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/core": patch
---

Improve logging: consolidate to structured logger, fix log levels, ensure errors/warnings are always visible
12 changes: 1 addition & 11 deletions packages/core/src/events-consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,23 +262,13 @@ describe('EventsConsumer', () => {
.fn()
.mockReturnValue(EventConsumerResult.Finished);

// Mock console.error to avoid noise in test output
const consoleErrorSpy = vi
.spyOn(console, 'error')
.mockImplementation(() => {});

consumer.subscribe(throwingCallback);
consumer.subscribe(normalCallback);
await waitForNextTick();

// Error is caught and logged via eventsLogger, processing continues to next callback
expect(throwingCallback).toHaveBeenCalledWith(event);
expect(normalCallback).toHaveBeenCalledWith(event);
expect(consoleErrorSpy).toHaveBeenCalledWith(
'EventConsumer callback threw an error',
expect.any(Error)
);

consoleErrorSpy.mockRestore();
});

it('should handle callback removal during iteration', async () => {
Expand Down
9 changes: 0 additions & 9 deletions packages/core/src/events-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ export class EventsConsumer {
constructor(events: Event[]) {
this.events = events;
this.eventIndex = 0;

eventsLogger.debug('EventsConsumer initialized', { events });
}

/**
Expand All @@ -53,14 +51,7 @@ export class EventsConsumer {
handled = callback(currentEvent);
} catch (error) {
eventsLogger.error('EventConsumer callback threw an error', { error });
// Hopefully shouldn't happen, but we don't want to block the workflow
console.error('EventConsumer callback threw an error', error);
}
eventsLogger.debug('EventConsumer callback result', {
handled: EventConsumerResult[handled],
eventIndex: this.eventIndex,
eventId: currentEvent?.eventId,
});
if (
handled === EventConsumerResult.Consumed ||
handled === EventConsumerResult.Finished
Expand Down
9 changes: 9 additions & 0 deletions packages/core/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ function createLogger(namespace: string) {
const levelDebug = baseDebug.extend(level);

return (message: string, metadata?: Record<string, any>) => {
// Always output error/warn to console so users see critical issues
// debug/info only output when DEBUG env var is set
if (level === 'error') {
console.error(`[Workflow] ${message}`, metadata ?? '');
} else if (level === 'warn') {
console.warn(`[Workflow] ${message}`, metadata ?? '');
}

// Also log to debug library for verbose output when DEBUG is enabled
levelDebug(message, metadata);

if (levelDebug.enabled) {
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/observability.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import { inspect } from 'node:util';
import { parseClassName } from '@workflow/utils/parse-name';
import { unflatten } from 'devalue';
import { runtimeLogger } from './logger.js';
import {
getCommonRevivers,
hydrateStepArguments,
Expand Down Expand Up @@ -360,7 +361,7 @@ const hydrateEventData = <
}
}
} catch (error) {
console.error('Error hydrating event data', error);
runtimeLogger.error('Error hydrating event data', { error });
}
return {
...event,
Expand Down
16 changes: 11 additions & 5 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,12 @@ export function workflowEntrypoint(

if (workflowRun.status !== 'running') {
// Workflow has already completed or failed, so we can skip it
console.warn(
`Workflow "${runId}" has status "${workflowRun.status}", skipping`
runtimeLogger.info(
'Workflow already completed or failed, skipping',
{
workflowRunId: runId,
status: workflowRun.status,
}
);

// TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event
Expand Down Expand Up @@ -253,9 +257,11 @@ export function workflowEntrypoint(
);
}

console.error(
`${errorName} while running "${runId}" workflow:\n\n${errorStack}`
);
runtimeLogger.error('Error while running workflow', {
workflowRunId: runId,
errorName,
errorStack,
});
// Fail the workflow run via event (event-sourced architecture)
await world.events.create(runId, {
eventType: 'run_failed',
Expand Down
69 changes: 50 additions & 19 deletions packages/core/src/runtime/step-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
import { pluralize } from '@workflow/utils';
import { getPort } from '@workflow/utils/get-port';
import { SPEC_VERSION_CURRENT, StepInvokePayloadSchema } from '@workflow/world';
import { runtimeLogger } from '../logger.js';
import { runtimeLogger, stepLogger } from '../logger.js';
import { getStepFunction } from '../private.js';
import {
dehydrateStepReturnValue,
Expand Down Expand Up @@ -143,7 +143,11 @@ const stepHandler = getWorldHandlers().createQueueHandler(
if (step.attempt > maxRetries + 1) {
const retryCount = step.attempt - 1;
const errorMessage = `Step "${stepName}" exceeded max retries (${retryCount} ${pluralize('retry', 'retries', retryCount)})`;
console.error(`[Workflows] "${workflowRunId}" - ${errorMessage}`);
stepLogger.error('Step exceeded max retries', {
workflowRunId,
stepName,
retryCount,
});
// Fail the step via event (event-sourced architecture)
await world.events.create(workflowRunId, {
eventType: 'step_failed',
Expand Down Expand Up @@ -175,9 +179,12 @@ const stepHandler = getWorldHandlers().createQueueHandler(
// a) pending - initial state, or state set on re-try
// b) running - if a step fails mid-execution, like a function timeout
// otherwise, the step has been invoked erroneously
console.error(
`[Workflows] "${workflowRunId}" - Step invoked erroneously, expected status "pending" or "running", got "${step.status}" instead, skipping execution`
);
stepLogger.warn('Step invoked erroneously, skipping execution', {
workflowRunId,
stepName,
expectedStatus: ['pending', 'running'],
actualStatus: step.status,
});
span?.setAttributes({
...Attribute.StepSkipped(true),
...Attribute.StepSkipReason(step.status),
Expand Down Expand Up @@ -307,18 +314,27 @@ const stepHandler = getWorldHandlers().createQueueHandler(
if (WorkflowAPIError.is(err)) {
if (err.status === 410) {
// Workflow has already completed, so no-op
console.warn(
`Workflow run "${workflowRunId}" has already completed, skipping step "${stepId}": ${err.message}`
stepLogger.info(
'Workflow run already completed, skipping step',
{
workflowRunId,
stepId,
message: err.message,
}
);
return;
}
}

if (FatalError.is(err)) {
const errorStack = getErrorStack(err);
const stackLines = errorStack.split('\n').slice(0, 4);
console.error(
`[Workflows] "${workflowRunId}" - Encountered \`FatalError\` while executing step "${stepName}":\n > ${stackLines.join('\n > ')}\n\nBubbling up error to parent workflow`
stepLogger.error(
'Encountered FatalError while executing step, bubbling up to parent workflow',
{
workflowRunId,
stepName,
errorStack,
}
);
// Fail the step via event (event-sourced architecture)
await world.events.create(workflowRunId, {
Expand Down Expand Up @@ -349,10 +365,16 @@ const stepHandler = getWorldHandlers().createQueueHandler(
if (currentAttempt >= maxRetries + 1) {
// Max retries reached
const errorStack = getErrorStack(err);
const stackLines = errorStack.split('\n').slice(0, 4);
const retryCount = step.attempt - 1;
console.error(
`[Workflows] "${workflowRunId}" - Encountered \`Error\` while executing step "${stepName}" (attempt ${step.attempt}, ${retryCount} ${pluralize('retry', 'retries', retryCount)}):\n > ${stackLines.join('\n > ')}\n\n Max retries reached\n Bubbling error to parent workflow`
stepLogger.error(
'Max retries reached, bubbling error to parent workflow',
{
workflowRunId,
stepName,
attempt: step.attempt,
retryCount,
errorStack,
}
);
const errorMessage = `Step "${stepName}" failed after ${maxRetries} ${pluralize('retry', 'retries', maxRetries)}: ${String(err)}`;
// Fail the step via event (event-sourced architecture)
Expand All @@ -373,14 +395,23 @@ const stepHandler = getWorldHandlers().createQueueHandler(
} else {
// Not at max retries yet - log as a retryable error
if (RetryableError.is(err)) {
console.warn(
`[Workflows] "${workflowRunId}" - Encountered \`RetryableError\` while executing step "${stepName}" (attempt ${currentAttempt}):\n > ${String(err.message)}\n\n This step has failed but will be retried`
stepLogger.warn(
'Encountered RetryableError, step will be retried',
{
workflowRunId,
stepName,
attempt: currentAttempt,
message: err.message,
}
);
} else {
const stackLines = getErrorStack(err).split('\n').slice(0, 4);
console.error(
`[Workflows] "${workflowRunId}" - Encountered \`Error\` while executing step "${stepName}" (attempt ${currentAttempt}):\n > ${stackLines.join('\n > ')}\n\n This step has failed but will be retried`
);
const errorStack = getErrorStack(err);
stepLogger.warn('Encountered Error, step will be retried', {
workflowRunId,
stepName,
attempt: currentAttempt,
errorStack,
});
}
// Set step to pending for retry via event (event-sourced architecture)
// step_retrying records the error and sets status to pending
Expand Down
21 changes: 17 additions & 4 deletions packages/core/src/runtime/suspension-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import type {
WaitInvocationQueueItem,
WorkflowSuspension,
} from '../global.js';
import { runtimeLogger } from '../logger.js';
import { dehydrateStepArguments } from '../serialization.js';
import * as Attribute from '../telemetry/semantic-conventions.js';
import { serializeTraceCarrier } from '../telemetry.js';
Expand Down Expand Up @@ -99,8 +100,12 @@ export async function handleSuspension({
} catch (err) {
if (WorkflowAPIError.is(err)) {
if (err.status === 410) {
console.warn(
`Workflow run "${runId}" has already completed, skipping hook: ${err.message}`
runtimeLogger.info(
'Workflow run already completed, skipping hook',
{
workflowRunId: runId,
message: err.message,
}
);
} else {
throw err;
Expand Down Expand Up @@ -152,7 +157,11 @@ export async function handleSuspension({
await world.events.create(runId, stepEvent);
} catch (err) {
if (WorkflowAPIError.is(err) && err.status === 409) {
console.warn(`Step already exists, continuing: ${err.message}`);
runtimeLogger.info('Step already exists, continuing', {
workflowRunId: runId,
correlationId: queueItem.correlationId,
message: err.message,
});
} else {
throw err;
}
Expand Down Expand Up @@ -196,7 +205,11 @@ export async function handleSuspension({
await world.events.create(runId, waitEvent);
} catch (err) {
if (WorkflowAPIError.is(err) && err.status === 409) {
console.warn(`Wait already exists, continuing: ${err.message}`);
runtimeLogger.info('Wait already exists, continuing', {
workflowRunId: runId,
correlationId: queueItem.correlationId,
message: err.message,
});
} else {
throw err;
}
Expand Down
11 changes: 6 additions & 5 deletions packages/core/src/serialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
pollReadableLock,
pollWritableLock,
} from './flushable-stream.js';
import { runtimeLogger } from './logger.js';
import { getStepFunction } from './private.js';
import { getWorld } from './runtime/world.js';
import { contextStorage } from './step/context-storage.js';
Expand Down Expand Up @@ -157,12 +158,12 @@ function formatSerializationError(context: string, error: unknown): string {
}
message += `. Ensure you're ${verb} serializable types (plain objects, arrays, primitives, Date, RegExp, Map, Set).`;

// Log the problematic value to console for debugging
// Log the problematic value for debugging
if (error instanceof DevalueError && error.value !== undefined) {
console.error(
`[Workflows] Serialization failed for ${context}. Problematic value:`
);
console.error(error.value);
runtimeLogger.error('Serialization failed', {
context,
problematicValue: error.value,
});
}

return message;
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type * as api from '@opentelemetry/api';
import type { Span, SpanKind, SpanOptions } from '@opentelemetry/api';
import { once } from '@workflow/utils';
import { WorkflowSuspension } from './global.js';
import { runtimeLogger } from './logger.js';
import * as Attr from './telemetry/semantic-conventions.js';

// ============================================================
Expand Down Expand Up @@ -64,7 +65,7 @@ const OtelApi = once(async () => {
try {
return await import('@opentelemetry/api');
} catch {
console.warn('OpenTelemetry not available, tracing will be disabled');
runtimeLogger.info('OpenTelemetry not available, tracing will be disabled');
return null;
}
});
Expand Down
Loading