From c1904bca78c8bc5175de8f207584b32dde9a97ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0=C4=87eki=C4=87?= Date: Sat, 7 Feb 2026 15:20:11 +0100 Subject: [PATCH 01/10] feat(o11y): track ingest version in session metrics Add ingestVersion field to session metrics to track which version of the ingestion system processed each session. The version is persisted in the SessionIngestDO metadata and included in all metrics emissions. - Add ingestVersion to SessionMetricsParams schema - Store ingestVersion in ingest_meta table during session initialization - Pass ingestVersion through to metrics emission - Map ingestVersion to double11 in Analytics Engine - Update tests to include ingestVersion field --- cloudflare-o11y/src/session-metrics-analytics.ts | 2 ++ cloudflare-o11y/src/session-metrics-schema.ts | 2 ++ cloudflare-o11y/test/index.spec.ts | 3 ++- .../src/dos/SessionIngestDO.ts | 16 +++++++++++----- cloudflare-session-ingest/src/o11y-binding.d.ts | 1 + 5 files changed, 18 insertions(+), 6 deletions(-) diff --git a/cloudflare-o11y/src/session-metrics-analytics.ts b/cloudflare-o11y/src/session-metrics-analytics.ts index 65f65ec143..3ee8f839a1 100644 --- a/cloudflare-o11y/src/session-metrics-analytics.ts +++ b/cloudflare-o11y/src/session-metrics-analytics.ts @@ -18,6 +18,7 @@ import type { SessionMetricsParams } from './session-metrics-schema'; * double8 = compactionCount * double9 = stuckToolCallCount * double10 = autoCompactionCount + * double11 = ingestVersion */ export function writeSessionMetricsDataPoint(params: SessionMetricsParams, env: Env): void { const totalTokensSum = @@ -41,6 +42,7 @@ export function writeSessionMetricsDataPoint(params: SessionMetricsParams, env: params.compactionCount, params.stuckToolCallCount, params.autoCompactionCount, + params.ingestVersion, ], }); } diff --git a/cloudflare-o11y/src/session-metrics-schema.ts b/cloudflare-o11y/src/session-metrics-schema.ts index d5ae9db9ea..70ca05f328 100644 --- a/cloudflare-o11y/src/session-metrics-schema.ts +++ b/cloudflare-o11y/src/session-metrics-schema.ts @@ -34,6 +34,8 @@ export const SessionMetricsParamsSchema = z.object({ autoCompactionCount: z.number().int().nonnegative(), terminationReason: z.enum(TerminationReasons), + + ingestVersion: z.number().int().nonnegative(), }); export type SessionMetricsParams = z.infer; diff --git a/cloudflare-o11y/test/index.spec.ts b/cloudflare-o11y/test/index.spec.ts index be0080241b..48f67ffa9b 100644 --- a/cloudflare-o11y/test/index.spec.ts +++ b/cloudflare-o11y/test/index.spec.ts @@ -203,6 +203,7 @@ describe('session metrics RPC', () => { compactionCount: 1, autoCompactionCount: 1, terminationReason: 'completed' as const, + ingestVersion: 1, }; } @@ -219,7 +220,7 @@ describe('session metrics RPC', () => { const call = aeSpy.writeDataPoint.mock.calls[0][0]; expect(call.indexes).toEqual(['cli']); expect(call.blobs).toEqual(['completed', 'cli', 'org_456']); - expect(call.doubles).toEqual([60000, 1500, 5, 12, 2, 21000, 0.15, 1, 0, 1]); + expect(call.doubles).toEqual([60000, 1500, 5, 12, 2, 21000, 0.15, 1, 0, 1, 1]); }); it('uses empty string for missing organizationId in AE', async () => { diff --git a/cloudflare-session-ingest/src/dos/SessionIngestDO.ts b/cloudflare-session-ingest/src/dos/SessionIngestDO.ts index 6d677b6f65..1a3f643fd5 100644 --- a/cloudflare-session-ingest/src/dos/SessionIngestDO.ts +++ b/cloudflare-session-ingest/src/dos/SessionIngestDO.ts @@ -111,9 +111,13 @@ export class SessionIngestDO extends DurableObject { }> { this.initSchema(); - // Persist identity so alarm() can recover kiloUserId/sessionId after hibernation + // Persist identity and version so alarm() can recover after hibernation writeIngestMetaIfChanged(this.sql, { key: 'kiloUserId', incomingValue: kiloUserId }); writeIngestMetaIfChanged(this.sql, { key: 'sessionId', incomingValue: sessionId }); + writeIngestMetaIfChanged(this.sql, { + key: 'ingestVersion', + incomingValue: String(ingestVersion), + }); const incomingByKey: Record = { title: undefined, @@ -224,7 +228,8 @@ export class SessionIngestDO extends DurableObject { private async emitSessionMetrics( kiloUserId: string, sessionId: string, - closeReason: TerminationReason + closeReason: TerminationReason, + ingestVersion: number ): Promise { this.initSchema(); @@ -252,7 +257,7 @@ export class SessionIngestDO extends DurableObject { const metrics = computeSessionMetrics(rows, closeReason); - await this.env.O11Y.ingestSessionMetrics({ kiloUserId, sessionId, ...metrics }); + await this.env.O11Y.ingestSessionMetrics({ kiloUserId, sessionId, ingestVersion, ...metrics }); // Mark metrics as emitted to prevent duplicates this.sql.exec( @@ -278,7 +283,7 @@ export class SessionIngestDO extends DurableObject { key: string; value: string | null; }>( - `SELECT key, value FROM ingest_meta WHERE key IN ('kiloUserId', 'sessionId', 'closeReason')` + `SELECT key, value FROM ingest_meta WHERE key IN ('kiloUserId', 'sessionId', 'closeReason', 'ingestVersion')` ) .toArray(); @@ -289,8 +294,9 @@ export class SessionIngestDO extends DurableObject { if (!kiloUserId || !sessionId) return; const closeReason = (meta['closeReason'] ?? 'abandoned') as TerminationReason; + const ingestVersion = Number(meta['ingestVersion'] ?? '0') || 0; - await this.emitSessionMetrics(kiloUserId, sessionId, closeReason); + await this.emitSessionMetrics(kiloUserId, sessionId, closeReason, ingestVersion); } async clear(): Promise { diff --git a/cloudflare-session-ingest/src/o11y-binding.d.ts b/cloudflare-session-ingest/src/o11y-binding.d.ts index e3a0e9fd71..a88bdb7a3f 100644 --- a/cloudflare-session-ingest/src/o11y-binding.d.ts +++ b/cloudflare-session-ingest/src/o11y-binding.d.ts @@ -32,6 +32,7 @@ type O11YSessionMetricsParams = { compactionCount: number; autoCompactionCount: number; terminationReason: 'completed' | 'error' | 'interrupted' | 'abandoned' | 'unknown'; + ingestVersion: number; }; type O11YBinding = Fetcher & { From 43bd83baec066d3f7a4cf7a4ad3caf8e54d087d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0=C4=87eki=C4=87?= Date: Sat, 7 Feb 2026 15:39:53 +0100 Subject: [PATCH 02/10] refactor(session-ingest): tighten IngestMetaKey and writeIngestMetaIfChanged types Expand IngestMetaKey to cover all 9 keys stored in ingest_meta table, introduce ExtractableMetaKey for the 4 item-extracted keys, and narrow writeIngestMetaIfChanged to reject undefined values and require typed keys. --- .../src/dos/SessionIngestDO.ts | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/cloudflare-session-ingest/src/dos/SessionIngestDO.ts b/cloudflare-session-ingest/src/dos/SessionIngestDO.ts index 1a3f643fd5..14fe026c76 100644 --- a/cloudflare-session-ingest/src/dos/SessionIngestDO.ts +++ b/cloudflare-session-ingest/src/dos/SessionIngestDO.ts @@ -18,14 +18,20 @@ import { type TerminationReason, } from './session-metrics'; +type IngestMetaKey = + | ExtractableMetaKey + | 'kiloUserId' + | 'sessionId' + | 'ingestVersion' + | 'closeReason' + | 'metricsEmitted'; + +type ExtractableMetaKey = 'title' | 'parentId' | 'platform' | 'orgId'; + function writeIngestMetaIfChanged( sql: SqlStorage, - params: { key: string; incomingValue: string | null | undefined } + params: { key: IngestMetaKey; incomingValue: string | null } ): { changed: boolean; value: string | null } { - if (params.incomingValue === undefined) { - return { changed: false, value: null }; - } - const existing = sql .exec<{ value: string | null; @@ -50,10 +56,8 @@ function writeIngestMetaIfChanged( return { changed: true, value: params.incomingValue }; } -type IngestMetaKey = 'title' | 'parentId' | 'platform' | 'orgId'; - const INGEST_META_EXTRACTORS: Array<{ - key: IngestMetaKey; + key: ExtractableMetaKey; extract: (item: IngestBatch[number]) => string | null | undefined; }> = [ { key: 'title', extract: extractNormalizedTitleFromItem }, @@ -62,7 +66,7 @@ const INGEST_META_EXTRACTORS: Array<{ { key: 'orgId', extract: extractNormalizedOrgIdFromItem }, ]; -type Changes = Array<{ name: IngestMetaKey; value: string | null }>; +type Changes = Array<{ name: ExtractableMetaKey; value: string | null }>; export class SessionIngestDO extends DurableObject { private sql: SqlStorage; @@ -119,7 +123,7 @@ export class SessionIngestDO extends DurableObject { incomingValue: String(ingestVersion), }); - const incomingByKey: Record = { + const incomingByKey: Record = { title: undefined, parentId: undefined, platform: undefined, @@ -163,10 +167,12 @@ export class SessionIngestDO extends DurableObject { const changes: Changes = []; - for (const key of Object.keys(incomingByKey) as IngestMetaKey[]) { + for (const key of Object.keys(incomingByKey) as ExtractableMetaKey[]) { + const incoming = incomingByKey[key]; + if (incoming === undefined) continue; const meta = writeIngestMetaIfChanged(this.sql, { key, - incomingValue: incomingByKey[key], + incomingValue: incoming, }); if (meta.changed) { changes.push({ name: key, value: meta.value }); From 9258f81c8192d81e743da0a9108778c84b476697 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0=C4=87eki=C4=87?= Date: Sat, 7 Feb 2026 15:41:22 +0100 Subject: [PATCH 03/10] test(o11y): add unit tests for alerting evaluate helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Export and test computeBurnRate, dimensionKey, rowsToMap, and effectiveSeverity — pure functions that were previously module-private and untested. --- cloudflare-o11y/src/alerting/evaluate.ts | 15 ++- .../test/alerting-evaluate.spec.ts | 96 +++++++++++++++++++ 2 files changed, 106 insertions(+), 5 deletions(-) create mode 100644 cloudflare-o11y/test/alerting-evaluate.spec.ts diff --git a/cloudflare-o11y/src/alerting/evaluate.ts b/cloudflare-o11y/src/alerting/evaluate.ts index 86eaa1fd47..b0dbf47a7d 100644 --- a/cloudflare-o11y/src/alerting/evaluate.ts +++ b/cloudflare-o11y/src/alerting/evaluate.ts @@ -16,19 +16,19 @@ import { getRecommendedModels } from './recommended-models'; * Compute the burn rate from an observed bad-event fraction and the SLO. * burn_rate = (bad_fraction) / (1 - SLO) */ -function computeBurnRate(badFraction: number, slo: number): number { +export function computeBurnRate(badFraction: number, slo: number): number { const errorBudget = 1 - slo; if (errorBudget <= 0) return Infinity; return badFraction / errorBudget; } -type DimensionKey = `${string}:${string}:${string}`; // provider:model:clientName +export type DimensionKey = `${string}:${string}:${string}`; // provider:model:clientName -function dimensionKey(provider: string, model: string, clientName: string): DimensionKey { +export function dimensionKey(provider: string, model: string, clientName: string): DimensionKey { return `${provider}:${model}:${clientName}`; } -function rowsToMap(rows: T[]): Map { +export function rowsToMap(rows: T[]): Map { const map = new Map(); for (const row of rows) { map.set(dimensionKey(row.provider, row.model, row.client_name), row); @@ -43,7 +43,12 @@ function rowsToMap): AlertSeverity { +export function effectiveSeverity( + baseSeverity: AlertSeverity, + clientName: string, + model: string, + recommendedModels: Set, +): AlertSeverity { if (baseSeverity === 'page') { if (clientName === 'kilo-gateway' && recommendedModels.has(model)) { return 'page'; diff --git a/cloudflare-o11y/test/alerting-evaluate.spec.ts b/cloudflare-o11y/test/alerting-evaluate.spec.ts new file mode 100644 index 0000000000..061252c5d0 --- /dev/null +++ b/cloudflare-o11y/test/alerting-evaluate.spec.ts @@ -0,0 +1,96 @@ +import { describe, it, expect } from 'vitest'; +import { computeBurnRate, dimensionKey, effectiveSeverity, rowsToMap } from '../src/alerting/evaluate'; + +describe('computeBurnRate', () => { + it('computes burn rate from bad fraction and SLO', () => { + // 1% bad with 99.9% SLO => 0.01 / 0.001 = 10 + expect(computeBurnRate(0.01, 0.999)).toBeCloseTo(10); + }); + + it('returns 0 when bad fraction is 0', () => { + expect(computeBurnRate(0, 0.999)).toBe(0); + }); + + it('returns Infinity when SLO is 1.0 (zero error budget)', () => { + expect(computeBurnRate(0.01, 1.0)).toBe(Infinity); + }); + + it('returns Infinity when SLO exceeds 1.0', () => { + expect(computeBurnRate(0.01, 1.1)).toBe(Infinity); + }); + + it('handles typical page-level burn rate threshold', () => { + // 14.4x burn rate means bad_fraction = 14.4 * (1 - 0.999) = 0.0144 + expect(computeBurnRate(0.0144, 0.999)).toBeCloseTo(14.4); + }); +}); + +describe('dimensionKey', () => { + it('constructs provider:model:clientName key', () => { + expect(dimensionKey('openai', 'gpt-4', 'kilo-gateway')).toBe('openai:gpt-4:kilo-gateway'); + }); + + it('handles empty strings', () => { + expect(dimensionKey('', '', '')).toBe('::'); + }); + + it('preserves colons in values', () => { + expect(dimensionKey('a:b', 'c', 'd')).toBe('a:b:c:d'); + }); +}); + +describe('rowsToMap', () => { + it('converts rows to a map keyed by dimension', () => { + const rows = [ + { provider: 'openai', model: 'gpt-4', client_name: 'kilo-gateway', value: 1 }, + { provider: 'anthropic', model: 'claude-sonnet-4.5', client_name: 'cli', value: 2 }, + ]; + const map = rowsToMap(rows); + expect(map.size).toBe(2); + expect(map.get('openai:gpt-4:kilo-gateway')?.value).toBe(1); + expect(map.get('anthropic:claude-sonnet-4.5:cli')?.value).toBe(2); + }); + + it('returns empty map for empty input', () => { + expect(rowsToMap([]).size).toBe(0); + }); + + it('last row wins for duplicate dimensions', () => { + const rows = [ + { provider: 'openai', model: 'gpt-4', client_name: 'cli', value: 1 }, + { provider: 'openai', model: 'gpt-4', client_name: 'cli', value: 2 }, + ]; + const map = rowsToMap(rows); + expect(map.size).toBe(1); + expect(map.get('openai:gpt-4:cli')?.value).toBe(2); + }); +}); + +describe('effectiveSeverity', () => { + const recommended = new Set(['gpt-4', 'claude-sonnet-4.5']); + + it('keeps page for recommended model on kilo-gateway', () => { + expect(effectiveSeverity('page', 'kilo-gateway', 'gpt-4', recommended)).toBe('page'); + }); + + it('downgrades page to ticket for non-recommended model on kilo-gateway', () => { + expect(effectiveSeverity('page', 'kilo-gateway', 'gpt-3.5-turbo', recommended)).toBe('ticket'); + }); + + it('downgrades page to ticket for recommended model on non-gateway client', () => { + expect(effectiveSeverity('page', 'cli', 'gpt-4', recommended)).toBe('ticket'); + }); + + it('downgrades page to ticket for non-recommended model on non-gateway client', () => { + expect(effectiveSeverity('page', 'cli', 'gpt-3.5-turbo', recommended)).toBe('ticket'); + }); + + it('keeps ticket severity unchanged regardless of model/client', () => { + expect(effectiveSeverity('ticket', 'kilo-gateway', 'gpt-4', recommended)).toBe('ticket'); + expect(effectiveSeverity('ticket', 'cli', 'gpt-3.5-turbo', recommended)).toBe('ticket'); + }); + + it('handles empty recommended models set', () => { + expect(effectiveSeverity('page', 'kilo-gateway', 'gpt-4', new Set())).toBe('ticket'); + }); +}); From 1b2a3af5d8bd326ffb12a9fe9a4a337e92ccf102 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0=C4=87eki=C4=87?= Date: Sat, 7 Feb 2026 15:42:08 +0100 Subject: [PATCH 04/10] test(session-ingest): add unit tests for ingest meta extractors Cover extractNormalizedTitleFromItem, extractNormalizedParentIdFromItem, extractNormalizedPlatformFromItem, and extractNormalizedOrgIdFromItem including normalization edge cases (empty, whitespace, null, non-string). --- .../src/dos/session-ingest-extractors.test.ts | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 cloudflare-session-ingest/src/dos/session-ingest-extractors.test.ts diff --git a/cloudflare-session-ingest/src/dos/session-ingest-extractors.test.ts b/cloudflare-session-ingest/src/dos/session-ingest-extractors.test.ts new file mode 100644 index 0000000000..25ace9e659 --- /dev/null +++ b/cloudflare-session-ingest/src/dos/session-ingest-extractors.test.ts @@ -0,0 +1,104 @@ +import { describe, it, expect } from 'vitest'; +import type { IngestBatch } from '../types/session-sync'; +import { + extractNormalizedTitleFromItem, + extractNormalizedParentIdFromItem, + extractNormalizedPlatformFromItem, + extractNormalizedOrgIdFromItem, +} from './session-ingest-extractors'; + +function sessionItem(data: Record): IngestBatch[number] { + return { type: 'session', data } as IngestBatch[number]; +} + +function kiloMetaItem(data: { platform: string; orgId?: string }): IngestBatch[number] { + return { type: 'kilo_meta', data } as IngestBatch[number]; +} + +function messageItem(): IngestBatch[number] { + return { type: 'message', data: { id: 'msg-1' } } as IngestBatch[number]; +} + +describe('extractNormalizedTitleFromItem', () => { + it('extracts title from session item', () => { + expect(extractNormalizedTitleFromItem(sessionItem({ title: 'My Session' }))).toBe('My Session'); + }); + + it('trims whitespace from title', () => { + expect(extractNormalizedTitleFromItem(sessionItem({ title: ' hello ' }))).toBe('hello'); + }); + + it('returns null for empty string title', () => { + expect(extractNormalizedTitleFromItem(sessionItem({ title: '' }))).toBeNull(); + }); + + it('returns null for whitespace-only title', () => { + expect(extractNormalizedTitleFromItem(sessionItem({ title: ' ' }))).toBeNull(); + }); + + it('returns undefined for session item with no title field', () => { + expect(extractNormalizedTitleFromItem(sessionItem({}))).toBeUndefined(); + }); + + it('returns undefined for non-session item', () => { + expect(extractNormalizedTitleFromItem(messageItem())).toBeUndefined(); + }); + + it('returns null for null title', () => { + expect(extractNormalizedTitleFromItem(sessionItem({ title: null }))).toBeNull(); + }); + + it('returns undefined for numeric title', () => { + expect(extractNormalizedTitleFromItem(sessionItem({ title: 42 }))).toBeUndefined(); + }); +}); + +describe('extractNormalizedParentIdFromItem', () => { + it('extracts parentID from session item', () => { + expect(extractNormalizedParentIdFromItem(sessionItem({ parentID: 'parent-1' }))).toBe( + 'parent-1' + ); + }); + + it('returns null for empty parentID', () => { + expect(extractNormalizedParentIdFromItem(sessionItem({ parentID: '' }))).toBeNull(); + }); + + it('returns undefined for session item with no parentID', () => { + expect(extractNormalizedParentIdFromItem(sessionItem({}))).toBeUndefined(); + }); + + it('returns undefined for non-session item', () => { + expect(extractNormalizedParentIdFromItem(messageItem())).toBeUndefined(); + }); +}); + +describe('extractNormalizedPlatformFromItem', () => { + it('extracts platform from kilo_meta item', () => { + expect(extractNormalizedPlatformFromItem(kiloMetaItem({ platform: 'vscode' }))).toBe('vscode'); + }); + + it('returns undefined for non-kilo_meta item', () => { + expect(extractNormalizedPlatformFromItem(sessionItem({}))).toBeUndefined(); + }); + + it('trims whitespace from platform', () => { + expect(extractNormalizedPlatformFromItem(kiloMetaItem({ platform: ' cli ' }))).toBe('cli'); + }); +}); + +describe('extractNormalizedOrgIdFromItem', () => { + it('extracts orgId from kilo_meta item', () => { + expect( + extractNormalizedOrgIdFromItem(kiloMetaItem({ platform: 'cli', orgId: 'org-123' })) + ).toBe('org-123'); + }); + + it('returns undefined for kilo_meta item without orgId', () => { + expect(extractNormalizedOrgIdFromItem(kiloMetaItem({ platform: 'cli' }))).toBeUndefined(); + }); + + it('returns undefined for non-kilo_meta item', () => { + expect(extractNormalizedOrgIdFromItem(messageItem())).toBeUndefined(); + }); +}); From d70ff4e6b618d740e9bc458e506259e6b7114797 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0=C4=87eki=C4=87?= Date: Sat, 7 Feb 2026 15:42:45 +0100 Subject: [PATCH 05/10] test(session-ingest): add unit tests for getItemIdentity Cover all 8 item types and the unknown-type error path. --- .../src/util/compaction.test.ts | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 cloudflare-session-ingest/src/util/compaction.test.ts diff --git a/cloudflare-session-ingest/src/util/compaction.test.ts b/cloudflare-session-ingest/src/util/compaction.test.ts new file mode 100644 index 0000000000..da67999814 --- /dev/null +++ b/cloudflare-session-ingest/src/util/compaction.test.ts @@ -0,0 +1,69 @@ +import { describe, it, expect } from 'vitest'; +import type { SessionDataItem } from '../types/session-sync'; +import { getItemIdentity } from './compaction'; + +function item(type: string, data: Record = {}): SessionDataItem { + return { type, data } as SessionDataItem; +} + +describe('getItemIdentity', () => { + it('returns fixed id for session item', () => { + expect(getItemIdentity(item('session'))).toEqual({ + item_id: 'session', + item_type: 'session', + }); + }); + + it('returns message/{id} for message item', () => { + expect(getItemIdentity(item('message', { id: 'msg-42' }))).toEqual({ + item_id: 'message/msg-42', + item_type: 'message', + }); + }); + + it('returns messageID/id for part item', () => { + expect(getItemIdentity(item('part', { id: 'p-1', messageID: 'msg-42' }))).toEqual({ + item_id: 'msg-42/p-1', + item_type: 'part', + }); + }); + + it('returns fixed id for session_diff item', () => { + expect(getItemIdentity(item('session_diff'))).toEqual({ + item_id: 'session_diff', + item_type: 'session_diff', + }); + }); + + it('returns fixed id for model item', () => { + expect(getItemIdentity(item('model'))).toEqual({ + item_id: 'model', + item_type: 'model', + }); + }); + + it('returns fixed id for kilo_meta item', () => { + expect(getItemIdentity(item('kilo_meta'))).toEqual({ + item_id: 'kilo_meta', + item_type: 'kilo_meta', + }); + }); + + it('returns fixed id for session_open item', () => { + expect(getItemIdentity(item('session_open'))).toEqual({ + item_id: 'session_open', + item_type: 'session_open', + }); + }); + + it('returns fixed id for session_close item', () => { + expect(getItemIdentity(item('session_close'))).toEqual({ + item_id: 'session_close', + item_type: 'session_close', + }); + }); + + it('throws for unknown item type', () => { + expect(() => getItemIdentity(item('unknown_type'))).toThrow('Unknown item type: unknown_type'); + }); +}); From 9fbb369e9a1f6f9b195cc4be4344569fa4172484 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0=C4=87eki=C4=87?= Date: Sat, 7 Feb 2026 15:43:27 +0100 Subject: [PATCH 06/10] test(session-ingest): add unit tests for buildSharedSessionSnapshot Cover empty input, session info, message ordering, part attachment, buffered parts arriving before their message, mixed ordering, and silently ignored item types. --- .../src/util/share-output.test.ts | 90 +++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 cloudflare-session-ingest/src/util/share-output.test.ts diff --git a/cloudflare-session-ingest/src/util/share-output.test.ts b/cloudflare-session-ingest/src/util/share-output.test.ts new file mode 100644 index 0000000000..98d18cbef7 --- /dev/null +++ b/cloudflare-session-ingest/src/util/share-output.test.ts @@ -0,0 +1,90 @@ +import { describe, it, expect } from 'vitest'; +import type { SessionDataItem } from '../types/session-sync'; +import { buildSharedSessionSnapshot } from './share-output'; + +function item(type: string, data: unknown): SessionDataItem { + return { type, data } as SessionDataItem; +} + +describe('buildSharedSessionSnapshot', () => { + it('returns empty snapshot for no items', () => { + const result = buildSharedSessionSnapshot([]); + expect(result).toEqual({ info: {}, messages: [] }); + }); + + it('sets info from session item', () => { + const result = buildSharedSessionSnapshot([item('session', { title: 'My Session' })]); + expect(result.info).toEqual({ title: 'My Session' }); + expect(result.messages).toEqual([]); + }); + + it('last session item wins for info', () => { + const result = buildSharedSessionSnapshot([ + item('session', { title: 'First' }), + item('session', { title: 'Second' }), + ]); + expect(result.info).toEqual({ title: 'Second' }); + }); + + it('adds messages in order', () => { + const result = buildSharedSessionSnapshot([ + item('message', { id: 'msg-1', role: 'user' }), + item('message', { id: 'msg-2', role: 'assistant' }), + ]); + expect(result.messages).toHaveLength(2); + expect(result.messages[0].info).toEqual({ id: 'msg-1', role: 'user' }); + expect(result.messages[1].info).toEqual({ id: 'msg-2', role: 'assistant' }); + expect(result.messages[0].parts).toEqual([]); + expect(result.messages[1].parts).toEqual([]); + }); + + it('attaches parts to existing message', () => { + const result = buildSharedSessionSnapshot([ + item('message', { id: 'msg-1' }), + item('part', { id: 'p-1', messageID: 'msg-1' }), + item('part', { id: 'p-2', messageID: 'msg-1' }), + ]); + expect(result.messages).toHaveLength(1); + expect(result.messages[0].parts).toHaveLength(2); + expect(result.messages[0].parts[0].id).toBe('p-1'); + expect(result.messages[0].parts[1].id).toBe('p-2'); + }); + + it('buffers parts arriving before their message', () => { + const result = buildSharedSessionSnapshot([ + item('part', { id: 'p-1', messageID: 'msg-1' }), + item('part', { id: 'p-2', messageID: 'msg-1' }), + item('message', { id: 'msg-1' }), + ]); + expect(result.messages).toHaveLength(1); + expect(result.messages[0].parts).toHaveLength(2); + expect(result.messages[0].parts[0].id).toBe('p-1'); + expect(result.messages[0].parts[1].id).toBe('p-2'); + }); + + it('handles mixed ordering of messages and parts', () => { + const result = buildSharedSessionSnapshot([ + item('part', { id: 'p-1', messageID: 'msg-2' }), + item('message', { id: 'msg-1' }), + item('part', { id: 'p-2', messageID: 'msg-1' }), + item('message', { id: 'msg-2' }), + item('part', { id: 'p-3', messageID: 'msg-2' }), + ]); + expect(result.messages).toHaveLength(2); + expect(result.messages[0].parts.map(p => p.id)).toEqual(['p-2']); + expect(result.messages[1].parts.map(p => p.id)).toEqual(['p-1', 'p-3']); + }); + + it('silently ignores kilo_meta, session_diff, model, and other types', () => { + const result = buildSharedSessionSnapshot([ + item('kilo_meta', { platform: 'vscode' }), + item('session_diff', [{ op: 'replace' }]), + item('model', [{ name: 'gpt-4' }]), + item('session_open', {}), + item('session_close', { reason: 'completed' }), + item('message', { id: 'msg-1' }), + ]); + expect(result.messages).toHaveLength(1); + expect(result.info).toEqual({}); + }); +}); From 88f62802e23f6c13895246d9bb742cafc7e92954 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0=C4=87eki=C4=87?= Date: Sat, 7 Feb 2026 15:44:15 +0100 Subject: [PATCH 07/10] test(session-ingest): add unit tests for withDORetry Cover first-try success, retryable error with eventual success, fresh stub per attempt, all attempts exhausted, non-retryable immediate throw, non-Error wrapping, and maxAttempts config. --- .../src/util/do-retry.test.ts | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 cloudflare-session-ingest/src/util/do-retry.test.ts diff --git a/cloudflare-session-ingest/src/util/do-retry.test.ts b/cloudflare-session-ingest/src/util/do-retry.test.ts new file mode 100644 index 0000000000..b9820a00df --- /dev/null +++ b/cloudflare-session-ingest/src/util/do-retry.test.ts @@ -0,0 +1,110 @@ +import { describe, it, expect, vi } from 'vitest'; +import { withDORetry, type DORetryConfig } from './do-retry'; + +const fastConfig: DORetryConfig = { + maxAttempts: 3, + baseBackoffMs: 1, + maxBackoffMs: 10, +}; + +function retryableError(message: string): Error { + const err = new Error(message); + (err as Error & { retryable: boolean }).retryable = true; + return err; +} + +describe('withDORetry', () => { + it('returns result on first success', async () => { + const getStub = vi.fn(() => 'stub'); + const operation = vi.fn(async () => 42); + + const result = await withDORetry(getStub, operation, 'test-op', fastConfig); + + expect(result).toBe(42); + expect(getStub).toHaveBeenCalledTimes(1); + expect(operation).toHaveBeenCalledTimes(1); + expect(operation).toHaveBeenCalledWith('stub'); + }); + + it('retries on retryable error and succeeds', async () => { + let attempt = 0; + const getStub = vi.fn(() => `stub-${++attempt}`); + const operation = vi.fn(async (stub: string) => { + if (stub === 'stub-1') throw retryableError('transient'); + return stub; + }); + + const result = await withDORetry(getStub, operation, 'test-op', fastConfig); + + expect(result).toBe('stub-2'); + expect(getStub).toHaveBeenCalledTimes(2); + expect(operation).toHaveBeenCalledTimes(2); + }); + + it('creates a fresh stub for each retry attempt', async () => { + const stubs: string[] = []; + let call = 0; + const getStub = vi.fn(() => { + const s = `stub-${call++}`; + stubs.push(s); + return s; + }); + const operation = vi.fn(async (stub: string) => { + if (stub !== 'stub-2') throw retryableError('fail'); + return 'ok'; + }); + + await withDORetry(getStub, operation, 'test-op', fastConfig); + + expect(stubs).toEqual(['stub-0', 'stub-1', 'stub-2']); + }); + + it('throws after all retry attempts exhausted', async () => { + const getStub = vi.fn(() => 'stub'); + const operation = vi.fn(async () => { + throw retryableError('always fails'); + }); + + await expect(withDORetry(getStub, operation, 'test-op', fastConfig)).rejects.toThrow( + 'always fails' + ); + + expect(operation).toHaveBeenCalledTimes(3); + }); + + it('throws immediately for non-retryable error without retrying', async () => { + const getStub = vi.fn(() => 'stub'); + const operation = vi.fn(async () => { + throw new Error('permanent failure'); + }); + + await expect(withDORetry(getStub, operation, 'test-op', fastConfig)).rejects.toThrow( + 'permanent failure' + ); + + expect(operation).toHaveBeenCalledTimes(1); + }); + + it('wraps non-Error thrown values in an Error', async () => { + const getStub = vi.fn(() => 'stub'); + const operation = vi.fn(async () => { + throw 'string error'; + }); + + await expect(withDORetry(getStub, operation, 'test-op', fastConfig)).rejects.toThrow( + 'string error' + ); + }); + + it('respects maxAttempts config', async () => { + const singleAttempt: DORetryConfig = { maxAttempts: 1, baseBackoffMs: 1, maxBackoffMs: 10 }; + const getStub = vi.fn(() => 'stub'); + const operation = vi.fn(async () => { + throw retryableError('fail'); + }); + + await expect(withDORetry(getStub, operation, 'test-op', singleAttempt)).rejects.toThrow('fail'); + + expect(operation).toHaveBeenCalledTimes(1); + }); +}); From ada39d41e2f054a1a830e3d51a99db7c599b6489 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0=C4=87eki=C4=87?= Date: Sat, 7 Feb 2026 15:45:04 +0100 Subject: [PATCH 08/10] test(session-ingest): add edge case tests for computeSessionMetrics Cover negative duration clamping, undefined timeToFirstResponseMs when only user or only assistant messages exist, last-wins for multiple kilo_meta and session items, and TTFR clamping when assistant precedes user. --- .../src/dos/session-metrics.test.ts | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/cloudflare-session-ingest/src/dos/session-metrics.test.ts b/cloudflare-session-ingest/src/dos/session-metrics.test.ts index ff19a4c836..82689150f5 100644 --- a/cloudflare-session-ingest/src/dos/session-metrics.test.ts +++ b/cloudflare-session-ingest/src/dos/session-metrics.test.ts @@ -244,4 +244,65 @@ describe('computeSessionMetrics', () => { const result = computeSessionMetrics(items, 'completed'); expect(result.totalTurns).toBe(1); }); + + it('clamps negative session duration to 0', () => { + const items = [makeItem('session', { time: { created: 61000, updated: 1000 } })]; + const result = computeSessionMetrics(items, 'completed'); + expect(result.sessionDurationMs).toBe(0); + }); + + it('returns undefined timeToFirstResponseMs with only user messages', () => { + const items = [ + makeItem('message', { role: 'user', time: { created: 1000 } }), + makeItem('message', { role: 'user', time: { created: 2000 } }), + ]; + const result = computeSessionMetrics(items, 'completed'); + expect(result.timeToFirstResponseMs).toBeUndefined(); + }); + + it('returns undefined timeToFirstResponseMs with only assistant messages', () => { + const items = [ + makeItem('message', { + role: 'assistant', + time: { created: 1000 }, + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + cost: 0, + }), + ]; + const result = computeSessionMetrics(items, 'completed'); + expect(result.timeToFirstResponseMs).toBeUndefined(); + }); + + it('uses last kilo_meta for platform and orgId', () => { + const items = [ + makeItem('kilo_meta', { platform: 'vscode', orgId: 'org-1' }), + makeItem('kilo_meta', { platform: 'cli', orgId: 'org-2' }), + ]; + const result = computeSessionMetrics(items, 'completed'); + expect(result.platform).toBe('cli'); + expect(result.organizationId).toBe('org-2'); + }); + + it('uses last session item for duration timestamps', () => { + const items = [ + makeItem('session', { time: { created: 1000, updated: 2000 } }), + makeItem('session', { time: { created: 5000, updated: 10000 } }), + ]; + const result = computeSessionMetrics(items, 'completed'); + expect(result.sessionDurationMs).toBe(5000); + }); + + it('clamps timeToFirstResponseMs to 0 when assistant precedes user', () => { + const items = [ + makeItem('message', { role: 'user', time: { created: 5000 } }), + makeItem('message', { + role: 'assistant', + time: { created: 1000 }, + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + cost: 0, + }), + ]; + const result = computeSessionMetrics(items, 'completed'); + expect(result.timeToFirstResponseMs).toBe(0); + }); }); From d0c3068d1f1f2c1199c1574a9f5f93f3cb600ce7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0=C4=87eki=C4=87?= Date: Sat, 7 Feb 2026 16:00:13 +0100 Subject: [PATCH 09/10] fix(o11y): default ingestVersion to 0 for rolling deploy safety --- cloudflare-o11y/src/session-metrics-schema.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudflare-o11y/src/session-metrics-schema.ts b/cloudflare-o11y/src/session-metrics-schema.ts index 70ca05f328..7745431fb4 100644 --- a/cloudflare-o11y/src/session-metrics-schema.ts +++ b/cloudflare-o11y/src/session-metrics-schema.ts @@ -35,7 +35,7 @@ export const SessionMetricsParamsSchema = z.object({ terminationReason: z.enum(TerminationReasons), - ingestVersion: z.number().int().nonnegative(), + ingestVersion: z.number().int().nonnegative().default(0), }); export type SessionMetricsParams = z.infer; From b09667af9aa55ff2aa2c0d4429e494632b516bcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0=C4=87eki=C4=87?= Date: Sat, 7 Feb 2026 16:00:39 +0100 Subject: [PATCH 10/10] test(o11y): verify ingestVersion defaults to 0 when omitted --- cloudflare-o11y/test/index.spec.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/cloudflare-o11y/test/index.spec.ts b/cloudflare-o11y/test/index.spec.ts index 48f67ffa9b..967abf5e98 100644 --- a/cloudflare-o11y/test/index.spec.ts +++ b/cloudflare-o11y/test/index.spec.ts @@ -251,6 +251,20 @@ describe('session metrics RPC', () => { expect(call.doubles[1]).toBe(-1); }); + it('defaults ingestVersion to 0 when omitted', async () => { + const aeSpy = makeWriteDataPointSpy(); + const env = makeTestEnv({ O11Y_SESSION_METRICS: aeSpy as unknown as AnalyticsEngineDataset }); + const ctx = createExecutionContext(); + const instance = new Worker(ctx, env); + + const params = makeValidSessionMetrics(); + delete (params as Record).ingestVersion; + await instance.ingestSessionMetrics(params); + + const call = aeSpy.writeDataPoint.mock.calls[0][0]; + expect(call.doubles[10]).toBe(0); + }); + it('rejects invalid session metrics', async () => { const env = makeTestEnv(); const ctx = createExecutionContext();