diff --git a/.changeset/health-policy-config.md b/.changeset/health-policy-config.md new file mode 100644 index 0000000..f129c38 --- /dev/null +++ b/.changeset/health-policy-config.md @@ -0,0 +1,10 @@ +--- +"@iqai/alert-logger": minor +--- + +Add configurable HealthPolicy for adapter health/retry behavior + +- New `health` config option with `unhealthyThreshold`, `healthWindowMs`, `drainIntervalMs`, `maxRetries`, and `entryExpiryMs` +- Extract shared `formatDuration` utility to eliminate duplication between health-manager and discord formatter +- Fix drain-only recovery: `onRecovery` now also fires when an adapter recovers after becoming unhealthy purely through background drain retries (no new dispatches required) +- Immediate re-drain after discarding expired queue entries for faster stale queue cleanup diff --git a/src/adapters/discord/formatter.ts b/src/adapters/discord/formatter.ts index 0e25a1c..56efa1f 100644 --- a/src/adapters/discord/formatter.ts +++ b/src/adapters/discord/formatter.ts @@ -1,4 +1,5 @@ import type { FormattedAlert } from '../../core/types.js' +import { formatDuration } from '../../core/utils.js' export interface DiscordEmbed { title: string @@ -32,16 +33,6 @@ function sanitize(text: string): string { .replace(/<#\d+>/g, '[channel]') } -function formatDuration(ms: number): string { - const seconds = Math.floor(ms / 1000) - if (seconds < 60) return `${seconds}s` - const minutes = Math.floor(seconds / 60) - if (minutes < 60) return `${minutes}m` - const hours = Math.floor(minutes / 60) - const remainingMinutes = minutes % 60 - return remainingMinutes > 0 ? 
`${hours}h ${remainingMinutes}m` : `${hours}h` -} - export function formatDiscordEmbed(alert: FormattedAlert): DiscordEmbed { const { aggregation } = alert const phase = aggregation.phase diff --git a/src/core/alert-logger.ts b/src/core/alert-logger.ts index deed1a7..966813b 100644 --- a/src/core/alert-logger.ts +++ b/src/core/alert-logger.ts @@ -1,6 +1,6 @@ import { Aggregator, type ResolvedEntry } from './aggregator.js' import { fingerprint } from './fingerprinter.js' -import { formatDuration, HealthManager } from './health-manager.js' +import { HealthManager } from './health-manager.js' import { Router } from './router.js' import type { AlertAdapter, @@ -11,6 +11,7 @@ import type { ResolvedConfig, } from './types.js' import { resolveConfig } from './types.js' +import { formatDuration } from './utils.js' interface AlertMeta { level: AlertLevel @@ -37,6 +38,7 @@ export class AlertLogger { this.healthManager = new HealthManager({ maxQueueSize: config.queue.maxSize, persistPath: config.queue.persistPath, + policy: config.health, onRecovery: (adapterName, queuedCount, downtimeMs) => { this.handleAdapterRecovery(adapterName, queuedCount, downtimeMs) }, diff --git a/src/core/health-manager.test.ts b/src/core/health-manager.test.ts index 88d005f..9fd82e6 100644 --- a/src/core/health-manager.test.ts +++ b/src/core/health-manager.test.ts @@ -1,6 +1,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' -import { formatDuration, HealthManager } from './health-manager.js' +import { HealthManager } from './health-manager.js' import type { AlertAdapter, AlertLevel, FormattedAlert } from './types.js' +import { DEFAULT_HEALTH } from './types.js' +import { formatDuration } from './utils.js' function createMockAdapter(options?: { failCount?: number @@ -58,7 +60,7 @@ describe('HealthManager', () => { it('healthy adapter: dispatch calls adapter.send directly', async () => { const adapter = createMockAdapter() - const hm = new HealthManager({ maxQueueSize: 
100, persistPath: null }) + const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: DEFAULT_HEALTH }) const alert = createAlert() hm.dispatch(adapter, alert) @@ -75,7 +77,7 @@ describe('HealthManager', () => { it('FIFO: new alerts enqueue when queue is non-empty even if healthy', async () => { const adapter = createMockAdapter({ failCount: 1 }) - const hm = new HealthManager({ maxQueueSize: 100, persistPath: null }) + const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: DEFAULT_HEALTH }) // First dispatch: healthy + empty queue -> sends directly, fails hm.dispatch(adapter, createAlert({ title: 'first' })) @@ -92,7 +94,7 @@ describe('HealthManager', () => { it('failed send: increments consecutiveFailures and enqueues', async () => { const adapter = createMockAdapter({ failCount: 1 }) - const hm = new HealthManager({ maxQueueSize: 100, persistPath: null }) + const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: DEFAULT_HEALTH }) const alert = createAlert() hm.dispatch(adapter, alert) @@ -108,7 +110,7 @@ describe('HealthManager', () => { it('unhealthy after 3 failures + 30s: enqueues without calling send', async () => { const adapter = createMockAdapter({ failCount: 100 }) - const hm = new HealthManager({ maxQueueSize: 100, persistPath: null }) + const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: DEFAULT_HEALTH }) // First dispatch sends directly (queue empty + healthy), fails and enqueues hm.dispatch(adapter, createAlert({ title: 'alert-0' })) @@ -146,7 +148,7 @@ describe('HealthManager', () => { it('warning emitted once: console.warn called once, not on subsequent failures', async () => { const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) const adapter = createMockAdapter({ failCount: 100 }) - const hm = new HealthManager({ maxQueueSize: 100, persistPath: null }) + const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: 
DEFAULT_HEALTH }) // First dispatch sends directly (queue empty), fails hm.dispatch(adapter, createAlert()) @@ -189,7 +191,12 @@ describe('HealthManager', () => { const onRecovery = vi.fn() // Adapter fails first 3 calls, succeeds after (call 4+) const adapter = createMockAdapter({ failCount: 3 }) - const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, onRecovery }) + const hm = new HealthManager({ + maxQueueSize: 100, + persistPath: null, + policy: DEFAULT_HEALTH, + onRecovery, + }) // Suppress console.warn const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) @@ -224,7 +231,7 @@ describe('HealthManager', () => { it('expired entries (>1hr) are discarded during drain', async () => { const adapter = createMockAdapter({ failCount: 100 }) - const hm = new HealthManager({ maxQueueSize: 100, persistPath: null }) + const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: DEFAULT_HEALTH }) const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) // First dispatch sends directly (queue empty), fails and enqueues @@ -256,9 +263,181 @@ describe('HealthManager', () => { await hm.destroy() }) + it('custom health policy: unhealthy after custom threshold', async () => { + const adapter = createMockAdapter({ failCount: 100 }) + const customPolicy = { + unhealthyThreshold: 5, + healthWindowMs: 10_000, + drainIntervalMs: 5_000, + maxRetries: 10, + entryExpiryMs: 3_600_000, + } + const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: customPolicy }) + + // First dispatch sends directly, fails (failure #1) + hm.dispatch(adapter, createAlert()) + await vi.advanceTimersByTimeAsync(0) + + // Drain retries at 5s intervals (custom drainIntervalMs) + await vi.advanceTimersByTimeAsync(5_000) // failure #2 + await vi.advanceTimersByTimeAsync(5_000) // failure #3 + + // With default threshold=3, adapter would be unhealthy after healthWindowMs. + // But our threshold is 5, so still healthy after 3 failures. 
+ expect(hm.isHealthy(adapter)).toBe(true) + + await vi.advanceTimersByTimeAsync(5_000) // failure #4 + await vi.advanceTimersByTimeAsync(5_000) // failure #5 + // Now 5 failures AND >10s since lastSuccessAt — unhealthy + expect(hm.isHealthy(adapter)).toBe(false) + + await hm.destroy() + }) + + it('custom health policy: entries expire at custom entryExpiryMs', async () => { + const adapter = createMockAdapter({ failCount: 100 }) + const customPolicy = { + unhealthyThreshold: 3, + healthWindowMs: 30_000, + drainIntervalMs: 10_000, + maxRetries: 3, + entryExpiryMs: 60_000, // 1 minute instead of 1 hour + } + const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: customPolicy }) + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + + hm.dispatch(adapter, createAlert()) + await vi.advanceTimersByTimeAsync(0) + expect(hm.queueSize(adapter)).toBe(1) + + // Build up failures for drain retries + await vi.advanceTimersByTimeAsync(10_000) + await vi.advanceTimersByTimeAsync(10_000) + expect(hm.queueSize(adapter)).toBe(1) // still in queue, not expired yet + + // Advance past custom expiry (60s) — entries should be discarded on next drain + vi.advanceTimersByTime(61_000) + await vi.advanceTimersByTimeAsync(10_000) // drain discards expired entry + expect(hm.queueSize(adapter)).toBe(0) // entry expired and removed + + warnSpy.mockRestore() + await hm.destroy() + }) + + it('custom health policy: maxRetries respected for retry count', async () => { + const adapter = createMockAdapter({ failCount: 100 }) + const customPolicy = { + unhealthyThreshold: 3, + healthWindowMs: 30_000, + drainIntervalMs: 5_000, + maxRetries: 1, // discard after 1 retry + entryExpiryMs: 3_600_000, + } + const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: customPolicy }) + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + + hm.dispatch(adapter, createAlert()) + await vi.advanceTimersByTimeAsync(0) // fails, enqueues 
with retryCount=0 + expect(hm.queueSize(adapter)).toBe(1) + + // First drain retry fails → retryCount=1 → exceeds maxRetries=1 → dequeued + await vi.advanceTimersByTimeAsync(5_000) + expect(hm.queueSize(adapter)).toBe(0) // discarded after 1 retry + + warnSpy.mockRestore() + await hm.destroy() + }) + + it('full recovery flow: unhealthy → queued → drain succeeds → recovery callback → healthy', async () => { + const onRecovery = vi.fn() + // Fails first 4 calls, succeeds on call 5+ + const adapter = createMockAdapter({ failCount: 4 }) + const hm = new HealthManager({ + maxQueueSize: 100, + persistPath: null, + policy: DEFAULT_HEALTH, + onRecovery, + }) + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + + // Send and fail (call 1) + hm.dispatch(adapter, createAlert({ title: 'alert-1' })) + await vi.advanceTimersByTimeAsync(0) + + // Queue more while queue is non-empty + hm.dispatch(adapter, createAlert({ title: 'alert-2' })) + hm.dispatch(adapter, createAlert({ title: 'alert-3' })) + + // Drain fails (call 2, 3) + await vi.advanceTimersByTimeAsync(10_000) + await vi.advanceTimersByTimeAsync(10_000) + + // Make unhealthy + vi.advanceTimersByTime(31_000) + hm.dispatch(adapter, createAlert({ title: 'alert-while-unhealthy' })) + await vi.advanceTimersByTimeAsync(0) + expect(hm.isHealthy(adapter)).toBe(false) + + // Drain fails once more (call 4), then succeeds (call 5) + await vi.advanceTimersByTimeAsync(10_000) // call 4 fails + await vi.advanceTimersByTimeAsync(10_000) // call 5 succeeds → recovery + + expect(onRecovery).toHaveBeenCalledTimes(1) + expect(hm.isHealthy(adapter)).toBe(true) + + // New dispatches should go directly again (queue eventually drains) + // Allow re-drain to process remaining items + await vi.advanceTimersByTimeAsync(200) + + expect(adapter.sent.length).toBeGreaterThan(0) + + warnSpy.mockRestore() + await hm.destroy() + }) + + it('drain-only unhealthy: onRecovery fires even without new dispatches', async () => { + const 
onRecovery = vi.fn() + // Fails first 4 calls, succeeds on call 5+ + const adapter = createMockAdapter({ failCount: 4 }) + const hm = new HealthManager({ + maxQueueSize: 100, + persistPath: null, + policy: DEFAULT_HEALTH, + onRecovery, + }) + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + + // Single dispatch, then NO more dispatches — only drain retries + hm.dispatch(adapter, createAlert()) + await vi.advanceTimersByTimeAsync(0) // call 1 fails + + // Queue more so drain has work + hm.dispatch(adapter, createAlert()) + hm.dispatch(adapter, createAlert()) + + // Drain retries build failures (calls 2, 3) + await vi.advanceTimersByTimeAsync(10_000) + await vi.advanceTimersByTimeAsync(10_000) + + // Advance past healthWindowMs — adapter becomes unhealthy during drain + vi.advanceTimersByTime(31_000) + + // Drain fails (call 4) — warnedAt should be set here (not via dispatch) + await vi.advanceTimersByTimeAsync(10_000) + + // Drain succeeds (call 5) — recovery should fire + await vi.advanceTimersByTimeAsync(10_000) + + expect(onRecovery).toHaveBeenCalledTimes(1) + expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining('unhealthy')) + + warnSpy.mockRestore() + await hm.destroy() + }) + it('destroy clears timers', async () => { const adapter = createMockAdapter({ failCount: 100 }) - const hm = new HealthManager({ maxQueueSize: 100, persistPath: null }) + const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: DEFAULT_HEALTH }) const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) // Dispatch a failing alert to start drain timer diff --git a/src/core/health-manager.ts b/src/core/health-manager.ts index c3f01c7..69c3684 100644 --- a/src/core/health-manager.ts +++ b/src/core/health-manager.ts @@ -1,7 +1,7 @@ import { loadQueuesFromDisk, saveQueuesToDisk } from './queue-persistence.js' import type { QueueEntry } from './retry-queue.js' import { RetryQueue } from './retry-queue.js' -import type { 
AlertAdapter, FormattedAlert } from './types.js' +import type { AlertAdapter, FormattedAlert, HealthPolicy } from './types.js' interface AdapterHealth { consecutiveFailures: number @@ -14,21 +14,10 @@ interface AdapterHealth { interface HealthManagerConfig { maxQueueSize: number persistPath: string | null + policy: HealthPolicy onRecovery?: (adapterName: string, queuedCount: number, downtimeMs: number) => void } -function formatDuration(ms: number): string { - const seconds = Math.floor(ms / 1000) - if (seconds < 60) return `${seconds}s` - const minutes = Math.floor(seconds / 60) - if (minutes < 60) return `${minutes}m` - const hours = Math.floor(minutes / 60) - const remainingMinutes = minutes % 60 - return remainingMinutes > 0 ? `${hours}h ${remainingMinutes}m` : `${hours}h` -} - -export { formatDuration } - export class HealthManager { private readonly config: HealthManagerConfig private readonly adapters: Map = new Map() @@ -53,10 +42,19 @@ export class HealthManager { return health } + queueSize(adapter: AlertAdapter): number { + const health = this.adapters.get(adapter.name) + return health ? 
health.queue.size : 0 + } + isHealthy(adapter: AlertAdapter): boolean { const health = this.adapters.get(adapter.name) if (!health) return true - return !(health.consecutiveFailures >= 3 && Date.now() - health.lastSuccessAt > 30_000) + const { unhealthyThreshold, healthWindowMs } = this.config.policy + return !( + health.consecutiveFailures >= unhealthyThreshold && + Date.now() - health.lastSuccessAt > healthWindowMs + ) } dispatch(adapter: AlertAdapter, alert: FormattedAlert): void { @@ -105,7 +103,7 @@ export class HealthManager { const timer = setInterval(() => { void this.drainOnce(adapter) - }, 10_000) + }, this.config.policy.drainIntervalMs) timer.unref?.() this.drainTimers.set(adapter.name, timer) } @@ -128,9 +126,12 @@ export class HealthManager { return } - // Discard expired entries (>1 hour old) - if (Date.now() - entry.enqueuedAt > 3_600_000) { + // Discard expired entries, re-drain immediately if more remain + if (Date.now() - entry.enqueuedAt > this.config.policy.entryExpiryMs) { health.queue.dequeue() + if (!health.queue.isEmpty) { + setTimeout(() => void this.drainOnce(adapter), 0) + } return } @@ -162,7 +163,16 @@ export class HealthManager { // Send failed — track consecutive failures for health status health.consecutiveFailures++ entry.retryCount++ - if (entry.retryCount >= 3) { + + // Track unhealthy transition during drain (not just dispatch) + if (!this.isHealthy(adapter) && health.warnedAt === null) { + console.warn( + `[alert-logger] ${adapter.name} adapter is unhealthy (${health.consecutiveFailures} consecutive failures)`, + ) + health.warnedAt = Date.now() + } + + if (entry.retryCount >= this.config.policy.maxRetries) { health.queue.dequeue() } } diff --git a/src/core/types.ts b/src/core/types.ts index 1ef5c12..a3cc979 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -62,6 +62,19 @@ export interface RoutingConfig { pings?: Partial> } +export interface HealthPolicy { + /** Number of consecutive failures before marking adapter 
unhealthy (default: 3) */ + unhealthyThreshold: number + /** Time since last success before health check fails, in ms (default: 30000) */ + healthWindowMs: number + /** Interval between queue drain attempts, in ms (default: 10000) */ + drainIntervalMs: number + /** Max retries before discarding a queued entry (default: 3) */ + maxRetries: number + /** Time after which queued entries are discarded, in ms (default: 3600000) */ + entryExpiryMs: number +} + export interface QueueConfig { maxSize: number persistPath: string | null @@ -86,6 +99,7 @@ export interface AlertLoggerConfig { routing?: RoutingConfig environments?: Record queue?: Partial + health?: Partial fingerprint?: Partial } @@ -96,6 +110,7 @@ export interface ResolvedConfig { aggregation: AggregationConfig routing: RoutingConfig queue: QueueConfig + health: HealthPolicy fingerprint: FingerprintConfig levels: AlertLevel[] pings: Partial> @@ -108,6 +123,14 @@ export const DEFAULT_AGGREGATION: AggregationConfig = { resolutionCooldownMs: 2 * 60_000, } +export const DEFAULT_HEALTH: HealthPolicy = { + unhealthyThreshold: 3, + healthWindowMs: 30_000, + drainIntervalMs: 10_000, + maxRetries: 3, + entryExpiryMs: 3_600_000, +} + export const DEFAULT_QUEUE: QueueConfig = { maxSize: 500, persistPath: null, @@ -148,6 +171,7 @@ export function resolveConfig(config: AlertLoggerConfig): ResolvedConfig { aggregation, routing: config.routing ?? {}, queue: { ...DEFAULT_QUEUE, ...config.queue }, + health: { ...DEFAULT_HEALTH, ...config.health }, fingerprint: { ...DEFAULT_FINGERPRINT, ...config.fingerprint }, levels, pings, diff --git a/src/core/utils.ts b/src/core/utils.ts new file mode 100644 index 0000000..d71b967 --- /dev/null +++ b/src/core/utils.ts @@ -0,0 +1,13 @@ +/** + * Format a duration in milliseconds to a human-readable string. 
/**
 * Format a duration in milliseconds as a short human-readable string.
 *
 * The value is truncated (never rounded up) to whole units: seconds when
 * below one minute, minutes when below one hour, otherwise hours followed
 * by any leftover minutes.
 * Examples: "5s", "3m", "1h 30m", "2h"
 *
 * Negative and NaN inputs are treated as a zero-length duration and yield
 * "0s", so a bad timestamp difference never renders as "-1s" or "NaNh".
 *
 * @param ms - Duration in milliseconds.
 * @returns The formatted duration string.
 */
export function formatDuration(ms: number): string {
  // `ms > 0` is false for both negative numbers and NaN, so one comparison
  // clamps every invalid input to 0 while leaving positive values untouched.
  const safeMs = ms > 0 ? ms : 0
  const seconds = Math.floor(safeMs / 1000)
  if (seconds < 60) return `${seconds}s`
  const minutes = Math.floor(seconds / 60)
  if (minutes < 60) return `${minutes}m`
  const hours = Math.floor(minutes / 60)
  const remainingMinutes = minutes % 60
  // Omit the minutes part entirely for exact hours ("2h", not "2h 0m").
  return remainingMinutes > 0 ? `${hours}h ${remainingMinutes}m` : `${hours}h`
}