diff --git a/.changeset/little-cycles-stay.md b/.changeset/little-cycles-stay.md new file mode 100644 index 0000000000..2ad9de9468 --- /dev/null +++ b/.changeset/little-cycles-stay.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Improve logging: consolidate to structured logger, fix log levels, ensure errors/warnings are always visible diff --git a/packages/core/src/events-consumer.test.ts b/packages/core/src/events-consumer.test.ts index dfb73e2ae0..1250b3e18a 100644 --- a/packages/core/src/events-consumer.test.ts +++ b/packages/core/src/events-consumer.test.ts @@ -262,23 +262,13 @@ describe('EventsConsumer', () => { .fn() .mockReturnValue(EventConsumerResult.Finished); - // Mock console.error to avoid noise in test output - const consoleErrorSpy = vi - .spyOn(console, 'error') - .mockImplementation(() => {}); - consumer.subscribe(throwingCallback); consumer.subscribe(normalCallback); await waitForNextTick(); + // Error is caught and logged via eventsLogger, processing continues to next callback expect(throwingCallback).toHaveBeenCalledWith(event); expect(normalCallback).toHaveBeenCalledWith(event); - expect(consoleErrorSpy).toHaveBeenCalledWith( - 'EventConsumer callback threw an error', - expect.any(Error) - ); - - consoleErrorSpy.mockRestore(); }); it('should handle callback removal during iteration', async () => { diff --git a/packages/core/src/events-consumer.ts b/packages/core/src/events-consumer.ts index 221d111fa3..b1dc986b2f 100644 --- a/packages/core/src/events-consumer.ts +++ b/packages/core/src/events-consumer.ts @@ -26,8 +26,6 @@ export class EventsConsumer { constructor(events: Event[]) { this.events = events; this.eventIndex = 0; - - eventsLogger.debug('EventsConsumer initialized', { events }); } /** @@ -53,14 +51,7 @@ export class EventsConsumer { handled = callback(currentEvent); } catch (error) { eventsLogger.error('EventConsumer callback threw an error', { error }); - // Hopefully shouldn't happen, but we don't want to block the workflow - console.error('EventConsumer callback threw an error', error); } - eventsLogger.debug('EventConsumer callback result', { - handled: EventConsumerResult[handled], - eventIndex: this.eventIndex, - eventId: currentEvent?.eventId, - }); if ( handled === EventConsumerResult.Consumed || handled === EventConsumerResult.Finished diff --git a/packages/core/src/logger.ts b/packages/core/src/logger.ts index 365ebb6a3d..d0a0ae1d5b 100644 --- a/packages/core/src/logger.ts +++ b/packages/core/src/logger.ts @@ -8,6 +8,15 @@ function createLogger(namespace: string) { const levelDebug = baseDebug.extend(level); return (message: string, metadata?: Record) => { + // Always output error/warn to console so users see critical issues + // debug/info only output when DEBUG env var is set + if (level === 'error') { + console.error(`[Workflow] ${message}`, metadata ?? ''); + } else if (level === 'warn') { + console.warn(`[Workflow] ${message}`, metadata ?? ''); + } + + // Also log to debug library for verbose output when DEBUG is enabled levelDebug(message, metadata); if (levelDebug.enabled) { diff --git a/packages/core/src/observability.ts b/packages/core/src/observability.ts index e0b3319c33..430610e6ea 100644 --- a/packages/core/src/observability.ts +++ b/packages/core/src/observability.ts @@ -6,6 +6,7 @@ import { inspect } from 'node:util'; import { parseClassName } from '@workflow/utils/parse-name'; import { unflatten } from 'devalue'; +import { runtimeLogger } from './logger.js'; import { getCommonRevivers, hydrateStepArguments, @@ -360,7 +361,7 @@ const hydrateEventData = < } } } catch (error) { - console.error('Error hydrating event data', error); + runtimeLogger.error('Error hydrating event data', { error }); } return { ...event, diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 5a4f218ab7..a9d262b929 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -141,8 +141,12 @@ export function workflowEntrypoint( if (workflowRun.status !== 'running') { // Workflow has already completed or failed, so we can skip it - console.warn( - `Workflow "${runId}" has status "${workflowRun.status}", skipping` + runtimeLogger.info( + 'Workflow already completed or failed, skipping', + { + workflowRunId: runId, + status: workflowRun.status, + } ); // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event @@ -253,9 +257,11 @@ export function workflowEntrypoint( ); } - console.error( - `${errorName} while running "${runId}" workflow:\n\n${errorStack}` - ); + runtimeLogger.error('Error while running workflow', { + workflowRunId: runId, + errorName, + errorStack, + }); // Fail the workflow run via event (event-sourced architecture) await world.events.create(runId, { eventType: 'run_failed', diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 1864d0e0b4..e8ba72fec9 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -8,7 +8,7 @@ import { import { pluralize } from '@workflow/utils'; import { getPort } from '@workflow/utils/get-port'; import { SPEC_VERSION_CURRENT, StepInvokePayloadSchema } from '@workflow/world'; -import { runtimeLogger } from '../logger.js'; +import { runtimeLogger, stepLogger } from '../logger.js'; import { getStepFunction } from '../private.js'; import { dehydrateStepReturnValue, @@ -143,7 +143,11 @@ const stepHandler = getWorldHandlers().createQueueHandler( if (step.attempt > maxRetries + 1) { const retryCount = step.attempt - 1; const errorMessage = `Step "${stepName}" exceeded max retries (${retryCount} ${pluralize('retry', 'retries', retryCount)})`; - console.error(`[Workflows] "${workflowRunId}" - ${errorMessage}`); + stepLogger.error('Step exceeded max retries', { + workflowRunId, + stepName, + retryCount, + }); // Fail the step via event (event-sourced architecture) await world.events.create(workflowRunId, { eventType: 'step_failed', @@ -175,9 +179,12 @@ const stepHandler = getWorldHandlers().createQueueHandler( // a) pending - initial state, or state set on re-try // b) running - if a step fails mid-execution, like a function timeout // otherwise, the step has been invoked erroneously - console.error( - `[Workflows] "${workflowRunId}" - Step invoked erroneously, expected status "pending" or "running", got "${step.status}" instead, skipping execution` - ); + stepLogger.warn('Step invoked erroneously, skipping execution', { + workflowRunId, + stepName, + expectedStatus: ['pending', 'running'], + actualStatus: step.status, + }); span?.setAttributes({ ...Attribute.StepSkipped(true), ...Attribute.StepSkipReason(step.status), @@ -307,8 +314,13 @@ const stepHandler = getWorldHandlers().createQueueHandler( if (WorkflowAPIError.is(err)) { if (err.status === 410) { // Workflow has already completed, so no-op - console.warn( - `Workflow run "${workflowRunId}" has already completed, skipping step "${stepId}": ${err.message}` + stepLogger.info( + 'Workflow run already completed, skipping step', + { + workflowRunId, + stepId, + message: err.message, + } ); return; } @@ -316,9 +328,13 @@ const stepHandler = getWorldHandlers().createQueueHandler( if (FatalError.is(err)) { const errorStack = getErrorStack(err); - const stackLines = errorStack.split('\n').slice(0, 4); - console.error( - `[Workflows] "${workflowRunId}" - Encountered \`FatalError\` while executing step "${stepName}":\n > ${stackLines.join('\n > ')}\n\nBubbling up error to parent workflow` + stepLogger.error( + 'Encountered FatalError while executing step, bubbling up to parent workflow', + { + workflowRunId, + stepName, + errorStack, + } ); // Fail the step via event (event-sourced architecture) await world.events.create(workflowRunId, { @@ -349,10 +365,16 @@ const stepHandler = getWorldHandlers().createQueueHandler( if (currentAttempt >= maxRetries + 1) { // Max retries reached const errorStack = getErrorStack(err); - const stackLines = errorStack.split('\n').slice(0, 4); const retryCount = step.attempt - 1; - console.error( - `[Workflows] "${workflowRunId}" - Encountered \`Error\` while executing step "${stepName}" (attempt ${step.attempt}, ${retryCount} ${pluralize('retry', 'retries', retryCount)}):\n > ${stackLines.join('\n > ')}\n\n Max retries reached\n Bubbling error to parent workflow` + stepLogger.error( + 'Max retries reached, bubbling error to parent workflow', + { + workflowRunId, + stepName, + attempt: step.attempt, + retryCount, + errorStack, + } ); const errorMessage = `Step "${stepName}" failed after ${maxRetries} ${pluralize('retry', 'retries', maxRetries)}: ${String(err)}`; // Fail the step via event (event-sourced architecture) @@ -373,14 +395,23 @@ const stepHandler = getWorldHandlers().createQueueHandler( } else { // Not at max retries yet - log as a retryable error if (RetryableError.is(err)) { - console.warn( - `[Workflows] "${workflowRunId}" - Encountered \`RetryableError\` while executing step "${stepName}" (attempt ${currentAttempt}):\n > ${String(err.message)}\n\n This step has failed but will be retried` + stepLogger.warn( + 'Encountered RetryableError, step will be retried', + { + workflowRunId, + stepName, + attempt: currentAttempt, + message: err.message, + } ); } else { - const stackLines = getErrorStack(err).split('\n').slice(0, 4); - console.error( - `[Workflows] "${workflowRunId}" - Encountered \`Error\` while executing step "${stepName}" (attempt ${currentAttempt}):\n > ${stackLines.join('\n > ')}\n\n This step has failed but will be retried` - ); + const errorStack = getErrorStack(err); + stepLogger.warn('Encountered Error, step will be retried', { + workflowRunId, + stepName, + attempt: currentAttempt, + errorStack, + }); } // Set step to pending for retry via event (event-sourced architecture) // step_retrying records the error and sets status to pending diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index 5033d5ee8e..c1d11827a5 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -13,6 +13,7 @@ import type { WaitInvocationQueueItem, WorkflowSuspension, } from '../global.js'; +import { runtimeLogger } from '../logger.js'; import { dehydrateStepArguments } from '../serialization.js'; import * as Attribute from '../telemetry/semantic-conventions.js'; import { serializeTraceCarrier } from '../telemetry.js'; @@ -99,8 +100,12 @@ export async function handleSuspension({ } catch (err) { if (WorkflowAPIError.is(err)) { if (err.status === 410) { - console.warn( - `Workflow run "${runId}" has already completed, skipping hook: ${err.message}` + runtimeLogger.info( + 'Workflow run already completed, skipping hook', + { + workflowRunId: runId, + message: err.message, + } ); } else { throw err; @@ -152,7 +157,11 @@ export async function handleSuspension({ await world.events.create(runId, stepEvent); } catch (err) { if (WorkflowAPIError.is(err) && err.status === 409) { - console.warn(`Step already exists, continuing: ${err.message}`); + runtimeLogger.info('Step already exists, continuing', { + workflowRunId: runId, + correlationId: queueItem.correlationId, + message: err.message, + }); } else { throw err; } @@ -196,7 +205,11 @@ export async function handleSuspension({ await world.events.create(runId, waitEvent); } catch (err) { if (WorkflowAPIError.is(err) && err.status === 409) { - console.warn(`Wait already exists, continuing: ${err.message}`); + runtimeLogger.info('Wait already exists, continuing', { + workflowRunId: runId, + correlationId: queueItem.correlationId, + message: err.message, + }); } else { throw err; } diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 855bc8fac9..172275e902 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -9,6 +9,7 @@ import { pollReadableLock, pollWritableLock, } from './flushable-stream.js'; +import { runtimeLogger } from './logger.js'; import { getStepFunction } from './private.js'; import { getWorld } from './runtime/world.js'; import { contextStorage } from './step/context-storage.js'; @@ -157,12 +158,12 @@ function formatSerializationError(context: string, error: unknown): string { } message += `. Ensure you're ${verb} serializable types (plain objects, arrays, primitives, Date, RegExp, Map, Set).`; - // Log the problematic value to console for debugging + // Log the problematic value for debugging if (error instanceof DevalueError && error.value !== undefined) { - console.error( - `[Workflows] Serialization failed for ${context}. Problematic value:` - ); - console.error(error.value); + runtimeLogger.error('Serialization failed', { + context, + problematicValue: error.value, + }); } return message; diff --git a/packages/core/src/telemetry.ts b/packages/core/src/telemetry.ts index aef6dfeb60..dff12c3b61 100644 --- a/packages/core/src/telemetry.ts +++ b/packages/core/src/telemetry.ts @@ -2,6 +2,7 @@ import type * as api from '@opentelemetry/api'; import type { Span, SpanKind, SpanOptions } from '@opentelemetry/api'; import { once } from '@workflow/utils'; import { WorkflowSuspension } from './global.js'; +import { runtimeLogger } from './logger.js'; import * as Attr from './telemetry/semantic-conventions.js'; // ============================================================ @@ -64,7 +65,7 @@ const OtelApi = once(async () => { try { return await import('@opentelemetry/api'); } catch { - console.warn('OpenTelemetry not available, tracing will be disabled'); + runtimeLogger.info('OpenTelemetry not available, tracing will be disabled'); return null; } });