Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 82 additions & 17 deletions src/backends/codex/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ type UsageSummary = {
cachedTokens?: number;
costUsd?: number;
};
/**
* Accumulator for a single Codex turn (bounded by turn.started → turn.completed).
* Collects text, tool summaries, and usage across multiple JSONL events so that
* exactly one storeLlmCall row is persisted per completed turn — not one row per
* intermediate usage-bearing event.
*/
type CodexTurnAccumulator = {
/** Text fragments seen during the turn; each fragment is cut to 200 characters before it is stored. */
textSummary: string[];
/** Names of the tool calls observed during the turn, in arrival order. */
toolNames: string[];
/** Usage merged from the turn's usage-bearing events; null until the first one arrives. */
usage: UsageSummary | null;
};

type CodexLineContext = {
input: AgentExecutionPlan;
model: string;
Expand All @@ -49,6 +61,8 @@ type CodexLineContext = {
llmCallCount: number;
cost?: number;
finalError?: string;
/** Accumulator for the turn currently in progress. Reset on turn.started/thread.started and again after each completed turn is persisted. */
currentTurn: CodexTurnAccumulator;
};

function appendEngineLog(path: string | undefined, chunk: string): void {
Expand Down Expand Up @@ -250,39 +264,87 @@ function logText(context: CodexLineContext, text: string): void {
context.input.progressReporter.onText(text);
}

function trackUsage(context: CodexLineContext, responseLine: string, usage: UsageSummary): void {
context.cost = usage.costUsd ?? context.cost;
/**
 * Fold one usage report into the accumulator of the turn in progress.
 *
 * Nothing is persisted here; this only updates `context.currentTurn.usage`.
 * The first report of a turn is stored as a shallow copy. Every later report
 * overwrites, field by field, only the values it actually defines, so the most
 * recent defined value wins and fields a later report omits are left untouched.
 */
function accumulateTurnUsage(context: CodexLineContext, usage: UsageSummary): void {
  const turn = context.currentTurn;
  const merged = turn.usage;

  // First usage seen this turn: keep a copy so the caller's object is never aliased.
  if (!merged) {
    turn.usage = { ...usage };
    return;
  }

  // Later reports update the existing object in place, skipping undefined fields.
  const fields = ['inputTokens', 'outputTokens', 'cachedTokens', 'costUsd'] as const;
  for (const field of fields) {
    const value = usage[field];
    if (value !== undefined) merged[field] = value;
  }
}

/**
 * Persist exactly one LLM-call record for the turn that just completed, then
 * reset the turn accumulator.
 *
 * Steps, in order:
 *  1. If the turn carried usage, refresh `context.cost` from its `costUsd`
 *     (keeping the previous cost when `costUsd` is undefined).
 *  2. Increment `context.llmCallCount`; the new value is the call number.
 *  3. Build a compact JSON payload (text summary, tool names, usage) and pass
 *     it to `logLlmCall` as the response body.
 *  4. Replace `context.currentTurn` with an empty accumulator.
 *
 * A record is written even when the turn produced no usage data; in that case
 * the token and cost fields are passed as undefined.
 *
 * @param context Per-run line-processing state; mutated in place.
 */
function persistTurnLlmCall(context: CodexLineContext): void {
  const acc = context.currentTurn;
  const usage = acc.usage;
  if (usage) {
    context.cost = usage.costUsd ?? context.cost;
  }
  context.llmCallCount += 1;

  // Compact turn-scoped payload: joined text capped at 500 characters, tool names, usage.
  // Empty text / no tools / no usage become undefined so JSON.stringify drops those keys.
  const turnPayload = JSON.stringify({
    turn: context.llmCallCount,
    text: acc.textSummary.join(' ').slice(0, 500) || undefined,
    tools: acc.toolNames.length > 0 ? acc.toolNames : undefined,
    usage: usage ?? undefined,
  });

  // `usage` may be null here, so every field is read through optional chaining.
  logLlmCall({
    runId: context.input.runId,
    callNumber: context.llmCallCount,
    model: context.model,
    inputTokens: usage?.inputTokens,
    outputTokens: usage?.outputTokens,
    cachedTokens: usage?.cachedTokens,
    costUsd: usage?.costUsd,
    response: turnPayload,
    engineLabel: 'Codex',
  });

  // Start the next turn from a clean accumulator.
  context.currentTurn = { textSummary: [], toolNames: [], usage: null };
}

/**
* Handles structural turn/thread/item lifecycle events.
* Returns true if the event was fully handled and no further processing is needed.
*
* Persistence boundary: ONE storeLlmCall row is written exactly when turn.completed fires,
* using data accumulated across all events in the turn. Intermediate usage-bearing events
* (e.g. response.completed) update the accumulator only; they do NOT persist a row.
*/
async function handleStructuralEvent(
context: CodexLineContext,
responseLine: string,
parsed: JsonRecord,
eventType: string,
): Promise<boolean> {
if (eventType === 'turn.completed') {
await trackIteration(context);
// Merge any usage attached to turn.completed into the accumulator, then persist.
const usage = extractUsage(parsed);
if (usage) trackUsage(context, responseLine, usage);
if (usage) accumulateTurnUsage(context, usage);
persistTurnLlmCall(context);
return true;
}
if (eventType === 'turn.started' || eventType === 'thread.started') {
// Reset turn accumulator at the start of each new turn
context.currentTurn = { textSummary: [], toolNames: [], usage: null };
return true;
}
if (eventType === 'item.started') {
Expand All @@ -294,14 +356,10 @@ async function handleStructuralEvent(
return false;
}

async function handleParsedLine(
context: CodexLineContext,
responseLine: string,
parsed: JsonRecord,
): Promise<void> {
async function handleParsedLine(context: CodexLineContext, parsed: JsonRecord): Promise<void> {
const eventType = typeof parsed.type === 'string' ? parsed.type : '';

if (await handleStructuralEvent(context, responseLine, parsed, eventType)) return;
if (await handleStructuralEvent(context, parsed, eventType)) return;

const { textParts, toolCall, usage, error } = parseCodexEvent(parsed);

Expand All @@ -311,6 +369,8 @@ async function handleParsedLine(

for (const text of textParts) {
logText(context, text);
// Accumulate text into the turn buffer for compact per-call payload
context.currentTurn.textSummary.push(text.slice(0, 200));
}

if (toolCall) {
Expand All @@ -319,11 +379,15 @@ async function handleParsedLine(
input: toolCall.input,
});
context.input.progressReporter.onToolCall(toolCall.name, toolCall.input);
// Track tool name in turn buffer for the compact payload
context.currentTurn.toolNames.push(toolCall.name);
}

if (usage) {
context.input.logWriter('DEBUG', 'Codex usage', { usage });
trackUsage(context, responseLine, usage);
// Accumulate usage into the turn buffer; do NOT persist here.
// Persistence happens exactly once on turn.completed to avoid duplicate rows.
accumulateTurnUsage(context, usage);
}

if (error) {
Expand Down Expand Up @@ -354,7 +418,7 @@ async function processStdoutLine(context: CodexLineContext, line: string): Promi
return;
}

await handleParsedLine(context, line, parsed);
await handleParsedLine(context, parsed);
}

function resolveCodexModel(cascadeModel: string): string {
Expand Down Expand Up @@ -614,6 +678,7 @@ export class CodexEngine implements AgentEngine {
llmCallCount,
cost,
finalError,
currentTurn: { textSummary: [], toolNames: [], usage: null },
};

child.once('error', (error) => {
Expand Down
Loading
Loading