From e683d6161c1e34668ebf1b419cc83e04ef835a8d Mon Sep 17 00:00:00 2001 From: Ratul Sarna Date: Sun, 8 Mar 2026 02:28:55 +0530 Subject: [PATCH 1/2] Preserve Codex subagent thread started metadata --- .../Layers/ProviderRuntimeIngestion.test.ts | 20 ++ .../src/provider/Layers/CodexAdapter.test.ts | 145 +++++++++++++ .../src/provider/Layers/CodexAdapter.ts | 198 +++++++++++++++++- .../contracts/src/providerRuntime.test.ts | 111 ++++++++++ packages/contracts/src/providerRuntime.ts | 28 +++ 5 files changed, 496 insertions(+), 6 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 96242b846c..e0d5460415 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -368,6 +368,16 @@ describe("ProviderRuntimeIngestion", () => { provider: "codex", createdAt: new Date().toISOString(), threadId: asThreadId("thread-1"), + payload: { + providerThreadId: "provider-thread-midturn", + source: { + kind: "subAgentThreadSpawn", + parentProviderThreadId: "provider-parent-midturn", + depth: 1, + agentNickname: "Atlas", + agentRole: "explorer", + }, + }, }); harness.emit({ type: "session.started", @@ -1108,6 +1118,16 @@ describe("ProviderRuntimeIngestion", () => { provider: "codex", createdAt: now, threadId: asThreadId("thread-1"), + payload: { + providerThreadId: "provider-thread-1", + source: { + kind: "subAgentThreadSpawn", + parentProviderThreadId: "provider-parent-1", + depth: 1, + agentNickname: "Atlas", + agentRole: "explorer", + }, + }, }); harness.emit({ type: "item.started", diff --git a/apps/server/src/provider/Layers/CodexAdapter.test.ts b/apps/server/src/provider/Layers/CodexAdapter.test.ts index c977f5dadc..f6ec4d19e3 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.test.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.test.ts @@ -407,6 +407,151 @@ lifecycleLayer("CodexAdapterLive lifecycle", (it) => { }), ); + it.effect("maps thread/started with spawned subagent metadata into canonical thread.started events", () => + Effect.gen(function* () { + const adapter = yield* CodexAdapter; + const firstEventFiber = yield* Stream.runHead(adapter.streamEvents).pipe(Effect.forkChild); + + lifecycleManager.emit("event", { + id: asEventId("evt-thread-started-subagent"), + kind: "notification", + provider: "codex", + threadId: asThreadId("thread-1"), + createdAt: new Date().toISOString(), + method: "thread/started", + payload: { + thread: { + id: "provider-thread-1", + name: null, + preview: "", + status: { + type: "active", + activeFlags: [], + }, + source: { + subagent: { + thread_spawn: { + parent_thread_id: "provider-parent-1", + depth: 2, + agent_nickname: null, + agent_role: "explorer", + }, + }, + }, + agentNickname: "Atlas", + agentRole: "explorer", + }, + }, + } satisfies ProviderEvent); + + const firstEvent = yield* Fiber.join(firstEventFiber); + + assert.equal(firstEvent._tag, "Some"); + if (firstEvent._tag !== "Some") { + return; + } + assert.equal(firstEvent.value.type, "thread.started"); + if (firstEvent.value.type !== "thread.started") { + return; + } + assert.deepEqual(firstEvent.value.payload, { + providerThreadId: "provider-thread-1", + name: null, + preview: "", + status: { + type: "active", + activeFlags: [], + }, + source: { + kind: "subAgentThreadSpawn", + parentProviderThreadId: "provider-parent-1", + depth: 2, + agentNickname: "Atlas", + agentRole: "explorer", + }, + }); + }), + ); + + it.effect("falls back to top-level threadId when mapping thread/started events", () => + Effect.gen(function* () { + const adapter = yield* CodexAdapter; + const firstEventFiber = yield* Stream.runHead(adapter.streamEvents).pipe(Effect.forkChild); + + lifecycleManager.emit("event", { + id: asEventId("evt-thread-started-fallback"), + kind: "notification", + provider: "codex", + threadId: asThreadId("thread-1"), + createdAt: new Date().toISOString(), + method: "thread/started", + payload: { + threadId: "provider-thread-2", + }, + } satisfies ProviderEvent); + + const firstEvent = yield* Fiber.join(firstEventFiber); + + assert.equal(firstEvent._tag, "Some"); + if (firstEvent._tag !== "Some") { + return; + } + assert.equal(firstEvent.value.type, "thread.started"); + if (firstEvent.value.type !== "thread.started") { + return; + } + assert.deepEqual(firstEvent.value.payload, { + providerThreadId: "provider-thread-2", + }); + }), + ); + + it.effect("ignores thread/started notifications without a recoverable provider thread id", () => + Effect.gen(function* () { + const adapter = yield* CodexAdapter; + const firstEventFiber = yield* Stream.runHead(adapter.streamEvents).pipe(Effect.forkChild); + + lifecycleManager.emit("event", { + id: asEventId("evt-thread-started-invalid"), + kind: "notification", + provider: "codex", + threadId: asThreadId("thread-1"), + createdAt: new Date().toISOString(), + method: "thread/started", + payload: { + thread: { + name: "Missing id", + }, + }, + } satisfies ProviderEvent); + lifecycleManager.emit("event", { + id: asEventId("evt-thread-started-after-invalid"), + kind: "notification", + provider: "codex", + threadId: asThreadId("thread-1"), + createdAt: new Date().toISOString(), + method: "thread/started", + payload: { + threadId: "provider-thread-after-invalid", + }, + } satisfies ProviderEvent); + + const firstEvent = yield* Fiber.join(firstEventFiber); + + assert.equal(firstEvent._tag, "Some"); + if (firstEvent._tag !== "Some") { + return; + } + assert.equal(firstEvent.value.type, "thread.started"); + if (firstEvent.value.type !== "thread.started") { + return; + } + assert.deepEqual(firstEvent.value.payload, { + providerThreadId: "provider-thread-after-invalid", + }); + }), + ); + it.effect("preserves request type when mapping serverRequest/resolved", () => Effect.gen(function* () { const adapter = yield* CodexAdapter; diff --git a/apps/server/src/provider/Layers/CodexAdapter.ts b/apps/server/src/provider/Layers/CodexAdapter.ts index 9a6271b792..e67dbae2c0 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.ts @@ -11,6 +11,7 @@ import { type CanonicalRequestType, type ProviderEvent, type ProviderRuntimeEvent, + type ThreadStartedPayload, type ProviderUserInputAnswers, RuntimeItemId, RuntimeRequestId, @@ -109,6 +110,194 @@ function asNumber(value: unknown): number | undefined { return typeof value === "number" && Number.isFinite(value) ? value : undefined; } +function asNonNegativeInt(value: unknown): number | undefined { + return typeof value === "number" && Number.isInteger(value) && value >= 0 ? value : undefined; +} + +function normalizeNonEmptyString(value: string | undefined): string | undefined { + const normalized = value?.trim(); + return normalized && normalized.length > 0 ? normalized : undefined; +} + +function readNullableNonEmptyString(value: unknown): string | null | undefined { + if (value === null) { + return null; + } + return normalizeNonEmptyString(asString(value)); +} + +function readNullableString(value: unknown): string | null | undefined { + if (value === null) { + return null; + } + return asString(value); +} + +function withThreadStartedAgentMetadata( + source: Omit, + agentNickname: string | null | undefined, + agentRole: string | null | undefined, +): ThreadStartedSource { + return { + ...source, + ...(agentNickname !== undefined ? { agentNickname } : {}), + ...(agentRole !== undefined ? { agentRole } : {}), + }; +} + +function coalesceNullableString( + primary: string | null | undefined, + fallback: string | null | undefined, +): string | null | undefined { + if (primary !== undefined && primary !== null) { + return primary; + } + if (fallback !== undefined) { + return fallback; + } + return primary; +} + +type ThreadStartedSource = NonNullable; + +function normalizeThreadStartedSource( + sourceValue: unknown, + agentNicknameValue: unknown, + agentRoleValue: unknown, +): ThreadStartedSource | undefined { + const topLevelAgentNickname = readNullableNonEmptyString(agentNicknameValue); + const topLevelAgentRole = readNullableNonEmptyString(agentRoleValue); + + const source = asObject(sourceValue); + if (!source) { + if (topLevelAgentNickname !== undefined || topLevelAgentRole !== undefined) { + return withThreadStartedAgentMetadata( + { kind: "unknown" }, + topLevelAgentNickname, + topLevelAgentRole, + ); + } + const sourceKind = normalizeNonEmptyString(asString(sourceValue)); + if (!sourceKind) { + return undefined; + } + switch (sourceKind) { + case "cli": + case "vscode": + case "exec": + case "unknown": + return withThreadStartedAgentMetadata( + { kind: sourceKind }, + topLevelAgentNickname, + topLevelAgentRole, + ); + case "mcp": + return withThreadStartedAgentMetadata( + { kind: "appServer" }, + topLevelAgentNickname, + topLevelAgentRole, + ); + default: + return withThreadStartedAgentMetadata( + { kind: "unknown" }, + topLevelAgentNickname, + topLevelAgentRole, + ); + } + } + + const subagent = source.subagent; + const subagentValue = normalizeNonEmptyString(asString(subagent)); + if (subagentValue) { + switch (subagentValue) { + case "review": + return withThreadStartedAgentMetadata( + { kind: "subAgentReview" }, + topLevelAgentNickname, + topLevelAgentRole, + ); + case "compact": + return withThreadStartedAgentMetadata( + { kind: "subAgentCompact" }, + topLevelAgentNickname, + topLevelAgentRole, + ); + default: + return withThreadStartedAgentMetadata( + { kind: "subAgentOther", otherKind: subagentValue }, + topLevelAgentNickname, + topLevelAgentRole, + ); + } + } + + const subagentRecord = asObject(subagent); + if (!subagentRecord) { + return withThreadStartedAgentMetadata( + { kind: "unknown" }, + topLevelAgentNickname, + topLevelAgentRole, + ); + } + + const threadSpawn = asObject(subagentRecord.thread_spawn); + if (threadSpawn) { + const parentProviderThreadId = normalizeNonEmptyString(asString(threadSpawn.parent_thread_id)); + const depth = asNonNegativeInt(threadSpawn.depth); + const agentNickname = coalesceNullableString( + readNullableNonEmptyString(threadSpawn.agent_nickname), + topLevelAgentNickname, + ); + const agentRole = coalesceNullableString( + readNullableNonEmptyString(threadSpawn.agent_role), + topLevelAgentRole, + ); + return { + kind: "subAgentThreadSpawn", + ...(parentProviderThreadId ? { parentProviderThreadId } : {}), + ...(depth !== undefined ? { depth } : {}), + ...(agentNickname !== undefined ? { agentNickname } : {}), + ...(agentRole !== undefined ? { agentRole } : {}), + }; + } + + const otherKind = normalizeNonEmptyString(asString(subagentRecord.other)); + if (otherKind) { + return withThreadStartedAgentMetadata( + { kind: "subAgentOther", otherKind }, + topLevelAgentNickname, + topLevelAgentRole, + ); + } + + return withThreadStartedAgentMetadata( + { kind: "unknown" }, + topLevelAgentNickname, + topLevelAgentRole, + ); +} + +function readThreadStartedPayload(payload: Record | undefined): ThreadStartedPayload | undefined { + const thread = asObject(payload?.thread); + const providerThreadId = + normalizeNonEmptyString(asString(thread?.id)) ?? normalizeNonEmptyString(asString(payload?.threadId)); + if (!providerThreadId) { + return undefined; + } + + const name = readNullableNonEmptyString(thread?.name); + const preview = readNullableString(thread?.preview); + const source = normalizeThreadStartedSource(thread?.source, thread?.agentNickname, thread?.agentRole); + + return { + providerThreadId, + ...(name !== undefined ? { name } : {}), + ...(preview !== undefined ? { preview } : {}), + ...(thread && Object.prototype.hasOwnProperty.call(thread, "status") ? { status: thread.status } : {}), + ...(source ? { source } : {}), + }; +} + function toTurnStatus(value: unknown): "completed" | "failed" | "cancelled" | "interrupted" { switch (value) { case "completed": @@ -642,18 +831,15 @@ function mapToRuntimeEvents( } if (event.method === "thread/started") { - const payloadThreadId = asString(asObject(payload?.thread)?.id); - const providerThreadId = payloadThreadId ?? asString(payload?.threadId); - if (!providerThreadId) { + const threadStartedPayload = readThreadStartedPayload(payload); + if (!threadStartedPayload) { return []; } return [ { ...runtimeEventBase(event, canonicalThreadId), type: "thread.started", - payload: { - providerThreadId, - }, + payload: threadStartedPayload, }, ]; } diff --git a/packages/contracts/src/providerRuntime.test.ts b/packages/contracts/src/providerRuntime.test.ts index 8dad509aee..e17abd50be 100644 --- a/packages/contracts/src/providerRuntime.test.ts +++ b/packages/contracts/src/providerRuntime.test.ts @@ -113,6 +113,98 @@ describe("ProviderRuntimeEvent", () => { expect(parsed.payload.answers.sandbox_mode).toBe("workspace-write"); }); + it("decodes thread.started with spawned subagent metadata", () => { + const parsed = decodeRuntimeEvent({ + type: "thread.started", + eventId: "event-thread-started-1", + provider: "codex", + createdAt: "2026-02-28T00:00:04.000Z", + threadId: "thread-1", + payload: { + providerThreadId: "provider-thread-1", + name: null, + preview: "", + status: { + type: "active", + activeFlags: [], + }, + source: { + kind: "subAgentThreadSpawn", + parentProviderThreadId: "provider-parent-1", + depth: 1, + agentNickname: "Atlas", + agentRole: "explorer", + }, + }, + }); + + expect(parsed.type).toBe("thread.started"); + if (parsed.type !== "thread.started") { + throw new Error("expected thread.started"); + } + expect(parsed.payload.providerThreadId).toBe("provider-thread-1"); + expect(parsed.payload.name).toBeNull(); + expect(parsed.payload.preview).toBe(""); + expect(parsed.payload.source).toEqual({ + kind: "subAgentThreadSpawn", + parentProviderThreadId: "provider-parent-1", + depth: 1, + agentNickname: "Atlas", + agentRole: "explorer", + }); + }); + + it("decodes thread.started with minimal legacy payload", () => { + const parsed = decodeRuntimeEvent({ + type: "thread.started", + eventId: "event-thread-started-2", + provider: "codex", + createdAt: "2026-02-28T00:00:05.000Z", + threadId: "thread-1", + payload: { + providerThreadId: "provider-thread-2", + }, + }); + + expect(parsed.type).toBe("thread.started"); + if (parsed.type !== "thread.started") { + throw new Error("expected thread.started"); + } + expect(parsed.payload).toEqual({ + providerThreadId: "provider-thread-2", + }); + }); + + it("decodes thread.started with non-thread-spawn source variants", () => { + const parsed = decodeRuntimeEvent({ + type: "thread.started", + eventId: "event-thread-started-3", + provider: "codex", + createdAt: "2026-02-28T00:00:06.000Z", + threadId: "thread-1", + payload: { + providerThreadId: "provider-thread-3", + source: { + kind: "subAgentOther", + agentNickname: null, + agentRole: null, + otherKind: "memory_consolidation", + }, + }, + }); + + expect(parsed.type).toBe("thread.started"); + if (parsed.type !== "thread.started") { + throw new Error("expected thread.started"); + } + expect(parsed.payload.source).toEqual({ + kind: "subAgentOther", + agentNickname: null, + agentRole: null, + otherKind: "memory_consolidation", + }); + }); + it("rejects legacy message.delta type", () => { expect(() => decodeRuntimeEvent({ @@ -139,4 +231,23 @@ describe("ProviderRuntimeEvent", () => { }), ).toThrow(); }); + + it("rejects thread.started with negative source depth", () => { + expect(() => + decodeRuntimeEvent({ + type: "thread.started", + eventId: "event-thread-started-4", + provider: "codex", + createdAt: "2026-02-28T00:00:07.000Z", + threadId: "thread-1", + payload: { + providerThreadId: "provider-thread-4", + source: { + kind: "subAgentThreadSpawn", + depth: -1, + }, + }, + }), + ).toThrow(); + }); }); diff --git a/packages/contracts/src/providerRuntime.ts b/packages/contracts/src/providerRuntime.ts index 903bb5da78..70d71836fc 100644 --- a/packages/contracts/src/providerRuntime.ts +++ b/packages/contracts/src/providerRuntime.ts @@ -2,6 +2,7 @@ import { Schema } from "effect"; import { EventId, IsoDateTime, + NonNegativeInt, ProviderItemId, RuntimeItemId, RuntimeRequestId, @@ -268,8 +269,35 @@ const SessionExitedPayload = Schema.Struct({ }); export type SessionExitedPayload = typeof SessionExitedPayload.Type; +const ThreadStartedSourceKind = Schema.Literals([ + "cli", + "vscode", + "exec", + "appServer", + "subAgentReview", + "subAgentCompact", + "subAgentThreadSpawn", + "subAgentOther", + "unknown", +]); +export type ThreadStartedSourceKind = typeof ThreadStartedSourceKind.Type; + +const ThreadStartedSource = Schema.Struct({ + kind: ThreadStartedSourceKind, + parentProviderThreadId: Schema.optional(TrimmedNonEmptyStringSchema), + depth: Schema.optional(NonNegativeInt), + agentNickname: Schema.optional(Schema.NullOr(TrimmedNonEmptyStringSchema)), + agentRole: Schema.optional(Schema.NullOr(TrimmedNonEmptyStringSchema)), + otherKind: Schema.optional(TrimmedNonEmptyStringSchema), +}); +export type ThreadStartedSource = typeof ThreadStartedSource.Type; + const ThreadStartedPayload = Schema.Struct({ providerThreadId: Schema.optional(TrimmedNonEmptyStringSchema), + name: Schema.optional(Schema.NullOr(TrimmedNonEmptyStringSchema)), + preview: Schema.optional(Schema.NullOr(Schema.String)), + status: Schema.optional(Schema.Unknown), + source: Schema.optional(ThreadStartedSource), }); export type ThreadStartedPayload = typeof ThreadStartedPayload.Type; From 7bf804c8150c89364cea5eea004316a4c66e2cbf Mon Sep 17 00:00:00 2001 From: Ratul Sarna Date: Sun, 8 Mar 2026 02:32:14 +0530 Subject: [PATCH 2/2] Fix thread source normalization with null agent metadata --- .../src/provider/Layers/CodexAdapter.test.ts | 43 ++++++++++++++ .../src/provider/Layers/CodexAdapter.ts | 56 ++++++++++--------- 2 files changed, 72 insertions(+), 27 deletions(-) diff --git a/apps/server/src/provider/Layers/CodexAdapter.test.ts b/apps/server/src/provider/Layers/CodexAdapter.test.ts index f6ec4d19e3..d5730de643 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.test.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.test.ts @@ -506,6 +506,49 @@ lifecycleLayer("CodexAdapterLive lifecycle", (it) => { }), ); + it.effect("preserves string-based thread source kinds when top-level agent metadata is explicitly null", () => + Effect.gen(function* () { + const adapter = yield* CodexAdapter; + const firstEventFiber = yield* Stream.runHead(adapter.streamEvents).pipe(Effect.forkChild); + + lifecycleManager.emit("event", { + id: asEventId("evt-thread-started-string-source-null-agent-metadata"), + kind: "notification", + provider: "codex", + threadId: asThreadId("thread-1"), + createdAt: new Date().toISOString(), + method: "thread/started", + payload: { + thread: { + id: "provider-thread-3", + source: "mcp", + agentNickname: null, + agentRole: null, + }, + }, + } satisfies ProviderEvent); + + const firstEvent = yield* Fiber.join(firstEventFiber); + + assert.equal(firstEvent._tag, "Some"); + if (firstEvent._tag !== "Some") { + return; + } + assert.equal(firstEvent.value.type, "thread.started"); + if (firstEvent.value.type !== "thread.started") { + return; + } + assert.deepEqual(firstEvent.value.payload, { + providerThreadId: "provider-thread-3", + source: { + kind: "appServer", + agentNickname: null, + agentRole: null, + }, + }); + }), + ); + it.effect("ignores thread/started notifications without a recoverable provider thread id", () => Effect.gen(function* () { const adapter = yield* CodexAdapter; diff --git a/apps/server/src/provider/Layers/CodexAdapter.ts b/apps/server/src/provider/Layers/CodexAdapter.ts index e67dbae2c0..3a08ae8613 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.ts @@ -170,6 +170,33 @@ function normalizeThreadStartedSource( const source = asObject(sourceValue); if (!source) { + const sourceKind = normalizeNonEmptyString(asString(sourceValue)); + if (sourceKind) { + switch (sourceKind) { + case "cli": + case "vscode": + case "exec": + case "unknown": + return withThreadStartedAgentMetadata( + { kind: sourceKind }, + topLevelAgentNickname, + topLevelAgentRole, + ); + case "mcp": + return withThreadStartedAgentMetadata( + { kind: "appServer" }, + topLevelAgentNickname, + topLevelAgentRole, + ); + default: + return withThreadStartedAgentMetadata( + { kind: "unknown" }, + topLevelAgentNickname, + topLevelAgentRole, + ); + } + } + if (topLevelAgentNickname !== undefined || topLevelAgentRole !== undefined) { return withThreadStartedAgentMetadata( { kind: "unknown" }, @@ -177,33 +204,8 @@ function normalizeThreadStartedSource( topLevelAgentRole, ); } - const sourceKind = normalizeNonEmptyString(asString(sourceValue)); - if (!sourceKind) { - return undefined; - } - switch (sourceKind) { - case "cli": - case "vscode": - case "exec": - case "unknown": - return withThreadStartedAgentMetadata( - { kind: sourceKind }, - topLevelAgentNickname, - topLevelAgentRole, - ); - case "mcp": - return withThreadStartedAgentMetadata( - { kind: "appServer" }, - topLevelAgentNickname, - topLevelAgentRole, - ); - default: - return withThreadStartedAgentMetadata( - { kind: "unknown" }, - topLevelAgentNickname, - topLevelAgentRole, - ); - } + + return undefined; } const subagent = source.subagent;