diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index a6044bba1813..094e631f20f8 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -2715,6 +2715,23 @@ ], "type": "object" }, + "ThreadInjectItemsParams": { + "properties": { + "items": { + "description": "Raw Responses API items to append to the thread's model-visible history.", + "items": true, + "type": "array" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "items", + "threadId" + ], + "type": "object" + }, "ThreadListParams": { "properties": { "archived": { @@ -3959,6 +3976,31 @@ "title": "Thread/readRequest", "type": "object" }, + { + "description": "Append raw Responses API items to the thread history without starting a user turn.", + "properties": { + "id": { + "$ref": "#/definitions/RequestId" + }, + "method": { + "enum": [ + "thread/inject_items" + ], + "title": "Thread/injectItemsRequestMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ThreadInjectItemsParams" + } + }, + "required": [ + "id", + "method", + "params" + ], + "title": "Thread/injectItemsRequest", + "type": "object" + }, { "properties": { "id": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 1f2072070154..45038abf088b 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -578,6 +578,31 @@ "title": "Thread/readRequest", "type": "object" }, + { + "description": "Append raw Responses API items to the thread history without starting a user turn.", + "properties": { + "id": { + "$ref": "#/definitions/v2/RequestId" + }, + "method": { + "enum": [ + "thread/inject_items" + ], + "title": "Thread/injectItemsRequestMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/ThreadInjectItemsParams" + } + }, + "required": [ + "id", + "method", + "params" + ], + "title": "Thread/injectItemsRequest", + "type": "object" + }, { "properties": { "id": { @@ -12862,6 +12887,30 @@ "ThreadId": { "type": "string" }, + "ThreadInjectItemsParams": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "items": { + "description": "Raw Responses API items to append to the thread's model-visible history.", + "items": true, + "type": "array" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "items", + "threadId" + ], + "title": "ThreadInjectItemsParams", + "type": "object" + }, + "ThreadInjectItemsResponse": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "ThreadInjectItemsResponse", + "type": "object" + }, "ThreadItem": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index d4d76de9f0ed..a2f40d9827a0 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -1160,6 +1160,31 @@ "title": "Thread/readRequest", "type": "object" }, + { + "description": "Append raw Responses API items to the thread history without starting a user turn.", + "properties": { + "id": { + "$ref": "#/definitions/RequestId" + }, + "method": { + "enum": [ + "thread/inject_items" + ], + "title": "Thread/injectItemsRequestMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ThreadInjectItemsParams" + } + }, + "required": [ + "id", + "method", + "params" + ], + "title": "Thread/injectItemsRequest", + "type": "object" + }, { "properties": { "id": { @@ -10710,6 +10735,30 @@ "ThreadId": { "type": "string" }, + "ThreadInjectItemsParams": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "items": { + "description": "Raw Responses API items to append to the thread's model-visible history.", + "items": true, + "type": "array" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "items", + "threadId" + ], + "title": "ThreadInjectItemsParams", + "type": "object" + }, + "ThreadInjectItemsResponse": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "ThreadInjectItemsResponse", + "type": "object" + }, "ThreadItem": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadInjectItemsParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadInjectItemsParams.json new file mode 100644 index 000000000000..d117f3ae0e68 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadInjectItemsParams.json @@ -0,0 +1,19 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "items": { + "description": "Raw Responses API items to append to the thread's model-visible history.", + "items": true, + "type": "array" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "items", + "threadId" + ], + "title": "ThreadInjectItemsParams", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadInjectItemsResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadInjectItemsResponse.json new file mode 100644 index 000000000000..2ba62b221495 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadInjectItemsResponse.json @@ -0,0 +1,5 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "ThreadInjectItemsResponse", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts b/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts index 1bbc9b7ac945..0eedda3e1ec0 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts @@ -47,6 +47,7 @@ import type { SkillsListParams } from "./v2/SkillsListParams"; import type { ThreadArchiveParams } from "./v2/ThreadArchiveParams"; import type { ThreadCompactStartParams } from "./v2/ThreadCompactStartParams"; import type { ThreadForkParams } from "./v2/ThreadForkParams"; +import type { ThreadInjectItemsParams } from "./v2/ThreadInjectItemsParams"; import type { ThreadListParams } from "./v2/ThreadListParams"; import type { ThreadLoadedListParams } from "./v2/ThreadLoadedListParams"; import type { ThreadMetadataUpdateParams } from "./v2/ThreadMetadataUpdateParams"; @@ -66,4 +67,4 @@ import type { WindowsSandboxSetupStartParams } from "./v2/WindowsSandboxSetupSta /** * Request from the client to the server. */ -export type ClientRequest ={ "method": "initialize", id: RequestId, params: InitializeParams, } | { "method": "thread/start", id: RequestId, params: ThreadStartParams, } | { "method": "thread/resume", id: RequestId, params: ThreadResumeParams, } | { "method": "thread/fork", id: RequestId, params: ThreadForkParams, } | { "method": "thread/archive", id: RequestId, params: ThreadArchiveParams, } | { "method": "thread/unsubscribe", id: RequestId, params: ThreadUnsubscribeParams, } | { "method": "thread/name/set", id: RequestId, params: ThreadSetNameParams, } | { "method": "thread/metadata/update", id: RequestId, params: ThreadMetadataUpdateParams, } | { "method": "thread/unarchive", id: RequestId, params: ThreadUnarchiveParams, } | { "method": "thread/compact/start", id: RequestId, params: ThreadCompactStartParams, } | { "method": "thread/shellCommand", id: RequestId, params: ThreadShellCommandParams, } | { "method": "thread/rollback", id: RequestId, params: ThreadRollbackParams, } | { "method": "thread/list", id: RequestId, params: ThreadListParams, } | { "method": "thread/loaded/list", id: RequestId, params: ThreadLoadedListParams, } | { "method": "thread/read", id: RequestId, params: ThreadReadParams, } | { "method": "skills/list", id: RequestId, params: SkillsListParams, } | { "method": "plugin/list", id: RequestId, params: PluginListParams, } | { "method": "plugin/read", id: RequestId, params: PluginReadParams, } | { "method": "app/list", id: RequestId, params: AppsListParams, } | { "method": "fs/readFile", id: RequestId, params: FsReadFileParams, } | { "method": "fs/writeFile", id: RequestId, params: FsWriteFileParams, } | { "method": "fs/createDirectory", id: RequestId, params: FsCreateDirectoryParams, } | { "method": "fs/getMetadata", id: RequestId, params: FsGetMetadataParams, } | { "method": "fs/readDirectory", id: RequestId, params: FsReadDirectoryParams, } | { "method": "fs/remove", id: RequestId, params: FsRemoveParams, } | { "method": "fs/copy", id: RequestId, params: FsCopyParams, } | { "method": "fs/watch", id: RequestId, params: FsWatchParams, } | { "method": "fs/unwatch", id: RequestId, params: FsUnwatchParams, } | { "method": "skills/config/write", id: RequestId, params: SkillsConfigWriteParams, } | { "method": "plugin/install", id: RequestId, params: PluginInstallParams, } | { "method": "plugin/uninstall", id: RequestId, params: PluginUninstallParams, } | { "method": "turn/start", id: RequestId, params: TurnStartParams, } | { "method": "turn/steer", id: RequestId, params: TurnSteerParams, } | { "method": "turn/interrupt", id: RequestId, params: TurnInterruptParams, } | { "method": "review/start", id: RequestId, params: ReviewStartParams, } | { "method": "model/list", id: RequestId, params: ModelListParams, } | { "method": "experimentalFeature/list", id: RequestId, params: ExperimentalFeatureListParams, } | { "method": "experimentalFeature/enablement/set", id: RequestId, params: ExperimentalFeatureEnablementSetParams, } | { "method": "mcpServer/oauth/login", id: RequestId, params: McpServerOauthLoginParams, } | { "method": "config/mcpServer/reload", id: RequestId, params: undefined, } | { "method": "mcpServerStatus/list", id: RequestId, params: ListMcpServerStatusParams, } | { "method": "mcpServer/resource/read", id: RequestId, params: McpResourceReadParams, } | { "method": "mcpServer/tool/call", id: RequestId, params: McpServerToolCallParams, } | { "method": "windowsSandbox/setupStart", id: RequestId, params: WindowsSandboxSetupStartParams, } | { "method": "account/login/start", id: RequestId, params: LoginAccountParams, } | { "method": "account/login/cancel", id: RequestId, params: CancelLoginAccountParams, } | { "method": "account/logout", id: RequestId, params: undefined, } | { "method": "account/rateLimits/read", id: RequestId, params: undefined, } | { "method": "feedback/upload", id: RequestId, params: FeedbackUploadParams, } | { "method": "command/exec", id: RequestId, params: CommandExecParams, } | { "method": "command/exec/write", id: RequestId, params: CommandExecWriteParams, } | { "method": "command/exec/terminate", id: RequestId, params: CommandExecTerminateParams, } | { "method": "command/exec/resize", id: RequestId, params: CommandExecResizeParams, } | { "method": "config/read", id: RequestId, params: ConfigReadParams, } | { "method": "externalAgentConfig/detect", id: RequestId, params: ExternalAgentConfigDetectParams, } | { "method": "externalAgentConfig/import", id: RequestId, params: ExternalAgentConfigImportParams, } | { "method": "config/value/write", id: RequestId, params: ConfigValueWriteParams, } | { "method": "config/batchWrite", id: RequestId, params: ConfigBatchWriteParams, } | { "method": "configRequirements/read", id: RequestId, params: undefined, } | { "method": "account/read", id: RequestId, params: GetAccountParams, } | { "method": "getConversationSummary", id: RequestId, params: GetConversationSummaryParams, } | { "method": "gitDiffToRemote", id: RequestId, params: GitDiffToRemoteParams, } | { "method": "getAuthStatus", id: RequestId, params: GetAuthStatusParams, } | { "method": "fuzzyFileSearch", id: RequestId, params: FuzzyFileSearchParams, }; +export type ClientRequest ={ "method": "initialize", id: RequestId, params: InitializeParams, } | { "method": "thread/start", id: RequestId, params: ThreadStartParams, } | { "method": "thread/resume", id: RequestId, params: ThreadResumeParams, } | { "method": "thread/fork", id: RequestId, params: ThreadForkParams, } | { "method": "thread/archive", id: RequestId, params: ThreadArchiveParams, } | { "method": "thread/unsubscribe", id: RequestId, params: ThreadUnsubscribeParams, } | { "method": "thread/name/set", id: RequestId, params: ThreadSetNameParams, } | { "method": "thread/metadata/update", id: RequestId, params: ThreadMetadataUpdateParams, } | { "method": "thread/unarchive", id: RequestId, params: ThreadUnarchiveParams, } | { "method": "thread/compact/start", id: RequestId, params: ThreadCompactStartParams, } | { "method": "thread/shellCommand", id: RequestId, params: ThreadShellCommandParams, } | { "method": "thread/rollback", id: RequestId, params: ThreadRollbackParams, } | { "method": "thread/list", id: RequestId, params: ThreadListParams, } | { "method": "thread/loaded/list", id: RequestId, params: ThreadLoadedListParams, } | { "method": "thread/read", id: RequestId, params: ThreadReadParams, } | { "method": "thread/inject_items", id: RequestId, params: ThreadInjectItemsParams, } | { "method": "skills/list", id: RequestId, params: SkillsListParams, } | { "method": "plugin/list", id: RequestId, params: PluginListParams, } | { "method": "plugin/read", id: RequestId, params: PluginReadParams, } | { "method": "app/list", id: RequestId, params: AppsListParams, } | { "method": "fs/readFile", id: RequestId, params: FsReadFileParams, } | { "method": "fs/writeFile", id: RequestId, params: FsWriteFileParams, } | { "method": "fs/createDirectory", id: RequestId, params: FsCreateDirectoryParams, } | { "method": "fs/getMetadata", id: RequestId, params: FsGetMetadataParams, } | { "method": "fs/readDirectory", id: RequestId, params: FsReadDirectoryParams, } | { "method": "fs/remove", id: RequestId, params: FsRemoveParams, } | { "method": "fs/copy", id: RequestId, params: FsCopyParams, } | { "method": "fs/watch", id: RequestId, params: FsWatchParams, } | { "method": "fs/unwatch", id: RequestId, params: FsUnwatchParams, } | { "method": "skills/config/write", id: RequestId, params: SkillsConfigWriteParams, } | { "method": "plugin/install", id: RequestId, params: PluginInstallParams, } | { "method": "plugin/uninstall", id: RequestId, params: PluginUninstallParams, } | { "method": "turn/start", id: RequestId, params: TurnStartParams, } | { "method": "turn/steer", id: RequestId, params: TurnSteerParams, } | { "method": "turn/interrupt", id: RequestId, params: TurnInterruptParams, } | { "method": "review/start", id: RequestId, params: ReviewStartParams, } | { "method": "model/list", id: RequestId, params: ModelListParams, } | { "method": "experimentalFeature/list", id: RequestId, params: ExperimentalFeatureListParams, } | { "method": "experimentalFeature/enablement/set", id: RequestId, params: ExperimentalFeatureEnablementSetParams, } | { "method": "mcpServer/oauth/login", id: RequestId, params: McpServerOauthLoginParams, } | { "method": "config/mcpServer/reload", id: RequestId, params: undefined, } | { "method": "mcpServerStatus/list", id: RequestId, params: ListMcpServerStatusParams, } | { "method": "mcpServer/resource/read", id: RequestId, params: McpResourceReadParams, } | { "method": "mcpServer/tool/call", id: RequestId, params: McpServerToolCallParams, } | { "method": "windowsSandbox/setupStart", id: RequestId, params: WindowsSandboxSetupStartParams, } | { "method": "account/login/start", id: RequestId, params: LoginAccountParams, } | { "method": "account/login/cancel", id: RequestId, params: CancelLoginAccountParams, } | { "method": "account/logout", id: RequestId, params: undefined, } | { "method": "account/rateLimits/read", id: RequestId, params: undefined, } | { "method": "feedback/upload", id: RequestId, params: FeedbackUploadParams, } | { "method": "command/exec", id: RequestId, params: CommandExecParams, } | { "method": "command/exec/write", id: RequestId, params: CommandExecWriteParams, } | { "method": "command/exec/terminate", id: RequestId, params: CommandExecTerminateParams, } | { "method": "command/exec/resize", id: RequestId, params: CommandExecResizeParams, } | { "method": "config/read", id: RequestId, params: ConfigReadParams, } | { "method": "externalAgentConfig/detect", id: RequestId, params: ExternalAgentConfigDetectParams, } | { "method": "externalAgentConfig/import", id: RequestId, params: ExternalAgentConfigImportParams, } | { "method": "config/value/write", id: RequestId, params: ConfigValueWriteParams, } | { "method": "config/batchWrite", id: RequestId, params: ConfigBatchWriteParams, } | { "method": "configRequirements/read", id: RequestId, params: undefined, } | { "method": "account/read", id: RequestId, params: GetAccountParams, } | { "method": "getConversationSummary", id: RequestId, params: GetConversationSummaryParams, } | { "method": "gitDiffToRemote", id: RequestId, params: GitDiffToRemoteParams, } | { "method": "getAuthStatus", id: RequestId, params: GetAuthStatusParams, } | { "method": "fuzzyFileSearch", id: RequestId, params: FuzzyFileSearchParams, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadInjectItemsParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadInjectItemsParams.ts new file mode 100644 index 000000000000..4a49224a3973 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadInjectItemsParams.ts @@ -0,0 +1,10 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { JsonValue } from "../serde_json/JsonValue"; + +export type ThreadInjectItemsParams = { threadId: string, +/** + * Raw Responses API items to append to the thread's model-visible history. + */ +items: Array, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadInjectItemsResponse.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadInjectItemsResponse.ts new file mode 100644 index 000000000000..60dcf0d0b3d8 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadInjectItemsResponse.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ThreadInjectItemsResponse = Record; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index f815fee3e9d3..961592db391b 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -282,6 +282,8 @@ export type { ThreadCompactStartParams } from "./ThreadCompactStartParams"; export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse"; export type { ThreadForkParams } from "./ThreadForkParams"; export type { ThreadForkResponse } from "./ThreadForkResponse"; +export type { ThreadInjectItemsParams } from "./ThreadInjectItemsParams"; +export type { ThreadInjectItemsResponse } from "./ThreadInjectItemsResponse"; export type { ThreadItem } from "./ThreadItem"; export type { ThreadListParams } from "./ThreadListParams"; export type { ThreadListResponse } from "./ThreadListResponse"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index f26f8366f092..7334a964ee56 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -322,6 +322,11 @@ client_request_definitions! { params: v2::ThreadReadParams, response: v2::ThreadReadResponse, }, + /// Append raw Responses API items to the thread history without starting a user turn. + ThreadInjectItems => "thread/inject_items" { + params: v2::ThreadInjectItemsParams, + response: v2::ThreadInjectItemsResponse, + }, SkillsList => "skills/list" { params: v2::SkillsListParams, response: v2::SkillsListResponse, diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 8a6a6e57b30b..1e42fae12f20 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -4251,6 +4251,20 @@ pub struct TurnStartResponse { pub turn: Turn, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadInjectItemsParams { + pub thread_id: String, + /// Raw Responses API items to append to the thread's model-visible history. + pub items: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadInjectItemsResponse {} + #[derive( Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS, ExperimentalApi, )] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index b0a16616aa2a..7083c791169f 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -151,6 +151,7 @@ Example with notification opt-out: - `thread/backgroundTerminals/clean` — terminate all running background terminals for a thread (experimental; requires `capabilities.experimentalApi`); returns `{}` when the cleanup request is accepted. - `thread/rollback` — drop the last N turns from the agent’s in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success. - `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode". +- `thread/inject_items` — append raw Responses API items to a loaded thread’s model-visible history without starting a user turn; returns `{}` on success. - `turn/steer` — add user input to an already in-flight regular turn without starting a new turn; returns the active `turnId` that accepted the input. Review and manual compaction turns reject `turn/steer`. - `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`. - `thread/realtime/start` — start a thread-scoped realtime session (experimental); returns `{}` and streams `thread/realtime/*` notifications. Omit `transport` for the websocket transport, or pass `{ "type": "webrtc", "sdp": "..." }` to create a WebRTC session from a browser-generated SDP offer; the remote answer SDP is emitted as `thread/realtime/sdp`. @@ -581,6 +582,24 @@ Invoke a plugin by including a UI mention token such as `@sample` in the text in } } } ``` +### Example: Inject raw history items + +Use `thread/inject_items` to append prebuilt Responses API items to a loaded thread’s prompt history without starting a user turn. These items are persisted to the rollout and included in subsequent model requests. + +```json +{ "method": "thread/inject_items", "id": 36, "params": { + "threadId": "thr_123", + "items": [ + { + "type": "message", + "role": "assistant", + "content": [{ "type": "output_text", "text": "Previously computed context." }] + } + ] +} } +{ "id": 36, "result": {} } +``` + ### Example: Start realtime with WebRTC Use `thread/realtime/start` with `transport.type: "webrtc"` when a browser or webview owns the `RTCPeerConnection` and app-server should create the server-side realtime session. The transport `sdp` must be the offer SDP produced by `RTCPeerConnection.createOffer()`, not a hand-written or minimal SDP string. diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 3decc83f4a23..d0b835280c55 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -132,6 +132,8 @@ use codex_app_server_protocol::ThreadForkParams; use codex_app_server_protocol::ThreadForkResponse; use codex_app_server_protocol::ThreadIncrementElicitationParams; use codex_app_server_protocol::ThreadIncrementElicitationResponse; +use codex_app_server_protocol::ThreadInjectItemsParams; +use codex_app_server_protocol::ThreadInjectItemsResponse; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadListResponse; @@ -958,6 +960,10 @@ impl CodexMessageProcessor { ) .await; } + ClientRequest::ThreadInjectItems { request_id, params } => { + self.thread_inject_items(to_connection_request_id(request_id), params) + .await; + } ClientRequest::TurnSteer { request_id, params } => { self.turn_steer(to_connection_request_id(request_id), params) .await; @@ -6986,6 +6992,55 @@ impl CodexMessageProcessor { } } + async fn thread_inject_items( + &self, + request_id: ConnectionRequestId, + params: ThreadInjectItemsParams, + ) { + let (_, thread) = match self.load_thread(¶ms.thread_id).await { + Ok(value) => value, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let items = match params + .items + .into_iter() + .enumerate() + .map(|(index, value)| { + serde_json::from_value::(value) + .map_err(|err| format!("items[{index}] is not a valid response item: {err}")) + }) + .collect::, _>>() + { + Ok(items) => items, + Err(message) => { + self.send_invalid_request_error(request_id, message).await; + return; + } + }; + + match thread.inject_response_items(items).await { + Ok(()) => { + self.outgoing + .send_response(request_id, ThreadInjectItemsResponse {}) + .await; + } + Err(CodexErr::InvalidRequest(message)) => { + self.send_invalid_request_error(request_id, message).await; + } + Err(err) => { + self.send_internal_error( + request_id, + format!("failed to inject response items: {err}"), + ) + .await; + } + } + } + async fn set_app_server_client_info( thread: &CodexThread, app_server_client_name: Option, diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index 5ef7273defdb..51e3d48d9d6e 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -62,6 +62,7 @@ use codex_app_server_protocol::SkillsListParams; use codex_app_server_protocol::ThreadArchiveParams; use codex_app_server_protocol::ThreadCompactStartParams; use codex_app_server_protocol::ThreadForkParams; +use codex_app_server_protocol::ThreadInjectItemsParams; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadLoadedListParams; use codex_app_server_protocol::ThreadMemoryModeSetParams; @@ -602,6 +603,15 @@ impl McpProcess { self.send_request("turn/start", params).await } + /// Send a `thread/inject_items` JSON-RPC request (v2). + pub async fn send_thread_inject_items_request( + &mut self, + params: ThreadInjectItemsParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("thread/inject_items", params).await + } + /// Send a `command/exec` JSON-RPC request (v2). pub async fn send_command_exec_request( &mut self, diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index 56c4fea90575..617c05f5778f 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -35,6 +35,7 @@ mod safety_check_downgrade; mod skills_list; mod thread_archive; mod thread_fork; +mod thread_inject_items; mod thread_list; mod thread_loaded_list; mod thread_memory_mode_set; diff --git a/codex-rs/app-server/tests/suite/v2/thread_inject_items.rs b/codex-rs/app-server/tests/suite/v2/thread_inject_items.rs new file mode 100644 index 000000000000..56fd188c4b2c --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/thread_inject_items.rs @@ -0,0 +1,288 @@ +use anyhow::Context; +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::to_response; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadInjectItemsParams; +use codex_app_server_protocol::ThreadInjectItemsResponse; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::UserInput as V2UserInput; +use codex_core::RolloutRecorder; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::InitialHistory; +use codex_protocol::protocol::RolloutItem; +use core_test_support::responses; +use serde_json::Value; +use std::path::Path; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn thread_inject_items_adds_raw_response_items_to_thread_history() -> Result<()> { + let server = responses::start_mock_server().await; + let body = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "Done"), + responses::ev_completed("resp-1"), + ]); + let response_mock = responses::mount_sse_once(&server, body).await; + + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + + let injected_text = "Injected assistant context"; + let injected_item = ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: injected_text.to_string(), + }], + end_turn: None, + phase: None, + }; + + let inject_req = mcp + .send_thread_inject_items_request(ThreadInjectItemsParams { + thread_id: thread.id.clone(), + items: vec![serde_json::to_value(&injected_item)?], + }) + .await?; + let inject_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(inject_req)), + ) + .await??; + let _response: ThreadInjectItemsResponse = + to_response::(inject_resp)?; + + let rollout_path = thread.path.as_ref().context("thread path missing")?; + let history = RolloutRecorder::get_rollout_history(rollout_path).await?; + let InitialHistory::Resumed(resumed_history) = history else { + panic!("expected resumed rollout history"); + }; + assert!( + resumed_history + .history + .iter() + .any(|item| matches!(item, RolloutItem::ResponseItem(response_item) if response_item == &injected_item)), + "injected item should be persisted in rollout history" + ); + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "Hello".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let injected_value = serde_json::to_value(&injected_item)?; + let model_input = response_mock.single_request().input(); + let environment_context_index = + response_item_text_position(&model_input, "") + .expect("environment context should be injected before the first user turn"); + let injected_index = model_input + .iter() + .position(|item| item == &injected_value) + .expect("injected item should be sent in the next model request"); + let user_prompt_index = response_item_text_position(&model_input, "Hello") + .expect("user prompt should be sent in the next model request"); + assert!( + environment_context_index < injected_index, + "standard initial context should be sent before injected items" + ); + assert!( + injected_index < user_prompt_index, + "injected items should be sent before the user prompt" + ); + + Ok(()) +} + +#[tokio::test] +async fn thread_inject_items_adds_raw_response_items_after_a_turn() -> Result<()> { + let server = responses::start_mock_server().await; + let first_body = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "First done"), + responses::ev_completed("resp-1"), + ]); + let second_body = responses::sse(vec![ + responses::ev_response_created("resp-2"), + responses::ev_assistant_message("msg-2", "Second done"), + responses::ev_completed("resp-2"), + ]); + let response_mock = responses::mount_sse_sequence(&server, vec![first_body, second_body]).await; + + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + + let first_turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "First turn".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(first_turn_req)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let injected_item = ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: "Injected after first turn".to_string(), + }], + end_turn: None, + phase: None, + }; + let injected_value = serde_json::to_value(&injected_item)?; + + let inject_req = mcp + .send_thread_inject_items_request(ThreadInjectItemsParams { + thread_id: thread.id.clone(), + items: vec![injected_value.clone()], + }) + .await?; + let inject_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(inject_req)), + ) + .await??; + let _response: ThreadInjectItemsResponse = + to_response::(inject_resp)?; + + let second_turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "Second turn".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(second_turn_req)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let requests = response_mock.requests(); + assert_eq!(requests.len(), 2); + assert!( + !requests[0].input().contains(&injected_value), + "injected item should not be sent before it is injected" + ); + assert!( + requests[1].input().contains(&injected_value), + "injected item should be sent after being injected into existing history" + ); + + Ok(()) +} + +fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "read-only" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} + +fn response_item_text_position(items: &[Value], needle: &str) -> Option { + items.iter().position(|item| { + item.get("content") + .and_then(Value::as_array) + .into_iter() + .flatten() + .any(|content| { + content + .get("text") + .and_then(Value::as_str) + .is_some_and(|text| text.contains(needle)) + }) + }) +} diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index a84db85aebfc..2a68b1e4f97a 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -199,6 +199,29 @@ impl CodexThread { Ok(submission_id) } + /// Append raw Responses API items to the thread's model-visible history. + pub async fn inject_response_items(&self, items: Vec) -> CodexResult<()> { + if items.is_empty() { + return Err(CodexErr::InvalidRequest( + "items must not be empty".to_string(), + )); + } + + let turn_context = self.codex.session.new_default_turn().await; + if self.codex.session.reference_context_item().await.is_none() { + self.codex + .session + .record_context_updates_and_set_reference_context_item(turn_context.as_ref()) + .await; + } + self.codex + .session + .record_conversation_items(turn_context.as_ref(), &items) + .await; + self.codex.session.flush_rollout().await?; + Ok(()) + } + pub fn rollout_path(&self) -> Option { self.rollout_path.clone() }