diff --git a/workers/src/index.ts b/workers/src/index.ts index bf7476b..c247301 100644 --- a/workers/src/index.ts +++ b/workers/src/index.ts @@ -978,7 +978,15 @@ export default { // response. The helper handles clone failures safely. if (telemetryClone) { const durationMs = Date.now() - startTime; - const cacheTier = tracer.indexSource; + // NOTE: Do NOT read tracer.indexSource here. The MCP handler returns + // a streaming Response — `await handler(...)` resolves with the + // Response object before the tool handler closure has finished + // running, so the tracer has not yet recorded the `index` span at + // this point. Reading here yields "none" for every tool. The tracer + // is only fully populated once the response body has been consumed + // (which forces the streaming tool handler to complete). The read + // therefore happens inside the waitUntil callback below, after + // `await responseClone.text()` resolves. // Clone the response synchronously before returning so the body is // still available to read inside the deferred waitUntil callback. const responseClone = response.clone(); @@ -997,6 +1005,13 @@ export default { } catch { // Fall through with empty string; bytes_out / tokens_out will be 0. } + // Read tracer.indexSource AFTER the response body has been + // consumed. By this point the streaming tool handler has + // completed and any "index" / "index-build" spans have been + // recorded. Reading earlier (e.g. immediately after `await + // handler()` returned) was the streaming-race bug that caused + // every tool call to record cache_tier="none" in production. 
+ const cacheTier = tracer.indexSource; const shape = await measurePayloadShape(requestText, responseText); recordTelemetry(request, requestText, env, durationMs, cacheTier, shape); } catch { diff --git a/workers/test/telemetry-integration.test.mjs b/workers/test/telemetry-integration.test.mjs index b79e50c..284fcbc 100644 --- a/workers/test/telemetry-integration.test.mjs +++ b/workers/test/telemetry-integration.test.mjs @@ -49,6 +49,7 @@ const tsconfig = { include: [ join(WORKERS_ROOT, "src", "tokenize.ts"), join(WORKERS_ROOT, "src", "telemetry.ts"), + join(WORKERS_ROOT, "src", "tracing.ts"), join(WORKERS_ROOT, "src", "zip-baseline-fetcher.ts"), ], }; @@ -75,8 +76,9 @@ const compile = spawnSync("npx", ["--yes", "tsc", "-p", tsconfigPath], { // actually need weren't emitted. const tokenizeJs = join(tmp, "build", "tokenize.js"); const telemetryJs = join(tmp, "build", "telemetry.js"); +const tracingJs = join(tmp, "build", "tracing.js"); const zipFetcherJs = join(tmp, "build", "zip-baseline-fetcher.js"); -if (!existsSync(tokenizeJs) || !existsSync(telemetryJs) || !existsSync(zipFetcherJs)) { +if (!existsSync(tokenizeJs) || !existsSync(telemetryJs) || !existsSync(tracingJs) || !existsSync(zipFetcherJs)) { console.error("TypeScript compile failed (target files not emitted):"); console.error(compile.stdout); console.error(compile.stderr); @@ -110,6 +112,7 @@ for (const f of rds(buildDir).filter(n => n.endsWith(".js"))) { const { measurePayloadShape } = await import(tokenizeJs); const { recordTelemetry } = await import(telemetryJs); +const { RequestTracer } = await import(tracingJs); // ─── Mock env with writeDataPoint capture ────────────────────────────────── @@ -536,5 +539,83 @@ await test("missing env.ODDKIT_TELEMETRY is a graceful no-op", async () => { recordTelemetry(mockRequest(), requestBody, env, 5, "memory", shape); }); +// ─── Test 7: Streaming-race regression — cacheTier must be read AFTER body ── + +await test("cache_tier reads must happen after the streaming 
response body completes", async () => { + // The MCP handler from agents/mcp returns a streaming Response. `await + // handler(...)` resolves with the Response object before the tool handler + // closure has finished populating the tracer. Reading `tracer.indexSource` + // immediately after the await yields "none" for every tool because the + // "index" / "index-build" span has not been recorded yet. The fix in + // workers/src/index.ts moves the read inside the waitUntil callback, + // after the response body has been consumed (which forces the streaming + // tool handler to complete). + // + // This test simulates the timing pattern. A tracer is created, then a + // deferred task adds the "index" span asynchronously (mimicking the tool + // handler running while the response body streams). We assert: + // (a) the OLD pattern (read immediately) returns "none" — this is the + // observable bug the fix exists to prevent + // (b) the FIXED pattern (read after the setImmediate task has run) returns the + // actual span source — this proves the fix recovers the value + + const tracer = new RequestTracer(); + + // Schedule the "index" span for the next tick — this models a streaming + // tool handler that has not yet recorded its index access at the moment + // the outer handler's `await` resolves. + const handlerDone = new Promise((resolve) => { + setImmediate(() => { + tracer.addSpan("index", 12, "cache"); + resolve(); + }); + }); + + // (a) OLD pattern: read tracer.indexSource synchronously, before the + // deferred span has been added. This reproduces the production bug. + const oldPatternRead = tracer.indexSource; + assert.equal( + oldPatternRead, + "none", + "OLD pattern (read immediately after await) returns 'none' — this is the streaming-race bug", + ); + + // Wait for the deferred span to land (modeling `await responseClone.text()` + // forcing the streaming tool handler to finish). 
+ await handlerDone; + + // (b) FIXED pattern: read tracer.indexSource AFTER the deferred work has + // completed. The tracer now reflects the actual cache tier. + const fixedPatternRead = tracer.indexSource; + assert.equal( + fixedPatternRead, + "cache", + "FIXED pattern (read after body consumption) returns the actual span source", + ); + + // Round-trip: feed the fixed value through recordTelemetry and verify it + // lands in blob9. + const env = mockEnv(); + const requestBody = JSON.stringify({ jsonrpc: "2.0", id: 1, method: "tools/call", params: { name: "oddkit_search", arguments: { input: "test" } } }); + const responseBody = JSON.stringify({ jsonrpc: "2.0", id: 1, result: { content: [{ type: "text", text: "ok" }] } }); + const shape = await measurePayloadShape(requestBody, responseBody); + recordTelemetry(mockRequest(), requestBody, env, 42, fixedPatternRead, shape); + assert.equal(env.ODDKIT_TELEMETRY.writes.length, 1, "exactly one data point written"); + assert.equal( + env.ODDKIT_TELEMETRY.writes[0].blobs[8], + "cache", + "blob9 (cache_tier) carries the post-body-consumption tracer value", + ); + + // Sanity: if we had used the broken old-pattern read, blob9 would be "none" + const env2 = mockEnv(); + recordTelemetry(mockRequest(), requestBody, env2, 42, oldPatternRead, shape); + assert.equal( + env2.ODDKIT_TELEMETRY.writes[0].blobs[8], + "none", + "blob9 with the OLD-pattern read would be 'none' — what production has been recording", + ); +}); + console.log(`\n${pass} passed, ${fail} failed`); process.exit(fail > 0 ? 1 : 0);