Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 71 additions & 84 deletions src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,7 @@ pub async fn execute_safe_outputs(
) -> Result<Vec<ExecutionResult>> {
let safe_output_path = safe_output_dir.join(SAFE_OUTPUT_FILENAME);

info!("Stage 3 execution starting");
debug!("Safe output directory: {}", safe_output_dir.display());
debug!("Source directory: {}", ctx.source_directory.display());
debug!(
"ADO org: {}",
ctx.ado_org_url.as_deref().unwrap_or("<not set>")
);
debug!(
"ADO project: {}",
ctx.ado_project.as_deref().unwrap_or("<not set>")
);
debug!(
"Repository ID: {}",
ctx.repository_id.as_deref().unwrap_or("<not set>")
);
debug!(
"Repository name: {}",
ctx.repository_name.as_deref().unwrap_or("<not set>")
);
if !ctx.allowed_repositories.is_empty() {
debug!(
"Allowed repositories: {}",
ctx.allowed_repositories
.keys()
.cloned()
.collect::<Vec<_>>()
.join(", ")
);
}
log_execution_context(safe_output_dir, ctx);

if !safe_output_path.exists() {
info!(
Expand Down Expand Up @@ -86,12 +58,7 @@ pub async fn execute_safe_outputs(
info!("Found {} safe output(s) to execute", entries.len());
println!("Found {} safe output(s) to execute", entries.len());

// Log summary of what we're about to execute
for (i, entry) in entries.iter().enumerate() {
if let Some(name) = entry.get("name").and_then(|n| n.as_str()) {
debug!("[{}/{}] Queued: {}", i + 1, entries.len(), name);
}
}
log_queued_entries(&entries);

// Build budget map: tool_name → (executed_count, max_allowed).
// Each tool declares its DEFAULT_MAX via the ToolResult trait; the operator can
Expand Down Expand Up @@ -133,63 +100,19 @@ pub async fn execute_safe_outputs(
let mut results = Vec::new();
for (i, entry) in entries.iter().enumerate() {
let entry_json = serde_json::to_string(entry).unwrap_or_else(|_| "<invalid>".to_string());
debug!(
"[{}/{}] Executing entry: {}",
i + 1,
entries.len(),
entry_json
);
debug!("[{}/{}] Executing entry: {}", i + 1, entries.len(), entry_json);

// Generic budget enforcement: skip excess entries rather than aborting the whole batch.
// Budget is consumed before execution so that failed attempts (target policy rejection,
// network errors) still count — this prevents unbounded retries against a failing endpoint.
if let Some(tool_name) = entry.get("name").and_then(|n| n.as_str()) {
if let Some((executed, max)) = budgets.get_mut(tool_name) {
let context_id = extract_entry_context(entry);
if let Some(result) = check_budget(entries.len(), i, tool_name, &context_id, *executed, *max) {
results.push(result);
continue;
}
*executed += 1;
}
if let Some(result) = enforce_budget(entry, &mut budgets, entries.len(), i) {
results.push(result);
continue;
}

match execute_safe_output(entry, ctx).await {
Ok((tool_name, result)) => {
if result.is_warning() {
warn!(
"[{}/{}] {} warning: {}",
i + 1,
entries.len(),
tool_name,
result.message
);
} else if result.success {
info!(
"[{}/{}] {} succeeded: {}",
i + 1,
entries.len(),
tool_name,
result.message
);
} else {
warn!(
"[{}/{}] {} failed: {}",
i + 1,
entries.len(),
tool_name,
result.message
);
}
let symbol = if result.is_warning() { "⚠" } else if result.success { "✓" } else { "✗" };
println!(
"[{}/{}] {} - {} - {}",
i + 1,
entries.len(),
tool_name,
symbol,
result.message
);
log_and_print_entry_result(i, entries.len(), &tool_name, &result);
results.push(result);
}
Err(e) => {
Expand All @@ -213,6 +136,70 @@ pub async fn execute_safe_outputs(
Ok(results)
}

/// Emit debug-level context about the execution environment at Stage 3 startup.
fn log_execution_context(safe_output_dir: &Path, ctx: &ExecutionContext) {
info!("Stage 3 execution starting");
debug!("Safe output directory: {}", safe_output_dir.display());
debug!("Source directory: {}", ctx.source_directory.display());
debug!("ADO org: {}", ctx.ado_org_url.as_deref().unwrap_or("<not set>"));
debug!("ADO project: {}", ctx.ado_project.as_deref().unwrap_or("<not set>"));
debug!("Repository ID: {}", ctx.repository_id.as_deref().unwrap_or("<not set>"));
debug!("Repository name: {}", ctx.repository_name.as_deref().unwrap_or("<not set>"));
if !ctx.allowed_repositories.is_empty() {
debug!(
"Allowed repositories: {}",
ctx.allowed_repositories
.keys()
.cloned()
.collect::<Vec<_>>()
.join(", ")
);
}
}

/// Log each queued entry at debug level before execution begins.
fn log_queued_entries(entries: &[Value]) {
for (i, entry) in entries.iter().enumerate() {
if let Some(name) = entry.get("name").and_then(|n| n.as_str()) {
debug!("[{}/{}] Queued: {}", i + 1, entries.len(), name);
}
}
}

/// Check the per-tool budget for an entry.
///
/// Returns `Some(result)` when the budget is exhausted (caller should push the result and
/// skip execution). When a slot is available the counter is incremented and `None` is
/// returned so execution can proceed.
fn enforce_budget(
entry: &Value,
budgets: &mut HashMap<&str, (usize, usize)>,
total: usize,
i: usize,
) -> Option<ExecutionResult> {
let tool_name = entry.get("name").and_then(|n| n.as_str())?;
let (executed, max) = budgets.get_mut(tool_name)?;
let context_id = extract_entry_context(entry);
if let Some(result) = check_budget(total, i, tool_name, &context_id, *executed, *max) {
return Some(result);
}
*executed += 1;
None
}

/// Log and print the outcome of a single safe-output execution.
fn log_and_print_entry_result(i: usize, total: usize, tool_name: &str, result: &ExecutionResult) {
if result.is_warning() {
warn!("[{}/{}] {} warning: {}", i + 1, total, tool_name, result.message);
} else if result.success {
info!("[{}/{}] {} succeeded: {}", i + 1, total, tool_name, result.message);
} else {
warn!("[{}/{}] {} failed: {}", i + 1, total, tool_name, result.message);
}
let symbol = if result.is_warning() { "⚠" } else if result.success { "✓" } else { "✗" };
println!("[{}/{}] {} - {} - {}", i + 1, total, tool_name, symbol, result.message);
}

/// Parse a JSON entry as `T` and run it through `execute_sanitized`.
///
/// This is the common path for all tools that implement `Executor`. The tool name
Expand Down