Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/health-policy-config.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@iqai/alert-logger": minor
---

Add configurable HealthPolicy for adapter health/retry behavior

- New `health` config option with `unhealthyThreshold`, `healthWindowMs`, `drainIntervalMs`, `maxRetries`, and `entryExpiryMs`
- Extract shared `formatDuration` utility to eliminate duplication between health-manager and discord formatter
- Fix drain-only recovery: `onRecovery` now fires when an adapter recovers even if it became unhealthy purely through background drain retries (with no new dispatches in between)
- Immediate re-drain after discarding expired queue entries for faster stale queue cleanup
11 changes: 1 addition & 10 deletions src/adapters/discord/formatter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { FormattedAlert } from '../../core/types.js'
import { formatDuration } from '../../core/utils.js'

export interface DiscordEmbed {
title: string
Expand Down Expand Up @@ -32,16 +33,6 @@ function sanitize(text: string): string {
.replace(/<#\d+>/g, '[channel]')
}

/**
 * Renders a millisecond duration as a compact label.
 *
 * The output is `Ns` below one minute, `Nm` below one hour, and `Nh` or
 * `Nh Mm` from one hour upward. Each unit is floored, so leftover seconds are
 * dropped once the value is shown in minutes or hours.
 *
 * @param ms - Duration in milliseconds.
 * @returns The formatted label, e.g. `45s`, `12m`, `2h`, or `2h 5m`.
 */
function formatDuration(ms: number): string {
  const totalSeconds = Math.floor(ms / 1000)
  if (totalSeconds < 60) {
    return `${totalSeconds}s`
  }

  const totalMinutes = Math.floor(totalSeconds / 60)
  if (totalMinutes < 60) {
    return `${totalMinutes}m`
  }

  const wholeHours = Math.floor(totalMinutes / 60)
  const leftoverMinutes = totalMinutes % 60
  const hoursLabel = `${wholeHours}h`

  // Only append the minutes part when there is something to show.
  if (leftoverMinutes > 0) {
    return `${hoursLabel} ${leftoverMinutes}m`
  }
  return hoursLabel
}

export function formatDiscordEmbed(alert: FormattedAlert): DiscordEmbed {
const { aggregation } = alert
const phase = aggregation.phase
Expand Down
4 changes: 3 additions & 1 deletion src/core/alert-logger.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Aggregator, type ResolvedEntry } from './aggregator.js'
import { fingerprint } from './fingerprinter.js'
import { formatDuration, HealthManager } from './health-manager.js'
import { HealthManager } from './health-manager.js'
import { Router } from './router.js'
import type {
AlertAdapter,
Expand All @@ -11,6 +11,7 @@ import type {
ResolvedConfig,
} from './types.js'
import { resolveConfig } from './types.js'
import { formatDuration } from './utils.js'

interface AlertMeta {
level: AlertLevel
Expand All @@ -37,6 +38,7 @@ export class AlertLogger {
this.healthManager = new HealthManager({
maxQueueSize: config.queue.maxSize,
persistPath: config.queue.persistPath,
policy: config.health,
onRecovery: (adapterName, queuedCount, downtimeMs) => {
this.handleAdapterRecovery(adapterName, queuedCount, downtimeMs)
},
Expand Down
197 changes: 188 additions & 9 deletions src/core/health-manager.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { formatDuration, HealthManager } from './health-manager.js'
import { HealthManager } from './health-manager.js'
import type { AlertAdapter, AlertLevel, FormattedAlert } from './types.js'
import { DEFAULT_HEALTH } from './types.js'
import { formatDuration } from './utils.js'

function createMockAdapter(options?: {
failCount?: number
Expand Down Expand Up @@ -58,7 +60,7 @@ describe('HealthManager', () => {

it('healthy adapter: dispatch calls adapter.send directly', async () => {
const adapter = createMockAdapter()
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: DEFAULT_HEALTH })
const alert = createAlert()

hm.dispatch(adapter, alert)
Expand All @@ -75,7 +77,7 @@ describe('HealthManager', () => {

it('FIFO: new alerts enqueue when queue is non-empty even if healthy', async () => {
const adapter = createMockAdapter({ failCount: 1 })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: DEFAULT_HEALTH })

// First dispatch: healthy + empty queue -> sends directly, fails
hm.dispatch(adapter, createAlert({ title: 'first' }))
Expand All @@ -92,7 +94,7 @@ describe('HealthManager', () => {

it('failed send: increments consecutiveFailures and enqueues', async () => {
const adapter = createMockAdapter({ failCount: 1 })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: DEFAULT_HEALTH })
const alert = createAlert()

hm.dispatch(adapter, alert)
Expand All @@ -108,7 +110,7 @@ describe('HealthManager', () => {

it('unhealthy after 3 failures + 30s: enqueues without calling send', async () => {
const adapter = createMockAdapter({ failCount: 100 })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: DEFAULT_HEALTH })

// First dispatch sends directly (queue empty + healthy), fails and enqueues
hm.dispatch(adapter, createAlert({ title: 'alert-0' }))
Expand Down Expand Up @@ -146,7 +148,7 @@ describe('HealthManager', () => {
it('warning emitted once: console.warn called once, not on subsequent failures', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})
const adapter = createMockAdapter({ failCount: 100 })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: DEFAULT_HEALTH })

// First dispatch sends directly (queue empty), fails
hm.dispatch(adapter, createAlert())
Expand Down Expand Up @@ -189,7 +191,12 @@ describe('HealthManager', () => {
const onRecovery = vi.fn()
// Adapter fails first 3 calls, succeeds after (call 4+)
const adapter = createMockAdapter({ failCount: 3 })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, onRecovery })
const hm = new HealthManager({
maxQueueSize: 100,
persistPath: null,
policy: DEFAULT_HEALTH,
onRecovery,
})

// Suppress console.warn
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})
Expand Down Expand Up @@ -224,7 +231,7 @@ describe('HealthManager', () => {

it('expired entries (>1hr) are discarded during drain', async () => {
const adapter = createMockAdapter({ failCount: 100 })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: DEFAULT_HEALTH })
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})

// First dispatch sends directly (queue empty), fails and enqueues
Expand Down Expand Up @@ -256,9 +263,181 @@ describe('HealthManager', () => {
await hm.destroy()
})

// Checks that a custom policy (unhealthyThreshold: 5, healthWindowMs: 10s) decides when
// isHealthy() flips to false. Only one alert is dispatched; every later failure comes from
// drain retries scheduled at the custom 5s drainIntervalMs.
it('custom health policy: unhealthy after custom threshold', async () => {
const adapter = createMockAdapter({ failCount: 100 })
// NOTE(review): maxRetries is 10 here, presumably so the single queued entry is not
// discarded before the fifth failure is reached — confirm against HealthManager.
const customPolicy = {
unhealthyThreshold: 5,
healthWindowMs: 10_000,
drainIntervalMs: 5_000,
maxRetries: 10,
entryExpiryMs: 3_600_000,
}
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: customPolicy })

// First dispatch sends directly, fails (failure #1)
hm.dispatch(adapter, createAlert())
await vi.advanceTimersByTimeAsync(0)

// Drain retries at 5s intervals (custom drainIntervalMs)
await vi.advanceTimersByTimeAsync(5_000) // failure #2
await vi.advanceTimersByTimeAsync(5_000) // failure #3

// With default threshold=3, adapter would be unhealthy after healthWindowMs.
// But our threshold is 5, so still healthy after 3 failures.
expect(hm.isHealthy(adapter)).toBe(true)

await vi.advanceTimersByTimeAsync(5_000) // failure #4
await vi.advanceTimersByTimeAsync(5_000) // failure #5
// Now 5 failures AND >10s since lastSuccessAt — unhealthy
expect(hm.isHealthy(adapter)).toBe(false)

// Tear down the manager so its timers do not leak into other tests.
await hm.destroy()
})

// Checks that queued entries are dropped once they are older than the custom
// entryExpiryMs (60s here), as observed through queueSize() after a drain tick.
it('custom health policy: entries expire at custom entryExpiryMs', async () => {
const adapter = createMockAdapter({ failCount: 100 })
const customPolicy = {
unhealthyThreshold: 3,
healthWindowMs: 30_000,
drainIntervalMs: 10_000,
maxRetries: 3,
entryExpiryMs: 60_000, // 1 minute instead of 1 hour
}
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: customPolicy })
// Silence console.warn for the duration of the test; restored at the end.
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})

// The first send fails, so the alert ends up in the queue.
hm.dispatch(adapter, createAlert())
await vi.advanceTimersByTimeAsync(0)
expect(hm.queueSize(adapter)).toBe(1)

// Build up failures for drain retries
await vi.advanceTimersByTimeAsync(10_000)
await vi.advanceTimersByTimeAsync(10_000)
expect(hm.queueSize(adapter)).toBe(1) // still in queue, not expired yet

// Advance past custom expiry (60s) — entries should be discarded on next drain
vi.advanceTimersByTime(61_000)
await vi.advanceTimersByTimeAsync(10_000) // drain discards expired entry
expect(hm.queueSize(adapter)).toBe(0) // entry expired and removed

warnSpy.mockRestore()
await hm.destroy()
})

// Checks that a queued entry is discarded after the number of failed drain retries
// allowed by the custom maxRetries (1 here), as observed through queueSize().
it('custom health policy: maxRetries respected for retry count', async () => {
const adapter = createMockAdapter({ failCount: 100 })
const customPolicy = {
unhealthyThreshold: 3,
healthWindowMs: 30_000,
drainIntervalMs: 5_000,
maxRetries: 1, // discard after 1 retry
entryExpiryMs: 3_600_000,
}
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: customPolicy })
// Silence console.warn for the duration of the test; restored at the end.
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})

hm.dispatch(adapter, createAlert())
await vi.advanceTimersByTimeAsync(0) // fails, enqueues with retryCount=0
expect(hm.queueSize(adapter)).toBe(1)

// First drain retry fails → retryCount=1 → exceeds maxRetries=1 → dequeued
await vi.advanceTimersByTimeAsync(5_000)
expect(hm.queueSize(adapter)).toBe(0) // discarded after 1 retry

warnSpy.mockRestore()
await hm.destroy()
})

// End-to-end recovery scenario with the default policy: the adapter fails its first four
// sends, is reported unhealthy, and then a successful drain must trigger onRecovery
// exactly once and make isHealthy() return true again.
it('full recovery flow: unhealthy → queued → drain succeeds → recovery callback → healthy', async () => {
const onRecovery = vi.fn()
// Fails first 4 calls, succeeds on call 5+
const adapter = createMockAdapter({ failCount: 4 })
const hm = new HealthManager({
maxQueueSize: 100,
persistPath: null,
policy: DEFAULT_HEALTH,
onRecovery,
})
// Silence console.warn for the duration of the test; restored at the end.
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})

// Send and fail (call 1)
hm.dispatch(adapter, createAlert({ title: 'alert-1' }))
await vi.advanceTimersByTimeAsync(0)

// Queue more while queue is non-empty
hm.dispatch(adapter, createAlert({ title: 'alert-2' }))
hm.dispatch(adapter, createAlert({ title: 'alert-3' }))

// Drain fails (call 2, 3)
await vi.advanceTimersByTimeAsync(10_000)
await vi.advanceTimersByTimeAsync(10_000)

// Make unhealthy
vi.advanceTimersByTime(31_000)
hm.dispatch(adapter, createAlert({ title: 'alert-while-unhealthy' }))
await vi.advanceTimersByTimeAsync(0)
expect(hm.isHealthy(adapter)).toBe(false)

// Drain fails once more (call 4), then succeeds (call 5)
await vi.advanceTimersByTimeAsync(10_000) // call 4 fails
await vi.advanceTimersByTimeAsync(10_000) // call 5 succeeds → recovery

expect(onRecovery).toHaveBeenCalledTimes(1)
expect(hm.isHealthy(adapter)).toBe(true)

// No new dispatch is issued past this point; just give the re-drain a little time
// to process the remaining queued items.
await vi.advanceTimersByTimeAsync(200)

// Loose check: only asserts that at least one alert was recorded in adapter.sent.
expect(adapter.sent.length).toBeGreaterThan(0)

warnSpy.mockRestore()
await hm.destroy()
})

// Regression test for drain-only recovery: after the initial dispatches, no further
// dispatch happens, so the adapter can only be marked unhealthy (and later recovered)
// by background drain retries. onRecovery must still fire exactly once, and the
// "unhealthy" warning must still be logged.
it('drain-only unhealthy: onRecovery fires even without new dispatches', async () => {
const onRecovery = vi.fn()
// Fails first 4 calls, succeeds on call 5+
const adapter = createMockAdapter({ failCount: 4 })
const hm = new HealthManager({
maxQueueSize: 100,
persistPath: null,
policy: DEFAULT_HEALTH,
onRecovery,
})
// Spy on console.warn both to silence it and to assert on the warning text below.
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})

// Single dispatch, then NO more dispatches — only drain retries
hm.dispatch(adapter, createAlert())
await vi.advanceTimersByTimeAsync(0) // call 1 fails

// Queue more so drain has work
hm.dispatch(adapter, createAlert())
hm.dispatch(adapter, createAlert())

// Drain retries build failures (calls 2, 3)
await vi.advanceTimersByTimeAsync(10_000)
await vi.advanceTimersByTimeAsync(10_000)

// Advance past healthWindowMs — adapter becomes unhealthy during drain
vi.advanceTimersByTime(31_000)

// Drain fails (call 4) — warnedAt should be set here (not via dispatch)
await vi.advanceTimersByTimeAsync(10_000)

// Drain succeeds (call 5) — recovery should fire
await vi.advanceTimersByTimeAsync(10_000)

expect(onRecovery).toHaveBeenCalledTimes(1)
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining('unhealthy'))

warnSpy.mockRestore()
await hm.destroy()
})

it('destroy clears timers', async () => {
const adapter = createMockAdapter({ failCount: 100 })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null })
const hm = new HealthManager({ maxQueueSize: 100, persistPath: null, policy: DEFAULT_HEALTH })
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})

// Dispatch a failing alert to start drain timer
Expand Down
Loading
Loading