Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions apps/server/integration/TestProviderAdapter.integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,15 @@ export const makeTestProviderAdapterHarness = (options?: MakeTestProviderAdapter
sessions.clear();
});

/**
 * Harness implementation of `compactThread`.
 *
 * Reads the thread snapshot via `readThread` and yields the last item of the
 * last turn when that item is a string; in every other case (no turns, no
 * items, or a non-string item) it yields `null`.
 */
const compactThread: ProviderAdapterShape<ProviderAdapterError>["compactThread"] = (threadId) =>
  readThread(threadId).pipe(
    Effect.map(({ turns }) => {
      // Optional chaining covers the "no turns yet" case; `.at(-1)` covers empty item lists.
      const lastItem = turns.at(-1)?.items.at(-1);
      if (typeof lastItem === "string") {
        return lastItem;
      }
      return null;
    }),
  );

const adapter: ProviderAdapterShape<ProviderAdapterError> = {
provider,
capabilities: {
Expand All @@ -486,6 +495,7 @@ export const makeTestProviderAdapterHarness = (options?: MakeTestProviderAdapter
listSessions,
hasSession,
readThread,
compactThread,
rollbackThread,
stopAll,
streamEvents: Stream.fromQueue(runtimeEvents),
Expand Down
55 changes: 55 additions & 0 deletions apps/server/src/codexAppServerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ export interface CodexThreadSnapshot {
}

// Timeout, in milliseconds, for the Codex version check.
// NOTE(review): the code that consumes this value is outside this excerpt — confirm usage.
const CODEX_VERSION_CHECK_TIMEOUT_MS = 4_000;
// Maximum time, in milliseconds, that `compactThread` waits for a "thread/compacted"
// notification before rejecting with a timeout error.
const CODEX_THREAD_COMPACTION_TIMEOUT_MS = 60_000;

// The ESC control character (char code 27) that introduces ANSI escape sequences.
const ANSI_ESCAPE_CHAR = String.fromCharCode(27);
// Global pattern matching ESC "[" followed by any run of digits/semicolons and a closing "m".
const ANSI_ESCAPE_REGEX = new RegExp(`${ANSI_ESCAPE_CHAR}\\[[0-9;]*m`, "g");
Expand Down Expand Up @@ -795,6 +796,60 @@ export class CodexAppServerManager extends EventEmitter<CodexAppServerManagerEve
return this.parseThreadSnapshot("thread/read", response);
}

/**
 * Requests compaction of a thread and returns a fresh snapshot once it completes.
 *
 * Sends `thread/compact/start` for the session's provider thread id, waits for a
 * matching `thread/compacted` notification on this manager's `"event"` channel,
 * and then re-reads the thread via `readThread`.
 *
 * @param threadId - Identifier of the thread whose session should be compacted.
 * @returns The thread snapshot read after compaction was reported.
 * @throws Error when the session has no provider resume thread id, or when no
 *   `thread/compacted` notification arrives within
 *   `CODEX_THREAD_COMPACTION_TIMEOUT_MS`. Errors from `requireSession`,
 *   `sendRequest` and `readThread` propagate unchanged.
 */
async compactThread(threadId: ThreadId): Promise<CodexThreadSnapshot> {
  const context = this.requireSession(threadId);
  const providerThreadId = readResumeThreadId({
    threadId: context.session.threadId,
    runtimeMode: context.session.runtimeMode,
    resumeCursor: context.session.resumeCursor,
  });
  if (!providerThreadId) {
    throw new Error("Session is missing a provider resume thread id.");
  }

  // Holder for the teardown routine; the real implementation is installed inside the
  // promise executor once the timer and listener exist. Teardown is idempotent.
  const compactionWait: { cleanup: () => void } = {
    cleanup: () => undefined,
  };
  // The listener is registered BEFORE the request is sent so a notification that
  // arrives while the request is still in flight is not missed.
  const waitForCompaction = new Promise<void>((resolve, reject) => {
    const timeout = setTimeout(() => {
      compactionWait.cleanup();
      reject(new Error("Timed out waiting for Codex thread compaction."));
    }, CODEX_THREAD_COMPACTION_TIMEOUT_MS);

    const onEvent = (event: ProviderEvent) => {
      // Ignore anything that is not a Codex notification for this session's thread.
      if (
        event.provider !== "codex" ||
        event.threadId !== context.session.threadId ||
        event.kind !== "notification"
      ) {
        return;
      }
      if (event.method === "thread/compacted") {
        compactionWait.cleanup();
        resolve();
      }
    };

    compactionWait.cleanup = () => {
      clearTimeout(timeout);
      this.off("event", onEvent);
    };

    this.on("event", onEvent);
  });
  // The timer can fire while `sendRequest` is still pending, i.e. before
  // `waitForCompaction` is awaited. Attaching a no-op handler here keeps that
  // rejection from being reported as unhandled; the `await` below still
  // observes the original rejection.
  waitForCompaction.catch(() => undefined);

  try {
    await this.sendRequest(context, "thread/compact/start", {
      threadId: providerThreadId,
    });
    await waitForCompaction;
  } finally {
    // Always release the timer and listener, whether we succeeded, timed out,
    // or the request itself failed.
    compactionWait.cleanup();
  }
  return this.readThread(threadId);
}

async rollbackThread(threadId: ThreadId, numTurns: number): Promise<CodexThreadSnapshot> {
const context = this.requireSession(threadId);
const providerThreadId = readResumeThreadId({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ function createProviderServiceHarness(
respondToRequest: () => unsupported(),
respondToUserInput: () => unsupported(),
stopSession: () => unsupported(),
stopSessionForProvider: () => unsupported(),
listSessions,
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }),
compactThread: () => unsupported(),
rollbackConversation,
get streamEvents() {
return Stream.fromPubSub(runtimeEventPubSub);
Expand Down
173 changes: 128 additions & 45 deletions apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ describe("ProviderCommandReactor", () => {
const runtimeEventPubSub = Effect.runSync(PubSub.unbounded<ProviderRuntimeEvent>());
let nextSessionIndex = 1;
const runtimeSessions: Array<ProviderSession> = [];
const modelSelection = input?.threadModelSelection ?? {
const defaultModelSelection = input?.threadModelSelection ?? {
provider: "codex",
model: "gpt-5-codex",
};
Expand All @@ -123,8 +123,25 @@ describe("ProviderCommandReactor", () => {
typeof input.threadId === "string"
? ThreadId.make(input.threadId)
: ThreadId.make(`thread-${sessionIndex}`);
const provider =
typeof input === "object" &&
input !== null &&
"provider" in input &&
(input.provider === "codex" || input.provider === "claudeAgent")
? input.provider
: defaultModelSelection.provider;
const model =
typeof input === "object" &&
input !== null &&
"modelSelection" in input &&
typeof input.modelSelection === "object" &&
input.modelSelection !== null &&
"model" in input.modelSelection &&
typeof input.modelSelection.model === "string"
? input.modelSelection.model
: defaultModelSelection.model;
const session: ProviderSession = {
provider: modelSelection.provider,
provider,
status: "ready" as const,
runtimeMode:
typeof input === "object" &&
Expand All @@ -133,13 +150,21 @@ describe("ProviderCommandReactor", () => {
(input.runtimeMode === "approval-required" || input.runtimeMode === "full-access")
? input.runtimeMode
: "full-access",
...(modelSelection.model !== undefined ? { model: modelSelection.model } : {}),
...(model ? { model } : {}),
threadId,
resumeCursor: resumeCursor ?? { opaque: `resume-${sessionIndex}` },
createdAt: now,
updatedAt: now,
};
runtimeSessions.push(session);
const existingIndex = runtimeSessions.findIndex(
(runtimeSession) =>
runtimeSession.threadId === threadId && runtimeSession.provider === provider,
);
if (existingIndex >= 0) {
runtimeSessions.splice(existingIndex, 1, session);
} else {
runtimeSessions.push(session);
}
return Effect.succeed(session);
});
const sendTurn = vi.fn((_: unknown) =>
Expand All @@ -151,6 +176,13 @@ describe("ProviderCommandReactor", () => {
const interruptTurn = vi.fn((_: unknown) => Effect.void);
const respondToRequest = vi.fn<ProviderServiceShape["respondToRequest"]>(() => Effect.void);
const respondToUserInput = vi.fn<ProviderServiceShape["respondToUserInput"]>(() => Effect.void);
// Mock `compactThread`: succeeds with "Summary for <threadId>" when the input is an
// object carrying a `threadId` key, and with the plain string "Summary" otherwise.
const compactThread = vi.fn((input: unknown) => {
  if (typeof input === "object" && input !== null && "threadId" in input) {
    return Effect.succeed(`Summary for ${String(input.threadId)}`);
  }
  return Effect.succeed("Summary");
});
const stopSession = vi.fn((input: unknown) =>
Effect.sync(() => {
const threadId =
Expand All @@ -160,7 +192,18 @@ describe("ProviderCommandReactor", () => {
if (!threadId) {
return;
}
const index = runtimeSessions.findIndex((session) => session.threadId === threadId);
const provider =
typeof input === "object" &&
input !== null &&
"provider" in input &&
(input.provider === "codex" || input.provider === "claudeAgent")
? input.provider
: undefined;
const index = runtimeSessions.findIndex(
(session) =>
session.threadId === threadId &&
(provider === undefined || session.provider === provider),
);
if (index >= 0) {
runtimeSessions.splice(index, 1);
}
Expand Down Expand Up @@ -201,7 +244,9 @@ describe("ProviderCommandReactor", () => {
interruptTurn: interruptTurn as ProviderServiceShape["interruptTurn"],
respondToRequest: respondToRequest as ProviderServiceShape["respondToRequest"],
respondToUserInput: respondToUserInput as ProviderServiceShape["respondToUserInput"],
compactThread: compactThread as ProviderServiceShape["compactThread"],
stopSession: stopSession as ProviderServiceShape["stopSession"],
stopSessionForProvider: stopSession as ProviderServiceShape["stopSessionForProvider"],
listSessions: () => Effect.succeed(runtimeSessions),
getCapabilities: (_provider) =>
Effect.succeed({
Expand Down Expand Up @@ -250,7 +295,7 @@ describe("ProviderCommandReactor", () => {
projectId: asProjectId("project-1"),
title: "Provider Project",
workspaceRoot: "/tmp/provider-project",
defaultModelSelection: modelSelection,
defaultModelSelection: defaultModelSelection,
createdAt: now,
}),
);
Expand All @@ -261,7 +306,7 @@ describe("ProviderCommandReactor", () => {
threadId: ThreadId.make("thread-1"),
projectId: asProjectId("project-1"),
title: "Thread",
modelSelection: modelSelection,
modelSelection: defaultModelSelection,
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
branch: null,
Expand All @@ -277,6 +322,7 @@ describe("ProviderCommandReactor", () => {
interruptTurn,
respondToRequest,
respondToUserInput,
compactThread,
stopSession,
renameBranch,
generateBranchName,
Expand Down Expand Up @@ -716,7 +762,7 @@ describe("ProviderCommandReactor", () => {
});
});

it("rejects a first turn when requested provider conflicts with the thread model", async () => {
it("starts the requested provider on the first turn even when the thread default differs", async () => {
const harness = await createHarness({
threadModelSelection: { provider: "codex", model: "gpt-5-codex" },
});
Expand All @@ -743,29 +789,28 @@ describe("ProviderCommandReactor", () => {
}),
);

await waitFor(async () => {
const readModel = await Effect.runPromise(harness.engine.getReadModel());
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
return (
thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ??
false
);
});

expect(harness.startSession).not.toHaveBeenCalled();
expect(harness.sendTurn).not.toHaveBeenCalled();
await waitFor(() => harness.startSession.mock.calls.length === 1);
await waitFor(() => harness.sendTurn.mock.calls.length === 1);

const readModel = await Effect.runPromise(harness.engine.getReadModel());
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
expect(thread?.session).toBeNull();
expect(
thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"),
).toMatchObject({
summary: "Provider turn start failed",
payload: {
detail: expect.stringContaining("cannot switch to 'claudeAgent'"),
expect(harness.compactThread).not.toHaveBeenCalled();
expect(harness.startSession.mock.calls[0]?.[1]).toMatchObject({
provider: "claudeAgent",
modelSelection: {
provider: "claudeAgent",
model: "claude-opus-4-6",
},
});
expect(harness.sendTurn.mock.calls[0]?.[0]).toMatchObject({
threadId: ThreadId.make("thread-1"),
input: "hello claude",
modelSelection: {
provider: "claudeAgent",
model: "claude-opus-4-6",
},
});
expect(thread?.session?.providerName).toBe("claudeAgent");
});

it("preserves the active session model when in-session model switching is unsupported", async () => {
Expand Down Expand Up @@ -1066,7 +1111,7 @@ describe("ProviderCommandReactor", () => {
});
});

it("rejects provider changes after a thread is already bound to a session provider", async () => {
it("compacts and hands off context when switching providers mid-thread", async () => {
const harness = await createHarness();
const now = new Date().toISOString();

Expand Down Expand Up @@ -1111,31 +1156,69 @@ describe("ProviderCommandReactor", () => {
}),
);

await waitFor(async () => {
const readModel = await Effect.runPromise(harness.engine.getReadModel());
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
return (
thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ??
false
);
});
await waitFor(() => harness.startSession.mock.calls.length === 2);
await waitFor(() => harness.sendTurn.mock.calls.length === 2);

expect(harness.startSession.mock.calls.length).toBe(1);
expect(harness.sendTurn.mock.calls.length).toBe(1);
expect(harness.stopSession.mock.calls.length).toBe(0);
expect(harness.compactThread.mock.calls.length).toBe(1);
expect(harness.compactThread.mock.calls[0]?.[0]).toMatchObject({
threadId: ThreadId.make("thread-1"),
});
expect(harness.startSession.mock.calls[1]?.[1]).toMatchObject({
provider: "claudeAgent",
modelSelection: {
provider: "claudeAgent",
model: "claude-opus-4-6",
},
});
expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({
threadId: ThreadId.make("thread-1"),
modelSelection: {
provider: "claudeAgent",
model: "claude-opus-4-6",
},
});
const switchedTurnInput = harness.sendTurn.mock.calls[1]?.[0] as { input?: string } | undefined;
expect(switchedTurnInput?.input).toContain("<provider_handoff>");
expect(switchedTurnInput?.input).toContain("Summary for thread-1");
expect(switchedTurnInput?.input).toContain("second");
expect(harness.stopSession.mock.calls.length).toBe(1);
expect(harness.stopSession.mock.calls[0]?.[0]).toMatchObject({
threadId: ThreadId.make("thread-1"),
provider: "codex",
});

const readModel = await Effect.runPromise(harness.engine.getReadModel());
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
expect(thread?.session?.threadId).toBe("thread-1");
expect(thread?.session?.providerName).toBe("codex");
expect(thread?.session?.providerName).toBe("claudeAgent");
expect(thread?.session?.runtimeMode).toBe("approval-required");
expect(
thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"),
).toMatchObject({
payload: {
detail: expect.stringContaining("cannot switch to 'claudeAgent'"),
},
});
thread?.activities.filter((activity) => activity.kind.startsWith("provider.handoff.")),
).toEqual(
expect.arrayContaining([
expect.objectContaining({
kind: "provider.handoff.compacting",
summary: "Handing off thread",
turnId: null,
payload: expect.objectContaining({
sourceProvider: "codex",
targetProvider: "claudeAgent",
}),
}),
expect.objectContaining({
kind: "provider.handoff.completed",
summary: "Handed off thread",
turnId: null,
payload: expect.objectContaining({
sourceProvider: "codex",
targetProvider: "claudeAgent",
}),
}),
]),
);
expect(
thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed"),
).toBe(false);
});

it("does not stop the active session when restart fails before rebind", async () => {
Expand Down
Loading
Loading