From 1e61a55fd9b141d70fc1ffe08bb94dfa41573e92 Mon Sep 17 00:00:00 2001 From: Cascade Bot Date: Sat, 14 Mar 2026 21:27:15 +0000 Subject: [PATCH] fix(codex): normalize LLM call logging to per-turn boundaries --- src/backends/codex/index.ts | 99 ++++++++++++++--- tests/unit/backends/codex.test.ts | 169 +++++++++++++++++++++++++++++- 2 files changed, 250 insertions(+), 18 deletions(-) diff --git a/src/backends/codex/index.ts b/src/backends/codex/index.ts index b55bb3b8..2f845be4 100644 --- a/src/backends/codex/index.ts +++ b/src/backends/codex/index.ts @@ -40,6 +40,18 @@ type UsageSummary = { cachedTokens?: number; costUsd?: number; }; +/** + * Accumulator for a single Codex turn (bounded by turn.started → turn.completed). + * Collects text, tool summaries, and usage across multiple JSONL events so that + * exactly one storeLlmCall row is persisted per completed turn — not one row per + * intermediate usage-bearing event. + */ +type CodexTurnAccumulator = { + textSummary: string[]; + toolNames: string[]; + usage: UsageSummary | null; +}; + type CodexLineContext = { input: AgentExecutionPlan; model: string; @@ -49,6 +61,8 @@ type CodexLineContext = { llmCallCount: number; cost?: number; finalError?: string; + /** Accumulator for the turn currently in progress. Reset on turn.started/thread.started. */ + currentTurn: CodexTurnAccumulator; }; function appendEngineLog(path: string | undefined, chunk: string): void { @@ -250,39 +264,87 @@ function logText(context: CodexLineContext, text: string): void { context.input.progressReporter.onText(text); } -function trackUsage(context: CodexLineContext, responseLine: string, usage: UsageSummary): void { - context.cost = usage.costUsd ?? context.cost; +/** + * Merge new usage data into the current turn accumulator. + * Intermediate events (e.g. response.completed) may carry usage before turn.completed + * fires. We accumulate here rather than persisting immediately to avoid duplicate rows. + * The last non-null value wins for each field, matching the pattern where response.completed + * carries per-response totals and turn.completed carries aggregate turn totals. + */ +function accumulateTurnUsage(context: CodexLineContext, usage: UsageSummary): void { + const acc = context.currentTurn; + if (!acc.usage) { + acc.usage = { ...usage }; + } else { + // Override with new values where present — turn.completed totals supersede response.completed + if (usage.inputTokens !== undefined) acc.usage.inputTokens = usage.inputTokens; + if (usage.outputTokens !== undefined) acc.usage.outputTokens = usage.outputTokens; + if (usage.cachedTokens !== undefined) acc.usage.cachedTokens = usage.cachedTokens; + if (usage.costUsd !== undefined) acc.usage.costUsd = usage.costUsd; + } +} + +/** + * Persist exactly one storeLlmCall row for the completed turn, then reset the accumulator. + * Called only from turn.completed to guarantee one row per turn, never from intermediate events. + */ +function persistTurnLlmCall(context: CodexLineContext): void { + const acc = context.currentTurn; + const usage = acc.usage; + if (usage) { + context.cost = usage.costUsd ?? context.cost; + } context.llmCallCount += 1; + + // Build a compact turn-scoped payload: text summary + tool names + usage. + // Storing this instead of the raw event JSONL keeps the payload small and readable. + const turnPayload = JSON.stringify({ + turn: context.llmCallCount, + text: acc.textSummary.join(' ').slice(0, 500) || undefined, + tools: acc.toolNames.length > 0 ? acc.toolNames : undefined, + usage: usage ?? undefined, + }); + logLlmCall({ runId: context.input.runId, callNumber: context.llmCallCount, model: context.model, - inputTokens: usage.inputTokens, - outputTokens: usage.outputTokens, - cachedTokens: usage.cachedTokens, - costUsd: usage.costUsd, - response: responseLine, + inputTokens: usage?.inputTokens, + outputTokens: usage?.outputTokens, + cachedTokens: usage?.cachedTokens, + costUsd: usage?.costUsd, + response: turnPayload, engineLabel: 'Codex', }); + + // Reset the accumulator for the next turn + context.currentTurn = { textSummary: [], toolNames: [], usage: null }; } /** * Handles structural turn/thread/item lifecycle events. * Returns true if the event was fully handled and no further processing is needed. + * + * Persistence boundary: ONE storeLlmCall row is written exactly when turn.completed fires, + * using data accumulated across all events in the turn. Intermediate usage-bearing events + * (e.g. response.completed) update the accumulator only; they do NOT persist a row. */ async function handleStructuralEvent( context: CodexLineContext, - responseLine: string, parsed: JsonRecord, eventType: string, ): Promise { if (eventType === 'turn.completed') { await trackIteration(context); + // Merge any usage attached to turn.completed into the accumulator, then persist. const usage = extractUsage(parsed); - if (usage) trackUsage(context, responseLine, usage); + if (usage) accumulateTurnUsage(context, usage); + persistTurnLlmCall(context); return true; } if (eventType === 'turn.started' || eventType === 'thread.started') { + // Reset turn accumulator at the start of each new turn + context.currentTurn = { textSummary: [], toolNames: [], usage: null }; return true; } if (eventType === 'item.started') { @@ -294,14 +356,10 @@ async function handleStructuralEvent( return false; } -async function handleParsedLine( - context: CodexLineContext, - responseLine: string, - parsed: JsonRecord, -): Promise { +async function handleParsedLine(context: CodexLineContext, parsed: JsonRecord): Promise { const eventType = typeof parsed.type === 'string' ? parsed.type : ''; - if (await handleStructuralEvent(context, responseLine, parsed, eventType)) return; + if (await handleStructuralEvent(context, parsed, eventType)) return; const { textParts, toolCall, usage, error } = parseCodexEvent(parsed); @@ -311,6 +369,8 @@ async function handleParsedLine( for (const text of textParts) { logText(context, text); + // Accumulate text into the turn buffer for compact per-call payload + context.currentTurn.textSummary.push(text.slice(0, 200)); } if (toolCall) { @@ -319,11 +379,15 @@ async function handleParsedLine( input: toolCall.input, }); context.input.progressReporter.onToolCall(toolCall.name, toolCall.input); + // Track tool name in turn buffer for the compact payload + context.currentTurn.toolNames.push(toolCall.name); } if (usage) { context.input.logWriter('DEBUG', 'Codex usage', { usage }); - trackUsage(context, responseLine, usage); + // Accumulate usage into the turn buffer; do NOT persist here. + // Persistence happens exactly once on turn.completed to avoid duplicate rows. + accumulateTurnUsage(context, usage); } if (error) { @@ -354,7 +418,7 @@ async function processStdoutLine(context: CodexLineContext, line: string): Promi return; } - await handleParsedLine(context, line, parsed); + await handleParsedLine(context, parsed); } function resolveCodexModel(cascadeModel: string): string { @@ -614,6 +678,7 @@ export class CodexEngine implements AgentEngine { llmCallCount, cost, finalError, + currentTurn: { textSummary: [], toolNames: [], usage: null }, }; child.once('error', (error) => { diff --git a/tests/unit/backends/codex.test.ts b/tests/unit/backends/codex.test.ts index 1c73b3bb..5e1feffa 100644 --- a/tests/unit/backends/codex.test.ts +++ b/tests/unit/backends/codex.test.ts @@ -450,15 +450,23 @@ describe('CodexEngine', () => { const outputPath = args[args.indexOf('-o') + 1]; return createMockChild({ stdoutLines: [ + JSON.stringify({ type: 'turn.started' }), JSON.stringify({ text: 'Thinking...' }), JSON.stringify({ tool_name: 'Bash', tool_input: { command: 'cascade-tools session finish --comment done' }, }), + // Intermediate usage event — accumulates into turn, does NOT persist a row JSON.stringify({ usage: { input_tokens: 11, output_tokens: 7 }, total_cost_usd: 0.42, }), + // turn.completed finalizes and persists the accumulated turn data + JSON.stringify({ + type: 'turn.completed', + usage: { input_tokens: 11, output_tokens: 7 }, + total_cost_usd: 0.42, + }), ], onBeforeClose: () => { writeFileSync( @@ -839,6 +847,7 @@ describe('CodexEngine', () => { const outputPath = args[args.indexOf('-o') + 1]; return createMockChild({ stdoutLines: [ + JSON.stringify({ type: 'turn.started' }), JSON.stringify({ type: 'item.completed', item: { type: 'message', content: [{ type: 'text', text: 'Planning...' }] }, @@ -851,10 +860,16 @@ describe('CodexEngine', () => { arguments: '{"command":"cascade-tools session finish --comment done"}', }, }), + // response.completed carries usage — accumulates into turn, does NOT persist a row yet JSON.stringify({ type: 'response.completed', response: { usage: { input_tokens: 100, output_tokens: 50 } }, }), + // turn.completed is the persistence boundary — one row per completed turn + JSON.stringify({ + type: 'turn.completed', + usage: { input_tokens: 100, output_tokens: 50 }, + }), ], onBeforeClose: () => { writeFileSync(outputPath, 'Planning complete.', 'utf-8'); @@ -868,11 +883,14 @@ describe('CodexEngine', () => { const result = await engine.execute(input); expect(result.success).toBe(true); - expect(input.progressReporter.onIteration).toHaveBeenCalledTimes(2); + // 2 item.completed events increment iteration + 1 turn.completed = 3 total + expect(input.progressReporter.onIteration).toHaveBeenCalledTimes(3); expect(input.progressReporter.onText).toHaveBeenCalledWith('Planning...'); expect(input.progressReporter.onToolCall).toHaveBeenCalledWith('bash', { command: 'cascade-tools session finish --comment done', }); + // Exactly ONE storeLlmCall row per completed turn + expect(mockStoreLlmCall).toHaveBeenCalledTimes(1); expect(mockStoreLlmCall).toHaveBeenCalledWith( expect.objectContaining({ inputTokens: 100, outputTokens: 50 }), ); @@ -907,6 +925,155 @@ describe('CodexEngine', () => { expect(input.progressReporter.onText).toHaveBeenCalledWith('Final answer.'); expect(input.progressReporter.onIteration).toHaveBeenCalledTimes(1); }); + + // ─── Turn-scoped accumulator / multi-turn / dedup tests ─────────────────── + + it('emits exactly one storeLlmCall row per completed turn across a multi-turn stream', async () => { + mockSpawn.mockImplementation((_cmd: string, args: string[]) => { + const outputPath = args[args.indexOf('-o') + 1]; + return createMockChild({ + stdoutLines: [ + // Turn 1 + JSON.stringify({ type: 'turn.started' }), + JSON.stringify({ + type: 'item.completed', + item: { type: 'agent_message', text: 'First.' }, + }), + JSON.stringify({ + type: 'response.completed', + response: { usage: { input_tokens: 50, output_tokens: 20 } }, + }), + JSON.stringify({ + type: 'turn.completed', + usage: { input_tokens: 50, output_tokens: 20 }, + }), + // Turn 2 + JSON.stringify({ type: 'turn.started' }), + JSON.stringify({ + type: 'item.completed', + item: { type: 'agent_message', text: 'Second.' }, + }), + JSON.stringify({ + type: 'response.completed', + response: { usage: { input_tokens: 80, output_tokens: 30 } }, + }), + JSON.stringify({ + type: 'turn.completed', + usage: { input_tokens: 80, output_tokens: 30 }, + }), + ], + onBeforeClose: () => writeFileSync(outputPath, 'Multi-turn done.', 'utf-8'), + }); + }); + + const engine = new CodexEngine(); + const input = makeInput({ repoDir: workspaceDir, runId: 'run-multiturn' }); + const result = await engine.execute(input); + + expect(result.success).toBe(true); + // Exactly two rows — one per completed turn + expect(mockStoreLlmCall).toHaveBeenCalledTimes(2); + // Stable, sequential callNumber values + expect(mockStoreLlmCall).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ callNumber: 1, inputTokens: 50, outputTokens: 20 }), + ); + expect(mockStoreLlmCall).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ callNumber: 2, inputTokens: 80, outputTokens: 30 }), + ); + }); + + it('stores only one row when both response.completed and turn.completed carry usage (duplicate-usage prevention)', async () => { + mockSpawn.mockImplementation((_cmd: string, args: string[]) => { + const outputPath = args[args.indexOf('-o') + 1]; + return createMockChild({ + stdoutLines: [ + JSON.stringify({ type: 'turn.started' }), + // response.completed fires with usage first (intermediate event) + JSON.stringify({ + type: 'response.completed', + response: { usage: { input_tokens: 100, output_tokens: 40 } }, + }), + // turn.completed fires with aggregate usage (the definitive values) + JSON.stringify({ + type: 'turn.completed', + usage: { input_tokens: 120, output_tokens: 45 }, + }), + ], + onBeforeClose: () => writeFileSync(outputPath, 'done', 'utf-8'), + }); + }); + + const engine = new CodexEngine(); + const input = makeInput({ repoDir: workspaceDir, runId: 'run-dedup' }); + await engine.execute(input); + + // Only ONE row, not two (no duplicate from response.completed) + expect(mockStoreLlmCall).toHaveBeenCalledTimes(1); + // turn.completed totals supersede response.completed values + expect(mockStoreLlmCall).toHaveBeenCalledWith( + expect.objectContaining({ inputTokens: 120, outputTokens: 45 }), + ); + }); + + it('stores a compact turn-scoped payload with text summary and tool names', async () => { + mockSpawn.mockImplementation((_cmd: string, args: string[]) => { + const outputPath = args[args.indexOf('-o') + 1]; + return createMockChild({ + stdoutLines: [ + JSON.stringify({ type: 'turn.started' }), + JSON.stringify({ + type: 'item.completed', + item: { type: 'agent_message', text: 'I will run a command.' }, + }), + JSON.stringify({ + type: 'item.completed', + item: { type: 'function_call', name: 'bash', arguments: '{"command":"ls"}' }, + }), + JSON.stringify({ + type: 'turn.completed', + usage: { input_tokens: 30, output_tokens: 10 }, + }), + ], + onBeforeClose: () => writeFileSync(outputPath, 'done', 'utf-8'), + }); + }); + + const engine = new CodexEngine(); + const input = makeInput({ repoDir: workspaceDir, runId: 'run-payload-shape' }); + await engine.execute(input); + + expect(mockStoreLlmCall).toHaveBeenCalledTimes(1); + const [{ response }] = mockStoreLlmCall.mock.calls[0] as [{ response: string }][]; + const payload = JSON.parse(response) as Record; + // Payload must be a compact object, NOT a raw JSONL line dump + expect(payload).toMatchObject({ + turn: 1, + tools: ['bash'], + usage: { inputTokens: 30, outputTokens: 10 }, + }); + expect(typeof payload.text).toBe('string'); + // Payload must be reasonably sized (< 2 KB) — not a multi-KB raw event dump + expect(response.length).toBeLessThan(2000); + }); + + it('does not call storeLlmCall when no turn.completed event fires (no response events only)', async () => { + mockSpawn.mockImplementation((_cmd: string, args: string[]) => { + const outputPath = args[args.indexOf('-o') + 1]; + return createMockChild({ + stdoutLines: [JSON.stringify({ text: 'Bare text without turn lifecycle events' })], + onBeforeClose: () => writeFileSync(outputPath, 'bare output', 'utf-8'), + }); + }); + + const engine = new CodexEngine(); + const input = makeInput({ repoDir: workspaceDir, runId: 'run-no-turn-completed' }); + await engine.execute(input); + + // Without turn.completed, nothing should be persisted — avoids phantom rows + expect(mockStoreLlmCall).not.toHaveBeenCalled(); + }); }); describe('Codex subscription auth', () => {