diff --git a/containers/api-proxy/Dockerfile b/containers/api-proxy/Dockerfile index aabe7991..2fac6f97 100644 --- a/containers/api-proxy/Dockerfile +++ b/containers/api-proxy/Dockerfile @@ -15,7 +15,7 @@ COPY package*.json ./ RUN npm ci --omit=dev # Copy application files -COPY server.js logging.js metrics.js rate-limiter.js ./ +COPY server.js logging.js metrics.js rate-limiter.js token-tracker.js ./ # Create non-root user RUN addgroup -S apiproxy && adduser -S apiproxy -G apiproxy diff --git a/containers/api-proxy/server.js b/containers/api-proxy/server.js index 8b663baf..76098cd8 100644 --- a/containers/api-proxy/server.js +++ b/containers/api-proxy/server.js @@ -18,6 +18,18 @@ const { HttpsProxyAgent } = require('https-proxy-agent'); const { generateRequestId, sanitizeForLog, logRequest } = require('./logging'); const metrics = require('./metrics'); const rateLimiter = require('./rate-limiter'); +let trackTokenUsage; +let closeLogStream; +try { + ({ trackTokenUsage, closeLogStream } = require('./token-tracker')); +} catch (err) { + if (err && err.code === 'MODULE_NOT_FOUND') { + trackTokenUsage = () => {}; + closeLogStream = () => {}; + } else { + throw err; + } +} // Create rate limiter from environment variables const limiter = rateLimiter.create(); @@ -423,6 +435,15 @@ function proxyRequest(req, res, targetHost, injectHeaders, provider, basePath = res.writeHead(proxyRes.statusCode, resHeaders); proxyRes.pipe(res); + + // Attach token usage tracking (non-blocking, listens on same data/end events) + trackTokenUsage(proxyRes, { + requestId, + provider, + path: sanitizeForLog(req.url), + startTime, + metrics, + }); }); proxyReq.on('error', (err) => { @@ -851,13 +872,15 @@ if (require.main === module) { } // Graceful shutdown - process.on('SIGTERM', () => { + process.on('SIGTERM', async () => { logRequest('info', 'shutdown', { message: 'Received SIGTERM, shutting down gracefully' }); + await closeLogStream(); process.exit(0); }); - process.on('SIGINT', () => { + process.on('SIGINT', async () => { logRequest('info', 'shutdown', { message: 'Received SIGINT, shutting down gracefully' }); + await closeLogStream(); process.exit(0); }); } diff --git a/containers/api-proxy/token-tracker.js b/containers/api-proxy/token-tracker.js new file mode 100644 index 00000000..46d73c8e --- /dev/null +++ b/containers/api-proxy/token-tracker.js @@ -0,0 +1,408 @@ +/** + * Token usage tracking for AWF API Proxy. + * + * Intercepts LLM API responses (both streaming SSE and non-streaming JSON) + * to extract token usage data without adding latency to the client. + * + * Architecture: + * proxyRes (LLM response) → res (client) + * ├─ on('data'): buffer/inspect chunks for usage extraction + * └─ on('end'): finalize parsing → log to file + metrics + * + * For non-streaming responses: buffer the JSON body (up to MAX_BUFFER_SIZE), + * then parse it on 'end' to extract usage fields. + * For streaming (SSE) responses: scan each chunk for usage events as they + * are received, accumulate usage from message_start / message_delta / final + * data events, and log the aggregated result on 'end'. + * + * Zero external dependencies — uses Node.js built-in streams and fs. + */ + +'use strict'; + +const fs = require('fs'); +const path = require('path'); +const { logRequest } = require('./logging'); + +// Max response body to buffer for non-streaming usage extraction (5 MB). +// Responses larger than this are still forwarded but usage is not extracted. 
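+// The overflow path only skips this module's bookkeeping: trackTokenUsage
+// sets an `overflow` flag, frees its buffered chunks, and the client still
+// receives the full response through the untouched pipe.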
+const MAX_BUFFER_SIZE = 5 * 1024 * 1024;
+
+// Token usage log file path (inside the mounted log volume)
+const TOKEN_LOG_DIR = process.env.AWF_TOKEN_LOG_DIR || '/var/log/api-proxy';
+const TOKEN_LOG_FILE = path.join(TOKEN_LOG_DIR, 'token-usage.jsonl');
+
+let logStream = null;
+
+/**
+ * Get or create the JSONL append stream for token usage logs.
+ * Uses a lazy singleton — created on first write.
+ */
+function getLogStream() {
+  if (logStream) return logStream;
+  try {
+    // Ensure directory exists
+    fs.mkdirSync(TOKEN_LOG_DIR, { recursive: true });
+    logStream = fs.createWriteStream(TOKEN_LOG_FILE, { flags: 'a' });
+    logStream.on('error', (err) => {
+      logRequest('warn', 'token_log_error', { error: err.message });
+      logStream = null;
+    });
+    return logStream;
+  } catch (err) {
+    logRequest('warn', 'token_log_init_error', { error: err.message });
+    return null;
+  }
+}
+
+/**
+ * Write a token usage record to the JSONL log file.
+ * Writes are fire-and-forget: if the stream signals backpressure, the record
+ * has still been queued in the stream's internal buffer, and a warning is
+ * logged so sustained pressure is visible.
+ */
+function writeTokenUsage(record) {
+  const stream = getLogStream();
+  if (stream && !stream.writableEnded) {
+    const ok = stream.write(JSON.stringify(record) + '\n');
+    if (!ok) {
+      // Backpressure: the stream's internal buffer is above its
+      // highWaterMark. write() has already queued this record; we only log
+      // a warning here so sustained pressure is visible. The stream flushes
+      // and recovers on its own 'drain' event.
+      logRequest('warn', 'token_log_backpressure', { request_id: record.request_id });
+    }
+  }
+}
+
+/**
+ * Check if a response is SSE (Server-Sent Events) streaming.
+ */
+function isStreamingResponse(headers) {
+  const ct = headers['content-type'] || '';
+  return ct.includes('text/event-stream');
+}
+
+/**
+ * Extract token usage from a non-streaming JSON response body.
+ *
+ * Supports:
+ * - OpenAI/Copilot: { usage: { prompt_tokens, completion_tokens, total_tokens } }
+ * - Anthropic: { usage: { input_tokens, output_tokens, cache_creation_input_tokens, cache_read_input_tokens } }
+ *
+ * Also extracts the model field if present.
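+ *
+ * Example (illustrative values):
+ *   extractUsageFromJson(Buffer.from(
+ *     '{"model":"gpt-4o","usage":{"prompt_tokens":100,"completion_tokens":50,"total_tokens":150}}'
+ *   ))
+ *   // → { usage: { prompt_tokens: 100, completion_tokens: 50, total_tokens: 150 }, model: 'gpt-4o' }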
+ * + * @param {Buffer} body - Response body + * @returns {{ usage: object|null, model: string|null }} + */ +function extractUsageFromJson(body) { + try { + const text = body.toString('utf8'); + const json = JSON.parse(text); + const result = { usage: null, model: json.model || null }; + + if (json.usage && typeof json.usage === 'object') { + const usage = {}; + let hasField = false; + // Anthropic fields + if (typeof json.usage.input_tokens === 'number') { + usage.input_tokens = json.usage.input_tokens; + hasField = true; + } + if (typeof json.usage.output_tokens === 'number') { + usage.output_tokens = json.usage.output_tokens; + hasField = true; + } + if (typeof json.usage.cache_creation_input_tokens === 'number') { + usage.cache_creation_input_tokens = json.usage.cache_creation_input_tokens; + hasField = true; + } + if (typeof json.usage.cache_read_input_tokens === 'number') { + usage.cache_read_input_tokens = json.usage.cache_read_input_tokens; + hasField = true; + } + // OpenAI/Copilot fields + if (typeof json.usage.prompt_tokens === 'number') { + usage.prompt_tokens = json.usage.prompt_tokens; + hasField = true; + } + if (typeof json.usage.completion_tokens === 'number') { + usage.completion_tokens = json.usage.completion_tokens; + hasField = true; + } + if (typeof json.usage.total_tokens === 'number') { + usage.total_tokens = json.usage.total_tokens; + hasField = true; + } + if (hasField) { + result.usage = usage; + } + } + + return result; + } catch { + return { usage: null, model: null }; + } +} + +/** + * Extract token usage from a single SSE data line. + * + * SSE format: "data: {json}\n\n" + * + * Anthropic streaming events with usage: + * - message_start: { type: "message_start", message: { usage: { input_tokens, cache_creation_input_tokens, cache_read_input_tokens } } } + * - message_delta: { type: "message_delta", usage: { output_tokens } } + * + * OpenAI/Copilot streaming events with usage: + * - Final chunk: { usage: { prompt_tokens, completion_tokens, total_tokens } } + * + * @param {string} line - A single SSE data line (without "data: " prefix) + * @returns {{ usage: object|null, model: string|null }} + */ +function extractUsageFromSseLine(line) { + if (!line || line === '[DONE]') return { usage: null, model: null }; + + try { + const json = JSON.parse(line); + const result = { usage: null, model: json.model || null }; + + // Anthropic message_start: usage is inside message object + if (json.type === 'message_start' && json.message && json.message.usage) { + result.usage = {}; + const u = json.message.usage; + if (typeof u.input_tokens === 'number') result.usage.input_tokens = u.input_tokens; + if (typeof u.cache_creation_input_tokens === 'number') result.usage.cache_creation_input_tokens = u.cache_creation_input_tokens; + if (typeof u.cache_read_input_tokens === 'number') result.usage.cache_read_input_tokens = u.cache_read_input_tokens; + result.model = (json.message && json.message.model) || result.model; + return result; + } + + // Anthropic message_delta: usage at top level + if (json.type === 'message_delta' && json.usage) { + result.usage = {}; + if (typeof json.usage.output_tokens === 'number') result.usage.output_tokens = json.usage.output_tokens; + return result; + } + + // OpenAI/Copilot: usage at top level in final chunk + if (json.usage && typeof json.usage === 'object') { + result.usage = {}; + if (typeof json.usage.prompt_tokens === 'number') result.usage.prompt_tokens = json.usage.prompt_tokens; + if (typeof json.usage.completion_tokens === 'number') 
result.usage.completion_tokens = json.usage.completion_tokens; + if (typeof json.usage.total_tokens === 'number') result.usage.total_tokens = json.usage.total_tokens; + return result; + } + + return result; + } catch { + return { usage: null, model: null }; + } +} + +/** + * Extract all SSE data lines from a text chunk. + * Lines are prefixed with "data: " in the SSE protocol. + */ +function parseSseDataLines(text) { + const lines = []; + const parts = text.split('\n'); + for (const part of parts) { + const trimmed = part.trim(); + if (trimmed.startsWith('data: ')) { + lines.push(trimmed.slice(6)); + } else if (trimmed === 'data:') { + // empty data line + } + } + return lines; +} + +/** + * Normalize extracted usage into a unified format. + * + * Output fields: + * - input_tokens: number (from Anthropic input_tokens or OpenAI prompt_tokens) + * - output_tokens: number (from Anthropic output_tokens or OpenAI completion_tokens) + * - cache_read_tokens: number (Anthropic only, 0 for others) + * - cache_write_tokens: number (Anthropic only, 0 for others) + */ +function normalizeUsage(usage) { + if (!usage) return null; + + return { + input_tokens: usage.input_tokens ?? usage.prompt_tokens ?? 0, + output_tokens: usage.output_tokens ?? usage.completion_tokens ?? 0, + cache_read_tokens: usage.cache_read_input_tokens ?? 0, + cache_write_tokens: usage.cache_creation_input_tokens ?? 0, + }; +} + +/** + * Attach token usage tracking to an upstream response. + * + * This function listens on the proxyRes 'data' and 'end' events to extract + * token usage. It does NOT modify the response stream — the caller still + * does proxyRes.pipe(res) as before. + * + * @param {http.IncomingMessage} proxyRes - Upstream response + * @param {object} opts + * @param {string} opts.requestId - Request ID for correlation + * @param {string} opts.provider - Provider name (openai, anthropic, copilot, opencode) + * @param {string} opts.path - Request path + * @param {number} opts.startTime - Request start time (Date.now()) + * @param {object} opts.metrics - Metrics module reference + */ +function trackTokenUsage(proxyRes, opts) { + const { requestId, provider, path: reqPath, startTime, metrics: metricsRef } = opts; + const streaming = isStreamingResponse(proxyRes.headers); + + // Accumulate response body for usage extraction + const chunks = []; + let totalBytes = 0; + let overflow = false; + + // For streaming: accumulate usage across SSE events + let streamingUsage = {}; + let streamingModel = null; + let partialLine = ''; + + proxyRes.on('data', (chunk) => { + totalBytes += chunk.length; + + if (streaming) { + // Parse SSE data lines from this chunk to extract usage events + const text = partialLine + chunk.toString('utf8'); + // Keep any incomplete line at the end for next chunk + const lastNewline = text.lastIndexOf('\n'); + if (lastNewline >= 0) { + const complete = text.slice(0, lastNewline); + partialLine = text.slice(lastNewline + 1); + + const dataLines = parseSseDataLines(complete); + for (const line of dataLines) { + const { usage, model } = extractUsageFromSseLine(line); + if (model && !streamingModel) streamingModel = model; + if (usage) { + // Merge usage fields (Anthropic sends input in message_start, output in message_delta) + for (const [k, v] of Object.entries(usage)) { + streamingUsage[k] = v; + } + } + } + } else { + partialLine = text; + } + } else if (!overflow) { + if (totalBytes <= MAX_BUFFER_SIZE) { + chunks.push(chunk); + } else { + overflow = true; + chunks.length = 0; // free memory + } + } + 
}); + + proxyRes.on('end', () => { + // Only process successful responses (2xx) + if (proxyRes.statusCode < 200 || proxyRes.statusCode >= 300) return; + + const duration = Date.now() - startTime; + let usage = null; + let model = null; + + if (streaming) { + // Process any remaining partial line + if (partialLine.trim()) { + const dataLines = parseSseDataLines(partialLine); + for (const line of dataLines) { + const { usage: u, model: m } = extractUsageFromSseLine(line); + if (m && !streamingModel) streamingModel = m; + if (u) { + for (const [k, v] of Object.entries(u)) { + streamingUsage[k] = v; + } + } + } + } + + if (Object.keys(streamingUsage).length > 0) { + usage = streamingUsage; + model = streamingModel; + } + } else if (!overflow && chunks.length > 0) { + const body = Buffer.concat(chunks); + const result = extractUsageFromJson(body); + usage = result.usage; + model = result.model; + } + + const normalized = normalizeUsage(usage); + if (!normalized) return; + + // Update metrics + if (metricsRef) { + metricsRef.increment('input_tokens_total', { provider }, normalized.input_tokens); + metricsRef.increment('output_tokens_total', { provider }, normalized.output_tokens); + } + + // Build log record + const record = { + timestamp: new Date().toISOString(), + request_id: requestId, + provider, + model: model || 'unknown', + path: reqPath, + status: proxyRes.statusCode, + streaming, + input_tokens: normalized.input_tokens, + output_tokens: normalized.output_tokens, + cache_read_tokens: normalized.cache_read_tokens, + cache_write_tokens: normalized.cache_write_tokens, + duration_ms: duration, + response_bytes: totalBytes, + }; + + // Write to JSONL log file + writeTokenUsage(record); + + // Log summary to stdout + logRequest('info', 'token_usage', { + request_id: requestId, + provider, + model: model || 'unknown', + input_tokens: normalized.input_tokens, + output_tokens: normalized.output_tokens, + cache_read_tokens: normalized.cache_read_tokens, + cache_write_tokens: normalized.cache_write_tokens, + streaming, + }); + }); +} + +/** + * Close the log stream (for graceful shutdown). + * Returns a Promise that resolves once the stream has flushed. 
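+ *
+ * Call-site sketch (mirrors the SIGTERM/SIGINT hooks added in server.js):
+ *   process.on('SIGTERM', async () => { await closeLogStream(); process.exit(0); });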
+ */
+function closeLogStream() {
+  return new Promise((resolve) => {
+    if (logStream) {
+      logStream.end(() => {
+        logStream = null;
+        resolve();
+      });
+    } else {
+      resolve();
+    }
+  });
+}
+
+module.exports = {
+  trackTokenUsage,
+  closeLogStream,
+  // Exported for testing
+  extractUsageFromJson,
+  extractUsageFromSseLine,
+  parseSseDataLines,
+  normalizeUsage,
+  isStreamingResponse,
+  writeTokenUsage,
+  TOKEN_LOG_FILE,
+};
diff --git a/containers/api-proxy/token-tracker.test.js b/containers/api-proxy/token-tracker.test.js
new file mode 100644
index 00000000..6238f187
--- /dev/null
+++ b/containers/api-proxy/token-tracker.test.js
@@ -0,0 +1,450 @@
+/**
+ * Tests for token-tracker.js
+ */
+
+const { EventEmitter } = require('events');
+const os = require('os');
+const path = require('path');
+const fs = require('fs');
+
+// Redirect token log output to a temp dir to avoid /var/log permission errors.
+// token-tracker.js resolves AWF_TOKEN_LOG_DIR once at module load, so the env
+// var must be set before the module is required; a beforeAll hook would run
+// too late and the redirect would silently have no effect.
+const tmpLogDir = fs.mkdtempSync(path.join(os.tmpdir(), 'token-tracker-test-'));
+process.env.AWF_TOKEN_LOG_DIR = tmpLogDir;
+
+const {
+  extractUsageFromJson,
+  extractUsageFromSseLine,
+  parseSseDataLines,
+  normalizeUsage,
+  isStreamingResponse,
+  trackTokenUsage,
+} = require('./token-tracker');
+
+afterAll(() => {
+  fs.rmSync(tmpLogDir, { recursive: true, force: true });
+  delete process.env.AWF_TOKEN_LOG_DIR;
+});
+
+// ── extractUsageFromJson ──────────────────────────────────────────────
+
+describe('extractUsageFromJson', () => {
+  test('extracts OpenAI usage format', () => {
+    const body = Buffer.from(JSON.stringify({
+      id: 'chatcmpl-123',
+      model: 'gpt-4o',
+      usage: {
+        prompt_tokens: 100,
+        completion_tokens: 50,
+        total_tokens: 150,
+      },
+    }));
+
+    const result = extractUsageFromJson(body);
+    expect(result.model).toBe('gpt-4o');
+    expect(result.usage).toEqual({
+      prompt_tokens: 100,
+      completion_tokens: 50,
+      total_tokens: 150,
+    });
+  });
+
+  test('extracts Anthropic usage format', () => {
+    const body = Buffer.from(JSON.stringify({
+      id: 'msg_123',
+      model: 'claude-sonnet-4-20250514',
+      usage: {
+        input_tokens: 200,
+        output_tokens: 80,
+        cache_creation_input_tokens: 0,
+        cache_read_input_tokens: 150,
+      },
+    }));
+
+    const result = extractUsageFromJson(body);
+    expect(result.model).toBe('claude-sonnet-4-20250514');
+    expect(result.usage).toEqual({
+      input_tokens: 200,
+      output_tokens: 80,
+      cache_creation_input_tokens: 0,
+      cache_read_input_tokens: 150,
+    });
+  });
+
+  test('returns null usage for response without usage field', () => {
+    const body = Buffer.from(JSON.stringify({ id: 'test', model: 'gpt-4o' }));
+    const result = extractUsageFromJson(body);
+    expect(result.usage).toBeNull();
+    expect(result.model).toBe('gpt-4o');
+  });
+
+  test('returns null for invalid JSON', () => {
+    const body = Buffer.from('not json');
+    const result = extractUsageFromJson(body);
+    expect(result.usage).toBeNull();
+    expect(result.model).toBeNull();
+  });
+
+  test('returns null for empty buffer', () => {
+    const result = extractUsageFromJson(Buffer.alloc(0));
+    expect(result.usage).toBeNull();
+  });
+
+  test('returns null usage when usage object has no numeric fields', () => {
+    const body = Buffer.from(JSON.stringify({
+      usage: { some_string: 'not a number' },
+    }));
+    const result = extractUsageFromJson(body);
+    expect(result.usage).toBeNull();
+  });
+
+  test('ignores non-numeric usage fields but keeps numeric ones', () => {
+    const body = Buffer.from(JSON.stringify({
+      usage: { prompt_tokens: 'not a number', completion_tokens: 50 },
+    }));
+    const result = 
extractUsageFromJson(body); + expect(result.usage).toEqual({ completion_tokens: 50 }); + }); +}); + +// ── extractUsageFromSseLine ─────────────────────────────────────────── + +describe('extractUsageFromSseLine', () => { + test('extracts Anthropic message_start usage', () => { + const line = JSON.stringify({ + type: 'message_start', + message: { + model: 'claude-sonnet-4-20250514', + usage: { + input_tokens: 500, + cache_creation_input_tokens: 0, + cache_read_input_tokens: 400, + }, + }, + }); + + const result = extractUsageFromSseLine(line); + expect(result.model).toBe('claude-sonnet-4-20250514'); + expect(result.usage).toEqual({ + input_tokens: 500, + cache_creation_input_tokens: 0, + cache_read_input_tokens: 400, + }); + }); + + test('extracts Anthropic message_delta usage', () => { + const line = JSON.stringify({ + type: 'message_delta', + delta: { stop_reason: 'end_turn' }, + usage: { output_tokens: 42 }, + }); + + const result = extractUsageFromSseLine(line); + expect(result.usage).toEqual({ output_tokens: 42 }); + }); + + test('extracts OpenAI final chunk usage', () => { + const line = JSON.stringify({ + model: 'gpt-4o', + choices: [{ finish_reason: 'stop' }], + usage: { prompt_tokens: 100, completion_tokens: 30, total_tokens: 130 }, + }); + + const result = extractUsageFromSseLine(line); + expect(result.model).toBe('gpt-4o'); + expect(result.usage).toEqual({ + prompt_tokens: 100, + completion_tokens: 30, + total_tokens: 130, + }); + }); + + test('returns null for [DONE]', () => { + const result = extractUsageFromSseLine('[DONE]'); + expect(result.usage).toBeNull(); + }); + + test('returns null for empty string', () => { + const result = extractUsageFromSseLine(''); + expect(result.usage).toBeNull(); + }); + + test('returns null for non-usage SSE event', () => { + const line = JSON.stringify({ + type: 'content_block_delta', + delta: { type: 'text_delta', text: 'Hello' }, + }); + const result = extractUsageFromSseLine(line); + expect(result.usage).toBeNull(); + }); + + test('returns null for invalid JSON', () => { + const result = extractUsageFromSseLine('invalid json'); + expect(result.usage).toBeNull(); + }); +}); + +// ── parseSseDataLines ───────────────────────────────────────────────── + +describe('parseSseDataLines', () => { + test('extracts data lines from SSE text', () => { + const text = 'data: {"type":"ping"}\n\ndata: {"type":"content"}\n\n'; + const lines = parseSseDataLines(text); + expect(lines).toEqual(['{"type":"ping"}', '{"type":"content"}']); + }); + + test('handles empty data lines', () => { + const text = 'data:\n\ndata: {"a":1}\n\n'; + const lines = parseSseDataLines(text); + expect(lines).toEqual(['{"a":1}']); + }); + + test('handles data: [DONE]', () => { + const text = 'data: [DONE]\n\n'; + const lines = parseSseDataLines(text); + expect(lines).toEqual(['[DONE]']); + }); + + test('returns empty array for non-data text', () => { + const text = 'event: message\nid: 123\n\n'; + const lines = parseSseDataLines(text); + expect(lines).toEqual([]); + }); + + test('handles mixed content', () => { + const text = 'event: message\ndata: {"a":1}\ndata: {"b":2}\n\n'; + const lines = parseSseDataLines(text); + expect(lines).toEqual(['{"a":1}', '{"b":2}']); + }); +}); + +// ── normalizeUsage ──────────────────────────────────────────────────── + +describe('normalizeUsage', () => { + test('normalizes OpenAI format', () => { + const result = normalizeUsage({ + prompt_tokens: 100, + completion_tokens: 50, + total_tokens: 150, + }); + expect(result).toEqual({ + input_tokens: 
100, + output_tokens: 50, + cache_read_tokens: 0, + cache_write_tokens: 0, + }); + }); + + test('normalizes Anthropic format', () => { + const result = normalizeUsage({ + input_tokens: 200, + output_tokens: 80, + cache_read_input_tokens: 150, + cache_creation_input_tokens: 10, + }); + expect(result).toEqual({ + input_tokens: 200, + output_tokens: 80, + cache_read_tokens: 150, + cache_write_tokens: 10, + }); + }); + + test('returns null for null input', () => { + expect(normalizeUsage(null)).toBeNull(); + }); + + test('returns null for undefined input', () => { + expect(normalizeUsage(undefined)).toBeNull(); + }); + + test('defaults missing fields to 0', () => { + const result = normalizeUsage({ input_tokens: 100 }); + expect(result).toEqual({ + input_tokens: 100, + output_tokens: 0, + cache_read_tokens: 0, + cache_write_tokens: 0, + }); + }); + + test('prefers Anthropic fields over OpenAI when both present', () => { + const result = normalizeUsage({ + input_tokens: 200, + prompt_tokens: 100, + output_tokens: 80, + completion_tokens: 50, + }); + expect(result.input_tokens).toBe(200); + expect(result.output_tokens).toBe(80); + }); +}); + +// ── isStreamingResponse ─────────────────────────────────────────────── + +describe('isStreamingResponse', () => { + test('detects text/event-stream', () => { + expect(isStreamingResponse({ 'content-type': 'text/event-stream' })).toBe(true); + }); + + test('detects text/event-stream with charset', () => { + expect(isStreamingResponse({ 'content-type': 'text/event-stream; charset=utf-8' })).toBe(true); + }); + + test('returns false for application/json', () => { + expect(isStreamingResponse({ 'content-type': 'application/json' })).toBe(false); + }); + + test('returns false for missing content-type', () => { + expect(isStreamingResponse({})).toBe(false); + }); +}); + +// ── trackTokenUsage integration ─────────────────────────────────────── + +describe('trackTokenUsage', () => { + test('extracts usage from non-streaming JSON response', (done) => { + const proxyRes = new EventEmitter(); + proxyRes.headers = { 'content-type': 'application/json' }; + proxyRes.statusCode = 200; + + const metricsRef = { + increment: jest.fn(), + }; + + trackTokenUsage(proxyRes, { + requestId: 'test-123', + provider: 'openai', + path: '/v1/chat/completions', + startTime: Date.now(), + metrics: metricsRef, + }); + + const body = JSON.stringify({ + model: 'gpt-4o', + usage: { prompt_tokens: 100, completion_tokens: 50, total_tokens: 150 }, + }); + + proxyRes.emit('data', Buffer.from(body)); + proxyRes.emit('end'); + + // Check metrics were updated + setTimeout(() => { + expect(metricsRef.increment).toHaveBeenCalledWith( + 'input_tokens_total', + { provider: 'openai' }, + 100, + ); + expect(metricsRef.increment).toHaveBeenCalledWith( + 'output_tokens_total', + { provider: 'openai' }, + 50, + ); + done(); + }, 10); + }); + + test('extracts usage from streaming SSE response', (done) => { + const proxyRes = new EventEmitter(); + proxyRes.headers = { 'content-type': 'text/event-stream' }; + proxyRes.statusCode = 200; + + const metricsRef = { + increment: jest.fn(), + }; + + trackTokenUsage(proxyRes, { + requestId: 'test-456', + provider: 'anthropic', + path: '/v1/messages', + startTime: Date.now(), + metrics: metricsRef, + }); + + // Simulate Anthropic streaming: message_start with input tokens, then message_delta with output tokens + const chunk1 = 'event: message_start\ndata: ' + JSON.stringify({ + type: 'message_start', + message: { model: 'claude-sonnet-4-20250514', usage: { 
input_tokens: 500 } }, + }) + '\n\n'; + + const chunk2 = 'event: content_block_delta\ndata: ' + JSON.stringify({ + type: 'content_block_delta', + delta: { type: 'text_delta', text: 'Hello' }, + }) + '\n\n'; + + const chunk3 = 'event: message_delta\ndata: ' + JSON.stringify({ + type: 'message_delta', + usage: { output_tokens: 42 }, + }) + '\n\ndata: [DONE]\n\n'; + + proxyRes.emit('data', Buffer.from(chunk1)); + proxyRes.emit('data', Buffer.from(chunk2)); + proxyRes.emit('data', Buffer.from(chunk3)); + proxyRes.emit('end'); + + setTimeout(() => { + expect(metricsRef.increment).toHaveBeenCalledWith( + 'input_tokens_total', + { provider: 'anthropic' }, + 500, + ); + expect(metricsRef.increment).toHaveBeenCalledWith( + 'output_tokens_total', + { provider: 'anthropic' }, + 42, + ); + done(); + }, 10); + }); + + test('skips non-2xx responses', (done) => { + const proxyRes = new EventEmitter(); + proxyRes.headers = { 'content-type': 'application/json' }; + proxyRes.statusCode = 401; + + const metricsRef = { increment: jest.fn() }; + + trackTokenUsage(proxyRes, { + requestId: 'test-789', + provider: 'openai', + path: '/v1/chat/completions', + startTime: Date.now(), + metrics: metricsRef, + }); + + proxyRes.emit('data', Buffer.from(JSON.stringify({ + error: { message: 'Unauthorized' }, + }))); + proxyRes.emit('end'); + + setTimeout(() => { + expect(metricsRef.increment).not.toHaveBeenCalled(); + done(); + }, 10); + }); + + test('handles response without usage field gracefully', (done) => { + const proxyRes = new EventEmitter(); + proxyRes.headers = { 'content-type': 'application/json' }; + proxyRes.statusCode = 200; + + const metricsRef = { increment: jest.fn() }; + + trackTokenUsage(proxyRes, { + requestId: 'test-no-usage', + provider: 'openai', + path: '/v1/models', + startTime: Date.now(), + metrics: metricsRef, + }); + + proxyRes.emit('data', Buffer.from(JSON.stringify({ data: [] }))); + proxyRes.emit('end'); + + setTimeout(() => { + expect(metricsRef.increment).not.toHaveBeenCalled(); + done(); + }, 10); + }); +});
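+
+// ── closeLogStream ────────────────────────────────────────────────────
+
+// Shutdown sketch: closeLogStream() must settle whether or not the tests
+// above ever opened the JSONL stream (it resolves immediately when no stream
+// exists, and after flushing when one does).
+describe('closeLogStream', () => {
+  test('resolves on shutdown, with or without an open stream', async () => {
+    const { closeLogStream } = require('./token-tracker');
+    await expect(closeLogStream()).resolves.toBeUndefined();
+    // A second call sees logStream === null and resolves immediately.
+    await expect(closeLogStream()).resolves.toBeUndefined();
+  });
+});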