From c4a1ac17c457becc1e139c0c3f6c6ca3a9d7b503 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 13 Apr 2026 10:22:36 +0100 Subject: [PATCH 1/2] feat: disable memory endpoint --- .../schema/json/ClientRequest.json | 7 + .../codex_app_server_protocol.schemas.json | 7 + .../codex_app_server_protocol.v2.schemas.json | 7 + .../schema/typescript/ThreadMemoryMode.ts | 5 + .../schema/typescript/index.ts | 1 + .../src/protocol/common.rs | 5 + .../app-server-protocol/src/protocol/v2.rs | 37 +++++ codex-rs/app-server/README.md | 11 ++ .../app-server/src/codex_message_processor.rs | 127 +++++++++++++++ .../app-server/tests/common/mcp_process.rs | 10 ++ .../tests/suite/v2/experimental_api.rs | 35 +++++ codex-rs/app-server/tests/suite/v2/mod.rs | 1 + .../tests/suite/v2/thread_memory_mode_set.rs | 147 ++++++++++++++++++ codex-rs/core/src/codex.rs | 65 ++++++++ codex-rs/protocol/src/protocol.rs | 14 ++ 15 files changed, 479 insertions(+) create mode 100644 codex-rs/app-server-protocol/schema/typescript/ThreadMemoryMode.ts create mode 100644 codex-rs/app-server/tests/suite/v2/thread_memory_mode_set.rs diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 76f7abdfd239..a6044bba1813 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -2809,6 +2809,13 @@ }, "type": "object" }, + "ThreadMemoryMode": { + "enum": [ + "enabled", + "disabled" + ], + "type": "string" + }, "ThreadMetadataGitInfoUpdateParams": { "properties": { "branch": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 15b31cc0ef6b..74c7499e09e6 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -13647,6 +13647,13 @@ "title": "ThreadLoadedListResponse", "type": "object" }, + "ThreadMemoryMode": { + "enum": [ + "enabled", + "disabled" + ], + "type": "string" + }, "ThreadMetadataGitInfoUpdateParams": { "properties": { "branch": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index dd053d77d22a..8540d93ed0f7 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -11495,6 +11495,13 @@ "title": "ThreadLoadedListResponse", "type": "object" }, + "ThreadMemoryMode": { + "enum": [ + "enabled", + "disabled" + ], + "type": "string" + }, "ThreadMetadataGitInfoUpdateParams": { "properties": { "branch": { diff --git a/codex-rs/app-server-protocol/schema/typescript/ThreadMemoryMode.ts b/codex-rs/app-server-protocol/schema/typescript/ThreadMemoryMode.ts new file mode 100644 index 000000000000..74a7e759e73f --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/ThreadMemoryMode.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ThreadMemoryMode = "enabled" | "disabled"; diff --git a/codex-rs/app-server-protocol/schema/typescript/index.ts b/codex-rs/app-server-protocol/schema/typescript/index.ts index 2a3520789673..3f07f7169582 100644 --- a/codex-rs/app-server-protocol/schema/typescript/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/index.ts @@ -68,6 +68,7 @@ export type { SessionSource } from "./SessionSource"; export type { Settings } from "./Settings"; export type { SubAgentSource } from "./SubAgentSource"; export type { ThreadId } from "./ThreadId"; +export type { ThreadMemoryMode } from "./ThreadMemoryMode"; export type { Tool } from "./Tool"; export type { Verbosity } from "./Verbosity"; export type { WebSearchAction } from "./WebSearchAction"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 3450e41534f5..f26f8366f092 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -284,6 +284,11 @@ client_request_definitions! { params: v2::ThreadMetadataUpdateParams, response: v2::ThreadMetadataUpdateResponse, }, + #[experimental("thread/memoryMode/set")] + ThreadMemoryModeSet => "thread/memoryMode/set" { + params: v2::ThreadMemoryModeSetParams, + response: v2::ThreadMemoryModeSetResponse, + }, ThreadUnarchive => "thread/unarchive" { params: v2::ThreadUnarchiveParams, response: v2::ThreadUnarchiveResponse, diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 30adc152eaca..0b31674733a4 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -3049,6 +3049,43 @@ pub struct ThreadMetadataUpdateResponse { pub thread: Thread, } +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "lowercase")] +#[ts(rename_all = "lowercase")] +pub enum ThreadMemoryMode { + Enabled, + Disabled, +} + +impl ThreadMemoryMode { + pub fn as_str(self) -> &'static str { + match self { + Self::Enabled => "enabled", + Self::Disabled => "disabled", + } + } + + pub fn to_core(self) -> codex_protocol::protocol::ThreadMemoryMode { + match self { + Self::Enabled => codex_protocol::protocol::ThreadMemoryMode::Enabled, + Self::Disabled => codex_protocol::protocol::ThreadMemoryMode::Disabled, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadMemoryModeSetParams { + pub thread_id: String, + pub mode: ThreadMemoryMode, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadMemoryModeSetResponse {} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 94f86c7ebb1a..2796f49eabc4 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -140,6 +140,7 @@ Example with notification opt-out: - `thread/loaded/list` — list the thread ids currently loaded in memory. - `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. - `thread/metadata/update` — patch stored thread metadata in sqlite; currently supports updating persisted `gitInfo` fields and returns the refreshed `thread`. +- `thread/memoryMode/set` — experimental; set a thread’s persisted memory eligibility to `"enabled"` or `"disabled"` for either a loaded thread or a stored rollout; returns `{}` on success. - `thread/status/changed` — notification emitted when a loaded thread’s status changes (`threadId` + new `status`). - `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success and emits `thread/archived`. - `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server shuts down and unloads the thread, then emits `thread/closed`. @@ -395,6 +396,16 @@ Use `thread/metadata/update` to patch sqlite-backed metadata for a thread withou } } ``` +Experimental: use `thread/memoryMode/set` to change whether a thread remains eligible for future memory generation. + +```json +{ "method": "thread/memoryMode/set", "id": 26, "params": { + "threadId": "thr_123", + "mode": "disabled" +} } +{ "id": 26, "result": {} } +``` + ### Example: Archive a thread Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory. diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 6762b9e12922..25b1c872597f 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -137,6 +137,8 @@ use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadListResponse; use codex_app_server_protocol::ThreadLoadedListParams; use codex_app_server_protocol::ThreadLoadedListResponse; +use codex_app_server_protocol::ThreadMemoryModeSetParams; +use codex_app_server_protocol::ThreadMemoryModeSetResponse; use codex_app_server_protocol::ThreadMetadataGitInfoUpdateParams; use codex_app_server_protocol::ThreadMetadataUpdateParams; use codex_app_server_protocol::ThreadMetadataUpdateResponse; @@ -772,6 +774,10 @@ impl CodexMessageProcessor { self.thread_metadata_update(to_connection_request_id(request_id), params) .await; } + ClientRequest::ThreadMemoryModeSet { request_id, params } => { + self.thread_memory_mode_set(to_connection_request_id(request_id), params) + .await; + } ClientRequest::ThreadUnarchive { request_id, params } => { self.thread_unarchive(to_connection_request_id(request_id), params) .await; @@ -2772,6 +2778,127 @@ impl CodexMessageProcessor { .await; } + async fn thread_memory_mode_set( + &self, + request_id: ConnectionRequestId, + params: ThreadMemoryModeSetParams, + ) { + let ThreadMemoryModeSetParams { thread_id, mode } = params; + let thread_id = match ThreadId::from_string(&thread_id) { + Ok(id) => id, + Err(err) => { + self.send_invalid_request_error(request_id, format!("invalid thread id: {err}")) + .await; + return; + } + }; + + if let Ok(thread) = self.thread_manager.get_thread(thread_id).await { + if thread.config_snapshot().await.ephemeral { + self.send_invalid_request_error( + request_id, + format!("ephemeral thread does not support memory mode updates: {thread_id}"), + ) + .await; + return; + } + + if let Err(err) = self + .submit_core_op( + &request_id, + thread.as_ref(), + Op::SetThreadMemoryMode { + mode: mode.to_core(), + }, + ) + .await + { + self.send_internal_error( + request_id, + format!("failed to set thread memory mode: {err}"), + ) + .await; + return; + } + + self.outgoing + .send_response(request_id, ThreadMemoryModeSetResponse {}) + .await; + return; + } + + let rollout_path = + match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await + { + Ok(Some(path)) => Some(path), + Ok(None) => None, + Err(err) => { + self.send_invalid_request_error( + request_id, + format!("failed to locate thread id {thread_id}: {err}"), + ) + .await; + return; + } + }; + + let Some(rollout_path) = rollout_path else { + self.send_invalid_request_error(request_id, format!("thread not found: {thread_id}")) + .await; + return; + }; + + let mut session_meta = match read_session_meta_line(rollout_path.as_path()).await { + Ok(session_meta) => session_meta, + Err(err) => { + self.send_internal_error( + request_id, + format!("failed to set thread memory mode: {err}"), + ) + .await; + return; + } + }; + if session_meta.meta.id != thread_id { + self.send_internal_error( + request_id, + format!( + "failed to set thread memory mode: rollout session metadata id mismatch: expected {thread_id}, found {}", + session_meta.meta.id + ), + ) + .await; + return; + } + session_meta.meta.memory_mode = Some(mode.as_str().to_string()); + let item = RolloutItem::SessionMeta(session_meta); + + if let Err(err) = append_rollout_item_to_path(rollout_path.as_path(), &item).await { + self.send_internal_error( + request_id, + format!("failed to set thread memory mode: {err}"), + ) + .await; + return; + } + + let state_db_ctx = open_state_db_for_direct_thread_lookup(&self.config).await; + reconcile_rollout( + state_db_ctx.as_deref(), + rollout_path.as_path(), + self.config.model_provider_id.as_str(), + /*builder*/ None, + &[], + /*archived_only*/ None, + /*new_thread_memory_mode*/ None, + ) + .await; + + self.outgoing + .send_response(request_id, ThreadMemoryModeSetResponse {}) + .await; + } + async fn thread_metadata_update( &self, request_id: ConnectionRequestId, diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index 76bb7bff4346..5ef7273defdb 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -64,6 +64,7 @@ use codex_app_server_protocol::ThreadCompactStartParams; use codex_app_server_protocol::ThreadForkParams; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadLoadedListParams; +use codex_app_server_protocol::ThreadMemoryModeSetParams; use codex_app_server_protocol::ThreadMetadataUpdateParams; use codex_app_server_protocol::ThreadReadParams; use codex_app_server_protocol::ThreadRealtimeAppendAudioParams; @@ -583,6 +584,15 @@ impl McpProcess { self.send_request("mock/experimentalMethod", params).await } + /// Send a `thread/memoryMode/set` JSON-RPC request (v2, experimental). + pub async fn send_thread_memory_mode_set_request( + &mut self, + params: ThreadMemoryModeSetParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("thread/memoryMode/set", params).await + } + /// Send a `turn/start` JSON-RPC request (v2). pub async fn send_turn_start_request( &mut self, diff --git a/codex-rs/app-server/tests/suite/v2/experimental_api.rs b/codex-rs/app-server/tests/suite/v2/experimental_api.rs index 4a532aebc082..25a607390ec5 100644 --- a/codex-rs/app-server/tests/suite/v2/experimental_api.rs +++ b/codex-rs/app-server/tests/suite/v2/experimental_api.rs @@ -11,6 +11,8 @@ use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::MockExperimentalMethodParams; use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadMemoryMode; +use codex_app_server_protocol::ThreadMemoryModeSetParams; use codex_app_server_protocol::ThreadRealtimeStartParams; use codex_app_server_protocol::ThreadRealtimeStartTransport; use codex_app_server_protocol::ThreadStartParams; @@ -89,6 +91,39 @@ async fn realtime_conversation_start_requires_experimental_api_capability() -> R Ok(()) } +#[tokio::test] +async fn thread_memory_mode_set_requires_experimental_api_capability() -> Result<()> { + let codex_home = TempDir::new()?; + let mut mcp = McpProcess::new(codex_home.path()).await?; + + let init = mcp + .initialize_with_capabilities( + default_client_info(), + Some(InitializeCapabilities { + experimental_api: false, + opt_out_notification_methods: None, + }), + ) + .await?; + let JSONRPCMessage::Response(_) = init else { + anyhow::bail!("expected initialize response, got {init:?}"); + }; + + let request_id = mcp + .send_thread_memory_mode_set_request(ThreadMemoryModeSetParams { + thread_id: "thr_123".to_string(), + mode: ThreadMemoryMode::Disabled, + }) + .await?; + let error = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(request_id)), + ) + .await??; + assert_experimental_capability_error(error, "thread/memoryMode/set"); + Ok(()) +} + #[tokio::test] async fn realtime_webrtc_start_requires_experimental_api_capability() -> Result<()> { let codex_home = TempDir::new()?; diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index db82c1368f09..56c4fea90575 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -37,6 +37,7 @@ mod thread_archive; mod thread_fork; mod thread_list; mod thread_loaded_list; +mod thread_memory_mode_set; mod thread_metadata_update; mod thread_name_websocket; mod thread_read; diff --git a/codex-rs/app-server/tests/suite/v2/thread_memory_mode_set.rs b/codex-rs/app-server/tests/suite/v2/thread_memory_mode_set.rs new file mode 100644 index 000000000000..cd60875339af --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/thread_memory_mode_set.rs @@ -0,0 +1,147 @@ +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::create_fake_rollout; +use app_test_support::create_mock_responses_server_repeating_assistant; +use app_test_support::to_response; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadMemoryMode; +use codex_app_server_protocol::ThreadMemoryModeSetParams; +use codex_app_server_protocol::ThreadMemoryModeSetResponse; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_protocol::ThreadId; +use codex_state::StateRuntime; +use pretty_assertions::assert_eq; +use std::path::Path; +use std::sync::Arc; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn thread_memory_mode_set_updates_loaded_thread_state() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + let state_db = init_state_db(codex_home.path()).await?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + let thread_uuid = ThreadId::from_string(&thread.id)?; + + let set_id = mcp + .send_thread_memory_mode_set_request(ThreadMemoryModeSetParams { + thread_id: thread.id, + mode: ThreadMemoryMode::Disabled, + }) + .await?; + let set_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(set_id)), + ) + .await??; + let _: ThreadMemoryModeSetResponse = to_response::(set_resp)?; + + let memory_mode = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let memory_mode = state_db.get_thread_memory_mode(thread_uuid).await?; + if memory_mode.as_deref() == Some("disabled") { + break Ok::<_, anyhow::Error>(memory_mode); + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + }) + .await??; + assert_eq!(memory_mode.as_deref(), Some("disabled")); + Ok(()) +} + +#[tokio::test] +async fn thread_memory_mode_set_updates_stored_thread_state() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + let state_db = init_state_db(codex_home.path()).await?; + + let thread_id = create_fake_rollout( + codex_home.path(), + "2025-01-06T08-30-00", + "2025-01-06T08:30:00Z", + "Stored thread preview", + Some("mock_provider"), + /*git_info*/ None, + )?; + let thread_uuid = ThreadId::from_string(&thread_id)?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + for mode in [ThreadMemoryMode::Disabled, ThreadMemoryMode::Enabled] { + let set_id = mcp + .send_thread_memory_mode_set_request(ThreadMemoryModeSetParams { + thread_id: thread_id.clone(), + mode, + }) + .await?; + let set_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(set_id)), + ) + .await??; + let _: ThreadMemoryModeSetResponse = to_response::(set_resp)?; + } + + let memory_mode = state_db.get_thread_memory_mode(thread_uuid).await?; + assert_eq!(memory_mode.as_deref(), Some("enabled")); + Ok(()) +} + +async fn init_state_db(codex_home: &Path) -> Result> { + let state_db = StateRuntime::init(codex_home.to_path_buf(), "mock_provider".into()).await?; + state_db + .mark_backfill_complete(/*last_watermark*/ None) + .await?; + Ok(state_db) +} + +fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "read-only" + +model_provider = "mock_provider" +suppress_unstable_features_warning = true + +[features] +sqlite = true + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index a3a674cb60b1..162e4716be08 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -4806,6 +4806,10 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv handlers::set_thread_name(&sess, sub.id.clone(), name).await; false } + Op::SetThreadMemoryMode { mode } => { + handlers::set_thread_memory_mode(&sess, sub.id.clone(), mode).await; + false + } Op::RunUserShellCommand { command } => { handlers::run_user_shell_command(&sess, sub.id.clone(), command).await; false @@ -4895,6 +4899,7 @@ mod handlers { use crate::review_prompts::resolve_review_request; use crate::rollout::RolloutRecorder; + use crate::rollout::read_session_meta_line; use crate::tasks::CompactTask; use crate::tasks::UndoTask; use crate::tasks::UserShellCommandMode; @@ -4916,6 +4921,7 @@ mod handlers { use codex_protocol::protocol::ReviewRequest; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SkillsListEntry; + use codex_protocol::protocol::ThreadMemoryMode; use codex_protocol::protocol::ThreadNameUpdatedEvent; use codex_protocol::protocol::ThreadRolledBackEvent; use codex_protocol::protocol::TurnAbortReason; @@ -5653,6 +5659,43 @@ mod handlers { Ok(msg) } + async fn persist_thread_memory_mode_update( + sess: &Arc, + mode: ThreadMemoryMode, + ) -> anyhow::Result<()> { + let recorder = { + let guard = sess.services.rollout.lock().await; + guard.clone() + } + .ok_or_else(|| { + anyhow::anyhow!("Session persistence is disabled; cannot update thread memory mode.") + })?; + recorder.persist().await?; + recorder.flush().await?; + + let rollout_path = recorder.rollout_path().to_path_buf(); + let mut session_meta = read_session_meta_line(rollout_path.as_path()).await?; + if session_meta.meta.id != sess.conversation_id { + anyhow::bail!( + "rollout session metadata id mismatch: expected {}, found {}", + sess.conversation_id, + session_meta.meta.id + ); + } + session_meta.meta.memory_mode = Some( + match mode { + ThreadMemoryMode::Enabled => "enabled", + ThreadMemoryMode::Disabled => "disabled", + } + .to_string(), + ); + + let item = RolloutItem::SessionMeta(session_meta); + recorder.record_items(std::slice::from_ref(&item)).await?; + recorder.flush().await?; + Ok(()) + } + /// Persists the thread name in the rollout and state database, updates in-memory state, and /// emits a `ThreadNameUpdated` event on success. pub async fn set_thread_name(sess: &Arc, sub_id: String, name: String) { @@ -5712,6 +5755,28 @@ mod handlers { sess.deliver_event_raw(Event { id: sub_id, msg }).await; } + /// Persists thread-level memory mode metadata for the active session. + /// + /// This does not involve the model and only affects whether the thread is + /// eligible for future memory generation. + pub async fn set_thread_memory_mode( + sess: &Arc, + sub_id: String, + mode: ThreadMemoryMode, + ) { + if let Err(err) = persist_thread_memory_mode_update(sess, mode).await { + warn!("Failed to persist thread memory mode update to rollout: {err}"); + let event = Event { + id: sub_id, + msg: EventMsg::Error(ErrorEvent { + message: err.to_string(), + codex_error_info: Some(CodexErrorInfo::Other), + }), + }; + sess.send_event_raw(event).await; + } + } + pub async fn shutdown(sess: &Arc, sub_id: String) -> bool { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; let _ = sess.conversation.shutdown().await; diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 65d6dab04ce7..9aa481b400e4 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -636,6 +636,12 @@ pub enum Op { /// involve the model. SetThreadName { name: String }, + /// Set whether the thread remains eligible for memory generation. + /// + /// This persists thread-level memory mode metadata without involving the + /// model. + SetThreadMemoryMode { mode: ThreadMemoryMode }, + /// Request Codex to undo a turn (turn are stacked so it is the same effect as CMD + Z). Undo, @@ -665,6 +671,13 @@ pub enum Op { ListModels, } +#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, JsonSchema)] +#[serde(rename_all = "lowercase")] +pub enum ThreadMemoryMode { + Enabled, + Disabled, +} + impl From> for Op { fn from(value: Vec) -> Self { Op::UserInput { @@ -755,6 +768,7 @@ impl Op { Self::DropMemories => "drop_memories", Self::UpdateMemories => "update_memories", Self::SetThreadName { .. } => "set_thread_name", + Self::SetThreadMemoryMode { .. } => "set_thread_memory_mode", Self::Undo => "undo", Self::ThreadRollback { .. } => "thread_rollback", Self::Review { .. } => "review", From d9179e6a8449cebbf93f13a79d56ca74308ffe18 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 13 Apr 2026 10:33:12 +0100 Subject: [PATCH 2/2] nit --- codex-rs/app-server/src/codex_message_processor.rs | 11 +---------- .../tests/suite/v2/thread_memory_mode_set.rs | 11 +---------- codex-rs/core/src/codex.rs | 13 ++++++++++++- codex-rs/core/src/codex_thread.rs | 6 ++++++ 4 files changed, 20 insertions(+), 21 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 25b1c872597f..c3e06e42d8c5 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -2803,16 +2803,7 @@ impl CodexMessageProcessor { return; } - if let Err(err) = self - .submit_core_op( - &request_id, - thread.as_ref(), - Op::SetThreadMemoryMode { - mode: mode.to_core(), - }, - ) - .await - { + if let Err(err) = thread.set_thread_memory_mode(mode.to_core()).await { self.send_internal_error( request_id, format!("failed to set thread memory mode: {err}"), diff --git a/codex-rs/app-server/tests/suite/v2/thread_memory_mode_set.rs b/codex-rs/app-server/tests/suite/v2/thread_memory_mode_set.rs index cd60875339af..bf9bba7b2ff9 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_memory_mode_set.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_memory_mode_set.rs @@ -57,16 +57,7 @@ async fn thread_memory_mode_set_updates_loaded_thread_state() -> Result<()> { .await??; let _: ThreadMemoryModeSetResponse = to_response::(set_resp)?; - let memory_mode = timeout(DEFAULT_READ_TIMEOUT, async { - loop { - let memory_mode = state_db.get_thread_memory_mode(thread_uuid).await?; - if memory_mode.as_deref() == Some("disabled") { - break Ok::<_, anyhow::Error>(memory_mode); - } - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - } - }) - .await??; + let memory_mode = state_db.get_thread_memory_mode(thread_uuid).await?; assert_eq!(memory_mode.as_deref(), Some("disabled")); Ok(()) } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 162e4716be08..032c2a12aefa 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -742,6 +742,17 @@ impl Codex { Ok(()) } + /// Persist a thread-level memory mode update for the active session. + /// + /// This is a local-only operation that updates rollout metadata directly + /// and does not involve the model. + pub async fn set_thread_memory_mode( + &self, + mode: codex_protocol::protocol::ThreadMemoryMode, + ) -> anyhow::Result<()> { + handlers::persist_thread_memory_mode_update(&self.session, mode).await + } + pub async fn shutdown_and_wait(&self) -> CodexResult<()> { let session_loop_termination = self.session_loop_termination.clone(); match self.submit(Op::Shutdown).await { @@ -5659,7 +5670,7 @@ mod handlers { Ok(msg) } - async fn persist_thread_memory_mode_update( + pub(super) async fn persist_thread_memory_mode_update( sess: &Arc, mode: ThreadMemoryMode, ) -> anyhow::Result<()> { diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 86decbe6b22b..a84db85aebfc 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -20,6 +20,7 @@ use codex_protocol::protocol::Op; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::Submission; +use codex_protocol::protocol::ThreadMemoryMode; use codex_protocol::protocol::TokenUsage; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::user_input::UserInput; @@ -95,6 +96,11 @@ impl CodexThread { self.codex.submit_with_trace(op, trace).await } + /// Persist whether this thread is eligible for future memory generation. + pub async fn set_thread_memory_mode(&self, mode: ThreadMemoryMode) -> anyhow::Result<()> { + self.codex.set_thread_memory_mode(mode).await + } + pub async fn steer_input( &self, input: Vec,