Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions codex-rs/core/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@
"fast_mode": {
"type": "boolean"
},
"guardian_approval": {
"type": "boolean"
},
"image_detail_original": {
"type": "boolean"
},
Expand Down Expand Up @@ -476,9 +479,6 @@
"skill_mcp_dependency_install": {
"type": "boolean"
},
"smart_approvals": {
"type": "boolean"
},
"sqlite": {
"type": "boolean"
},
Expand Down Expand Up @@ -1950,6 +1950,9 @@
"fast_mode": {
"type": "boolean"
},
"guardian_approval": {
"type": "boolean"
},
"image_detail_original": {
"type": "boolean"
},
Expand Down Expand Up @@ -2028,9 +2031,6 @@
"skill_mcp_dependency_install": {
"type": "boolean"
},
"smart_approvals": {
"type": "boolean"
},
"sqlite": {
"type": "boolean"
},
Expand Down
11 changes: 9 additions & 2 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ use crate::feedback_tags;
use crate::file_watcher::FileWatcher;
use crate::file_watcher::FileWatcherEvent;
use crate::git_info::get_git_repo_root;
use crate::guardian::GuardianReviewSessionManager;
use crate::instructions::UserInstructions;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp::McpManager;
Expand Down Expand Up @@ -472,7 +473,7 @@ impl Codex {

let user_instructions = get_user_instructions(&config).await;

let exec_policy = if crate::guardian::is_guardian_subagent_source(&session_source) {
let exec_policy = if crate::guardian::is_guardian_reviewer_source(&session_source) {
// Guardian review should rely on the built-in shell safety checks,
// not on caller-provided exec-policy rules that could shape the
// reviewer or silently auto-approve commands.
Expand Down Expand Up @@ -747,6 +748,7 @@ pub(crate) struct Session {
pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
pub(crate) conversation: Arc<RealtimeConversationManager>,
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
pub(crate) guardian_review_session: GuardianReviewSessionManager,
pub(crate) services: SessionServices,
js_repl: Arc<JsReplHandle>,
next_internal_sub_id: AtomicU64,
Expand Down Expand Up @@ -1809,6 +1811,7 @@ impl Session {
pending_mcp_server_refresh_config: Mutex::new(None),
conversation: Arc::new(RealtimeConversationManager::new()),
active_turn: Mutex::new(None),
guardian_review_session: GuardianReviewSessionManager::default(),
services,
js_repl,
next_internal_sub_id: AtomicU64::new(0),
Expand Down Expand Up @@ -3440,7 +3443,7 @@ impl Session {
.into_text(),
);
let separate_guardian_developer_message =
crate::guardian::is_guardian_subagent_source(&session_source);
crate::guardian::is_guardian_reviewer_source(&session_source);
// Keep the guardian policy prompt out of the aggregated developer bundle so it
// stays isolated as its own top-level developer message for guardian subagents.
if !separate_guardian_developer_message
Expand Down Expand Up @@ -4347,6 +4350,9 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
break;
}
}
// Also drain cached guardian state if the submission loop exits because
// the channel closed without receiving an explicit shutdown op.
sess.guardian_review_session.shutdown().await;
debug!("Agent loop exited");
}

Expand Down Expand Up @@ -5156,6 +5162,7 @@ mod handlers {
.unified_exec_manager
.terminate_all_processes()
.await;
sess.guardian_review_session.shutdown().await;
info!("Shutting down Codex instance");
let history = sess.clone_history().await;
let turn_count = history
Expand Down
116 changes: 116 additions & 0 deletions codex-rs/core/src/codex_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2370,6 +2370,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
pending_mcp_server_refresh_config: Mutex::new(None),
conversation: Arc::new(RealtimeConversationManager::new()),
active_turn: Mutex::new(None),
guardian_review_session: crate::guardian::GuardianReviewSessionManager::default(),
services,
js_repl,
next_internal_sub_id: AtomicU64::new(0),
Expand Down Expand Up @@ -2873,6 +2874,120 @@ async fn shutdown_and_wait_waits_when_shutdown_is_already_in_progress() {
.expect("shutdown waiter");
}

/// Checks that `shutdown_and_wait` on a parent `Codex` forwards `Op::Shutdown` to a guardian
/// child `Codex` that was stored on the parent's `guardian_review_session` via `cache_for_test`.
#[tokio::test]
async fn shutdown_and_wait_shuts_down_cached_guardian_subagent() {
    // Parent side: run the real submission loop behind a hand-assembled `Codex` handle.
    let (session, turn_context) = make_session_and_context().await;
    let parent = Arc::new(session);
    let (parent_ops_tx, parent_ops_rx) = async_channel::bounded(4);
    let (_parent_events_tx, parent_events_rx) = async_channel::unbounded();
    let (_parent_status_tx, parent_status_rx) = watch::channel(AgentStatus::PendingInit);
    let parent_loop = tokio::spawn({
        let loop_session = Arc::clone(&parent);
        let loop_config = Arc::clone(&turn_context.config);
        async move {
            submission_loop(loop_session, loop_config, parent_ops_rx).await;
        }
    });
    let parent_codex = Codex {
        tx_sub: parent_ops_tx,
        rx_event: parent_events_rx,
        agent_status: parent_status_rx,
        session: Arc::clone(&parent),
        session_loop_termination: session_loop_termination_from_handle(parent_loop),
    };

    // Child side: a stand-in loop that reads a single submission, requires it to be the
    // shutdown op, and then reports that it saw it through a oneshot channel.
    let (guardian_session, _guardian_turn_context) = make_session_and_context().await;
    let (guardian_ops_tx, guardian_ops_rx) = async_channel::bounded(4);
    let (_guardian_events_tx, guardian_events_rx) = async_channel::unbounded();
    let (_guardian_status_tx, guardian_status_rx) = watch::channel(AgentStatus::PendingInit);
    let (shutdown_seen_tx, shutdown_seen_rx) = tokio::sync::oneshot::channel();
    let guardian_loop = tokio::spawn(async move {
        let received: Submission = guardian_ops_rx
            .recv()
            .await
            .expect("child shutdown submission");
        assert_eq!(received.op, Op::Shutdown);
        shutdown_seen_tx
            .send(())
            .expect("child shutdown signal should be delivered");
    });
    let guardian_codex = Codex {
        tx_sub: guardian_ops_tx,
        rx_event: guardian_events_rx,
        agent_status: guardian_status_rx,
        session: Arc::new(guardian_session),
        session_loop_termination: session_loop_termination_from_handle(guardian_loop),
    };

    // Hand the child to the parent's guardian manager before shutting the parent down.
    let guardian_manager = &parent.guardian_review_session;
    guardian_manager.cache_for_test(guardian_codex).await;

    let parent_shutdown = parent_codex.shutdown_and_wait().await;
    parent_shutdown.expect("parent shutdown should succeed");

    // The oneshot only resolves successfully if the child loop observed `Op::Shutdown`.
    let child_notified = shutdown_seen_rx.await;
    child_notified.expect("guardian subagent should receive a shutdown op");
}

/// Checks that `shutdown_and_wait` on a parent `Codex` forwards `Op::Shutdown` to a guardian
/// review `Codex` registered on the parent's `guardian_review_session` via
/// `register_ephemeral_for_test`.
#[tokio::test]
async fn shutdown_and_wait_shuts_down_tracked_ephemeral_guardian_review() {
    // Parent side: run the real submission loop behind a hand-assembled `Codex` handle.
    let (session, turn_context) = make_session_and_context().await;
    let parent = Arc::new(session);
    let (parent_ops_tx, parent_ops_rx) = async_channel::bounded(4);
    let (_parent_events_tx, parent_events_rx) = async_channel::unbounded();
    let (_parent_status_tx, parent_status_rx) = watch::channel(AgentStatus::PendingInit);
    let parent_loop = tokio::spawn({
        let loop_session = Arc::clone(&parent);
        let loop_config = Arc::clone(&turn_context.config);
        async move {
            submission_loop(loop_session, loop_config, parent_ops_rx).await;
        }
    });
    let parent_codex = Codex {
        tx_sub: parent_ops_tx,
        rx_event: parent_events_rx,
        agent_status: parent_status_rx,
        session: Arc::clone(&parent),
        session_loop_termination: session_loop_termination_from_handle(parent_loop),
    };

    // Review side: a stand-in loop that reads a single submission, requires it to be the
    // shutdown op, and then reports that it saw it through a oneshot channel.
    let (review_session, _review_turn_context) = make_session_and_context().await;
    let (review_ops_tx, review_ops_rx) = async_channel::bounded(4);
    let (_review_events_tx, review_events_rx) = async_channel::unbounded();
    let (_review_status_tx, review_status_rx) = watch::channel(AgentStatus::PendingInit);
    let (shutdown_seen_tx, shutdown_seen_rx) = tokio::sync::oneshot::channel();
    let review_loop = tokio::spawn(async move {
        let received: Submission = review_ops_rx
            .recv()
            .await
            .expect("child shutdown submission");
        assert_eq!(received.op, Op::Shutdown);
        shutdown_seen_tx
            .send(())
            .expect("child shutdown signal should be delivered");
    });
    let review_codex = Codex {
        tx_sub: review_ops_tx,
        rx_event: review_events_rx,
        agent_status: review_status_rx,
        session: Arc::new(review_session),
        session_loop_termination: session_loop_termination_from_handle(review_loop),
    };

    // Register the review with the parent's guardian manager before shutting the parent down.
    let guardian_manager = &parent.guardian_review_session;
    guardian_manager
        .register_ephemeral_for_test(review_codex)
        .await;

    let parent_shutdown = parent_codex.shutdown_and_wait().await;
    parent_shutdown.expect("parent shutdown should succeed");

    // The oneshot only resolves successfully if the review loop observed `Op::Shutdown`.
    let review_notified = shutdown_seen_rx.await;
    review_notified.expect("ephemeral guardian review should receive a shutdown op");
}

pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
dynamic_tools: Vec<DynamicToolSpec>,
) -> (
Expand Down Expand Up @@ -3045,6 +3160,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
pending_mcp_server_refresh_config: Mutex::new(None),
conversation: Arc::new(RealtimeConversationManager::new()),
active_turn: Mutex::new(None),
guardian_review_session: crate::guardian::GuardianReviewSessionManager::default(),
services,
js_repl,
next_internal_sub_id: AtomicU64::new(0),
Expand Down
67 changes: 65 additions & 2 deletions codex-rs/core/src/codex_tests_guardian.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use super::*;
use crate::compact::InitialContextInjection;
use crate::config_loader::ConfigLayerEntry;
use crate::config_loader::ConfigRequirements;
use crate::config_loader::ConfigRequirementsToml;
use crate::exec::ExecParams;
use crate::exec_policy::ExecPolicyManager;
use crate::features::Feature;
use crate::guardian::GUARDIAN_SUBAGENT_NAME;
use crate::guardian::GUARDIAN_REVIEWER_NAME;
use crate::protocol::AskForApproval;
use crate::sandboxing::SandboxPermissions;
use crate::tools::context::FunctionToolOutput;
Expand All @@ -14,8 +15,10 @@ use codex_app_server_protocol::ConfigLayerSource;
use codex_execpolicy::Decision;
use codex_execpolicy::Evaluation;
use codex_execpolicy::RuleMatch;
use codex_protocol::models::ContentItem;
use codex_protocol::models::NetworkPermissions;
use codex_protocol::models::PermissionProfile;
use codex_protocol::models::ResponseItem;
use codex_protocol::models::function_call_output_content_items_to_text;
use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::permissions::NetworkSandboxPolicy;
Expand Down Expand Up @@ -231,6 +234,66 @@ async fn guardian_allows_unified_exec_additional_permissions_requests_past_polic
);
}

#[tokio::test]
async fn process_compacted_history_preserves_separate_guardian_developer_message() {
let (session, mut turn_context) = make_session_and_context().await;
let guardian_policy = crate::guardian::guardian_policy_prompt();
let guardian_source =
SessionSource::SubAgent(SubAgentSource::Other(GUARDIAN_REVIEWER_NAME.to_string()));

{
let mut state = session.state.lock().await;
state.session_configuration.session_source = guardian_source.clone();
}
turn_context.session_source = guardian_source;
turn_context.developer_instructions = Some(guardian_policy.clone());

let refreshed = crate::compact_remote::process_compacted_history(
&session,
&turn_context,
vec![
ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "stale developer message".to_string(),
}],
end_turn: None,
phase: None,
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "summary".to_string(),
}],
end_turn: None,
phase: None,
},
],
InitialContextInjection::BeforeLastUserMessage,
)
.await;

let developer_messages = refreshed
.iter()
.filter_map(|item| match item {
ResponseItem::Message { role, content, .. } if role == "developer" => {
crate::content_items_to_text(content)
}
_ => None,
})
.collect::<Vec<_>>();

assert!(
!developer_messages
.iter()
.any(|message| message.contains("stale developer message"))
);
assert!(developer_messages.len() >= 2);
assert_eq!(developer_messages.last(), Some(&guardian_policy));
}

#[tokio::test]
#[cfg(unix)]
async fn shell_handler_allows_sticky_turn_permissions_without_inline_request_permissions_feature() {
Expand Down Expand Up @@ -382,7 +445,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
file_watcher,
conversation_history: InitialHistory::New,
session_source: SessionSource::SubAgent(SubAgentSource::Other(
GUARDIAN_SUBAGENT_NAME.to_string(),
GUARDIAN_REVIEWER_NAME.to_string(),
)),
agent_control: AgentControl::default(),
dynamic_tools: Vec::new(),
Expand Down
Loading
Loading