Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions cloudflare-o11y/src/alerting/evaluate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ import { getRecommendedModels } from './recommended-models';
* Compute the burn rate from an observed bad-event fraction and the SLO.
* burn_rate = (bad_fraction) / (1 - SLO)
*/
function computeBurnRate(badFraction: number, slo: number): number {
export function computeBurnRate(badFraction: number, slo: number): number {
const errorBudget = 1 - slo;
if (errorBudget <= 0) return Infinity;
return badFraction / errorBudget;
}

type DimensionKey = `${string}:${string}:${string}`; // provider:model:clientName
export type DimensionKey = `${string}:${string}:${string}`; // provider:model:clientName

function dimensionKey(provider: string, model: string, clientName: string): DimensionKey {
export function dimensionKey(provider: string, model: string, clientName: string): DimensionKey {
return `${provider}:${model}:${clientName}`;
}

function rowsToMap<T extends { provider: string; model: string; client_name: string }>(rows: T[]): Map<DimensionKey, T> {
export function rowsToMap<T extends { provider: string; model: string; client_name: string }>(rows: T[]): Map<DimensionKey, T> {
const map = new Map<DimensionKey, T>();
for (const row of rows) {
map.set(dimensionKey(row.provider, row.model, row.client_name), row);
Expand All @@ -43,7 +43,12 @@ function rowsToMap<T extends { provider: string; model: string; client_name: str
* Pages are only for recommended models on kilo-gateway.
* Everything else is a ticket at most.
*/
function effectiveSeverity(baseSeverity: AlertSeverity, clientName: string, model: string, recommendedModels: Set<string>): AlertSeverity {
export function effectiveSeverity(
baseSeverity: AlertSeverity,
clientName: string,
model: string,
recommendedModels: Set<string>,
): AlertSeverity {
if (baseSeverity === 'page') {
if (clientName === 'kilo-gateway' && recommendedModels.has(model)) {
return 'page';
Expand Down
2 changes: 2 additions & 0 deletions cloudflare-o11y/src/session-metrics-analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type { SessionMetricsParams } from './session-metrics-schema';
* double8 = compactionCount
* double9 = stuckToolCallCount
* double10 = autoCompactionCount
* double11 = ingestVersion
*/
export function writeSessionMetricsDataPoint(params: SessionMetricsParams, env: Env): void {
const totalTokensSum =
Expand All @@ -41,6 +42,7 @@ export function writeSessionMetricsDataPoint(params: SessionMetricsParams, env:
params.compactionCount,
params.stuckToolCallCount,
params.autoCompactionCount,
params.ingestVersion,
],
});
}
2 changes: 2 additions & 0 deletions cloudflare-o11y/src/session-metrics-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ export const SessionMetricsParamsSchema = z.object({
autoCompactionCount: z.number().int().nonnegative(),

terminationReason: z.enum(TerminationReasons),

ingestVersion: z.number().int().nonnegative().default(0),
});
Comment thread
iscekic marked this conversation as resolved.

export type SessionMetricsParams = z.infer<typeof SessionMetricsParamsSchema>;
96 changes: 96 additions & 0 deletions cloudflare-o11y/test/alerting-evaluate.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { describe, it, expect } from 'vitest';
import { computeBurnRate, dimensionKey, effectiveSeverity, rowsToMap } from '../src/alerting/evaluate';

describe('computeBurnRate', () => {
it('computes burn rate from bad fraction and SLO', () => {
// 1% bad with 99.9% SLO => 0.01 / 0.001 = 10
expect(computeBurnRate(0.01, 0.999)).toBeCloseTo(10);
});

it('returns 0 when bad fraction is 0', () => {
expect(computeBurnRate(0, 0.999)).toBe(0);
});

it('returns Infinity when SLO is 1.0 (zero error budget)', () => {
expect(computeBurnRate(0.01, 1.0)).toBe(Infinity);
});

it('returns Infinity when SLO exceeds 1.0', () => {
expect(computeBurnRate(0.01, 1.1)).toBe(Infinity);
});

it('handles typical page-level burn rate threshold', () => {
// 14.4x burn rate means bad_fraction = 14.4 * (1 - 0.999) = 0.0144
expect(computeBurnRate(0.0144, 0.999)).toBeCloseTo(14.4);
});
});

describe('dimensionKey', () => {
it('constructs provider:model:clientName key', () => {
expect(dimensionKey('openai', 'gpt-4', 'kilo-gateway')).toBe('openai:gpt-4:kilo-gateway');
});

it('handles empty strings', () => {
expect(dimensionKey('', '', '')).toBe('::');
});

it('preserves colons in values', () => {
expect(dimensionKey('a:b', 'c', 'd')).toBe('a:b:c:d');
});
});

describe('rowsToMap', () => {
it('converts rows to a map keyed by dimension', () => {
const rows = [
{ provider: 'openai', model: 'gpt-4', client_name: 'kilo-gateway', value: 1 },
{ provider: 'anthropic', model: 'claude-sonnet-4.5', client_name: 'cli', value: 2 },
];
const map = rowsToMap(rows);
expect(map.size).toBe(2);
expect(map.get('openai:gpt-4:kilo-gateway')?.value).toBe(1);
expect(map.get('anthropic:claude-sonnet-4.5:cli')?.value).toBe(2);
});

it('returns empty map for empty input', () => {
expect(rowsToMap([]).size).toBe(0);
});

it('last row wins for duplicate dimensions', () => {
const rows = [
{ provider: 'openai', model: 'gpt-4', client_name: 'cli', value: 1 },
{ provider: 'openai', model: 'gpt-4', client_name: 'cli', value: 2 },
];
const map = rowsToMap(rows);
expect(map.size).toBe(1);
expect(map.get('openai:gpt-4:cli')?.value).toBe(2);
});
});

describe('effectiveSeverity', () => {
const recommended = new Set(['gpt-4', 'claude-sonnet-4.5']);

it('keeps page for recommended model on kilo-gateway', () => {
expect(effectiveSeverity('page', 'kilo-gateway', 'gpt-4', recommended)).toBe('page');
});

it('downgrades page to ticket for non-recommended model on kilo-gateway', () => {
expect(effectiveSeverity('page', 'kilo-gateway', 'gpt-3.5-turbo', recommended)).toBe('ticket');
});

it('downgrades page to ticket for recommended model on non-gateway client', () => {
expect(effectiveSeverity('page', 'cli', 'gpt-4', recommended)).toBe('ticket');
});

it('downgrades page to ticket for non-recommended model on non-gateway client', () => {
expect(effectiveSeverity('page', 'cli', 'gpt-3.5-turbo', recommended)).toBe('ticket');
});

it('keeps ticket severity unchanged regardless of model/client', () => {
expect(effectiveSeverity('ticket', 'kilo-gateway', 'gpt-4', recommended)).toBe('ticket');
expect(effectiveSeverity('ticket', 'cli', 'gpt-3.5-turbo', recommended)).toBe('ticket');
});

it('handles empty recommended models set', () => {
expect(effectiveSeverity('page', 'kilo-gateway', 'gpt-4', new Set())).toBe('ticket');
});
});
17 changes: 16 additions & 1 deletion cloudflare-o11y/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ describe('session metrics RPC', () => {
compactionCount: 1,
autoCompactionCount: 1,
terminationReason: 'completed' as const,
ingestVersion: 1,
};
}

Expand All @@ -219,7 +220,7 @@ describe('session metrics RPC', () => {
const call = aeSpy.writeDataPoint.mock.calls[0][0];
expect(call.indexes).toEqual(['cli']);
expect(call.blobs).toEqual(['completed', 'cli', 'org_456']);
expect(call.doubles).toEqual([60000, 1500, 5, 12, 2, 21000, 0.15, 1, 0, 1]);
expect(call.doubles).toEqual([60000, 1500, 5, 12, 2, 21000, 0.15, 1, 0, 1, 1]);
});

it('uses empty string for missing organizationId in AE', async () => {
Expand Down Expand Up @@ -250,6 +251,20 @@ describe('session metrics RPC', () => {
expect(call.doubles[1]).toBe(-1);
});

it('defaults ingestVersion to 0 when omitted', async () => {
const aeSpy = makeWriteDataPointSpy();
const env = makeTestEnv({ O11Y_SESSION_METRICS: aeSpy as unknown as AnalyticsEngineDataset });
const ctx = createExecutionContext();
const instance = new Worker(ctx, env);

const params = makeValidSessionMetrics();
delete (params as Record<string, unknown>).ingestVersion;
await instance.ingestSessionMetrics(params);

const call = aeSpy.writeDataPoint.mock.calls[0][0];
expect(call.doubles[10]).toBe(0);
});

it('rejects invalid session metrics', async () => {
const env = makeTestEnv();
const ctx = createExecutionContext();
Expand Down
46 changes: 29 additions & 17 deletions cloudflare-session-ingest/src/dos/SessionIngestDO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ import {
type TerminationReason,
} from './session-metrics';

type IngestMetaKey =
| ExtractableMetaKey
| 'kiloUserId'
| 'sessionId'
| 'ingestVersion'
| 'closeReason'
| 'metricsEmitted';

type ExtractableMetaKey = 'title' | 'parentId' | 'platform' | 'orgId';

function writeIngestMetaIfChanged(
sql: SqlStorage,
params: { key: string; incomingValue: string | null | undefined }
params: { key: IngestMetaKey; incomingValue: string | null }
): { changed: boolean; value: string | null } {
if (params.incomingValue === undefined) {
return { changed: false, value: null };
}

const existing = sql
.exec<{
value: string | null;
Expand All @@ -50,10 +56,8 @@ function writeIngestMetaIfChanged(
return { changed: true, value: params.incomingValue };
}

type IngestMetaKey = 'title' | 'parentId' | 'platform' | 'orgId';

const INGEST_META_EXTRACTORS: Array<{
key: IngestMetaKey;
key: ExtractableMetaKey;
extract: (item: IngestBatch[number]) => string | null | undefined;
}> = [
{ key: 'title', extract: extractNormalizedTitleFromItem },
Expand All @@ -62,7 +66,7 @@ const INGEST_META_EXTRACTORS: Array<{
{ key: 'orgId', extract: extractNormalizedOrgIdFromItem },
];

type Changes = Array<{ name: IngestMetaKey; value: string | null }>;
type Changes = Array<{ name: ExtractableMetaKey; value: string | null }>;

export class SessionIngestDO extends DurableObject<Env> {
private sql: SqlStorage;
Expand Down Expand Up @@ -111,11 +115,15 @@ export class SessionIngestDO extends DurableObject<Env> {
}> {
this.initSchema();

// Persist identity so alarm() can recover kiloUserId/sessionId after hibernation
// Persist identity and version so alarm() can recover after hibernation
writeIngestMetaIfChanged(this.sql, { key: 'kiloUserId', incomingValue: kiloUserId });
writeIngestMetaIfChanged(this.sql, { key: 'sessionId', incomingValue: sessionId });
writeIngestMetaIfChanged(this.sql, {
key: 'ingestVersion',
incomingValue: String(ingestVersion),
Comment thread
iscekic marked this conversation as resolved.
});

const incomingByKey: Record<IngestMetaKey, string | null | undefined> = {
const incomingByKey: Record<ExtractableMetaKey, string | null | undefined> = {
title: undefined,
parentId: undefined,
platform: undefined,
Expand Down Expand Up @@ -159,10 +167,12 @@ export class SessionIngestDO extends DurableObject<Env> {

const changes: Changes = [];

for (const key of Object.keys(incomingByKey) as IngestMetaKey[]) {
for (const key of Object.keys(incomingByKey) as ExtractableMetaKey[]) {
const incoming = incomingByKey[key];
if (incoming === undefined) continue;
const meta = writeIngestMetaIfChanged(this.sql, {
key,
incomingValue: incomingByKey[key],
incomingValue: incoming,
});
if (meta.changed) {
changes.push({ name: key, value: meta.value });
Expand Down Expand Up @@ -224,7 +234,8 @@ export class SessionIngestDO extends DurableObject<Env> {
private async emitSessionMetrics(
kiloUserId: string,
sessionId: string,
closeReason: TerminationReason
closeReason: TerminationReason,
ingestVersion: number
): Promise<boolean> {
this.initSchema();

Expand Down Expand Up @@ -252,7 +263,7 @@ export class SessionIngestDO extends DurableObject<Env> {

const metrics = computeSessionMetrics(rows, closeReason);

await this.env.O11Y.ingestSessionMetrics({ kiloUserId, sessionId, ...metrics });
await this.env.O11Y.ingestSessionMetrics({ kiloUserId, sessionId, ingestVersion, ...metrics });

// Mark metrics as emitted to prevent duplicates
this.sql.exec(
Expand All @@ -278,7 +289,7 @@ export class SessionIngestDO extends DurableObject<Env> {
key: string;
value: string | null;
}>(
`SELECT key, value FROM ingest_meta WHERE key IN ('kiloUserId', 'sessionId', 'closeReason')`
`SELECT key, value FROM ingest_meta WHERE key IN ('kiloUserId', 'sessionId', 'closeReason', 'ingestVersion')`
)
.toArray();

Expand All @@ -289,8 +300,9 @@ export class SessionIngestDO extends DurableObject<Env> {
if (!kiloUserId || !sessionId) return;

const closeReason = (meta['closeReason'] ?? 'abandoned') as TerminationReason;
Comment thread
iscekic marked this conversation as resolved.
const ingestVersion = Number(meta['ingestVersion'] ?? '0') || 0;
Comment thread
iscekic marked this conversation as resolved.

await this.emitSessionMetrics(kiloUserId, sessionId, closeReason);
await this.emitSessionMetrics(kiloUserId, sessionId, closeReason, ingestVersion);
}

async clear(): Promise<void> {
Expand Down
Loading