diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index f11cfc2a..7b994d1b 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -393,6 +393,36 @@ The batch flag is passed through the executor chain: - Applied in both normal mode (`execute()`) and stream mode (`handle_stream_mode()`) - TUI mode maintains its own quit handling and ignores this flag +**Fail-Fast Mode (Added 2025-12):** + +The `--fail-fast` / `-k` option enables immediate termination when any node fails. This is compatible with pdsh's `-k` flag and useful for: +- Critical operations where partial execution is unacceptable +- Deployment scripts where all nodes must succeed +- Validation checks across clusters + +Implementation uses: +```rust +// Cancellation signaling via tokio::sync::watch +let (cancel_tx, cancel_rx) = watch::channel(false); + +// Task selection with cancellation check +tokio::select! { + biased; // Prioritize cancellation check + _ = cancel_rx.changed() => { + // Task cancelled due to fail-fast + return Err(anyhow!("Execution cancelled due to fail-fast")); + } + permit = semaphore.acquire() => { + // Execute task normally + } +} +``` + +The fail-fast mode integrates with: +- `--require-all-success`: Both require all nodes to succeed, but fail-fast stops early +- `--check-all-nodes`: Fail-fast stops early, check-all-nodes affects final exit code +- `--parallel N`: Cancels pending tasks waiting in the semaphore queue + ### 4. SSH Client (`ssh/client/*`, `ssh/tokio_client/*`) **SSH Client Module Structure (Refactored 2025-10-17):** diff --git a/README.md b/README.md index 3d8762d7..5f4e45cb 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ A high-performance SSH client with **SSH-compatible syntax** for both single-hos - **Port Forwarding**: Full support for local (-L), remote (-R), and dynamic (-D) SSH port forwarding - **Jump Host Support**: Connect through bastion hosts using OpenSSH ProxyJump syntax (`-J`) - **Parallel Execution**: Execute commands across multiple nodes simultaneously +- **Fail-Fast Mode**: Stop immediately on first failure with `-k` flag (pdsh compatible) - **Interactive Terminal UI (TUI)**: Real-time monitoring with 4 view modes (Summary/Detail/Split/Diff) for multi-node operations - **Cluster Management**: Define and manage node clusters via configuration files - **Progress Tracking**: Real-time progress indicators with smart detection (percentages, fractions, apt/dpkg) @@ -219,6 +220,13 @@ bssh -C production --connect-timeout 10 "uptime" # Different timeouts for connection and command bssh -C production --connect-timeout 5 --timeout 600 "long-running-job" + +# Fail-fast mode: stop immediately on any failure (pdsh -k compatible) +bssh -k -H "web1,web2,web3" "deploy.sh" +bssh --fail-fast -C production "critical-script.sh" + +# Combine fail-fast with require-all-success for critical operations +bssh -k --require-all-success -C production "service-restart.sh" ``` ### Output Modes diff --git a/docs/man/bssh.1 b/docs/man/bssh.1 index 90bc237e..84e3b38f 100644 --- a/docs/man/bssh.1 +++ b/docs/man/bssh.1 @@ -247,6 +247,25 @@ which is useful for programmatic parsing or cleaner display. Works with both stream mode (--stream) and file mode (--output-dir). Example: bssh -H host1,host2 --stream -N "uname -a" +.TP +.BR \-k ", " \-\-fail\-fast +Stop execution immediately on first failure (pdsh -k compatible). +When enabled, bssh cancels pending commands when any node fails due to +connection error or non-zero exit code. This is useful for: +.RS +.IP \[bu] 2 +Critical operations where partial execution is unacceptable +.IP \[bu] 2 +Deployment scripts where all nodes must succeed +.IP \[bu] 2 +Validation checks across clusters +.RE +.IP +Running tasks are terminated gracefully, and the error message clearly +indicates which node caused the failure. Can be combined with +.B --require-all-success +for strict error handling. + .TP .BR \-v ", " \-\-verbose Increase verbosity (can be used multiple times: -v, -vv, -vvv) @@ -1240,6 +1259,29 @@ Example output: Useful for monitoring long-running commands or when piping output. .RE +.SS Fail-Fast Mode Examples +.TP +Stop on first failure during critical deployment: +.B bssh -k -C production "deploy.sh" +.RS +Execution stops immediately if any node fails the deployment script +.RE + +.TP +Combine fail-fast with require-all-success: +.B bssh --fail-fast --require-all-success -C production "service-restart.sh" +.RS +Stops early on failure AND ensures final exit code reflects any failures +.RE + +.TP +Sequential fail-fast with limited parallelism: +.B bssh -k --parallel 1 -H "node1,node2,node3" "critical-operation" +.RS +Runs commands one at a time, stopping on first failure +.RE + +.SS File Transfer Examples .TP Upload configuration file to all nodes: .B bssh -H "node1,node2,node3" upload /etc/myapp.conf /etc/myapp.conf diff --git a/src/app/dispatcher.rs b/src/app/dispatcher.rs index 41d161a5..00db5fc3 100644 --- a/src/app/dispatcher.rs +++ b/src/app/dispatcher.rs @@ -404,6 +404,7 @@ async fn handle_exec_command(cli: &Cli, ctx: &AppContext, command: &str) -> Resu check_all_nodes: cli.check_all_nodes, sudo_password, batch: cli.batch, + fail_fast: cli.fail_fast, }; execute_command(params).await } diff --git a/src/cli.rs b/src/cli.rs index 3a0219f5..03f57292 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -23,7 +23,7 @@ use std::path::PathBuf; before_help = "\n\nBroadcast SSH - Parallel command execution across cluster nodes", about = "Broadcast SSH - SSH-compatible parallel command execution tool", long_about = "bssh is a high-performance SSH client with parallel execution capabilities.\nIt can be used as a drop-in replacement for SSH (single host) or as a powerful cluster management tool (multiple hosts).\n\nThe tool provides secure file transfer using SFTP and supports SSH keys, SSH agent, and password authentication.\nIt automatically detects Backend.AI multi-node session environments.\n\nOutput Modes:\n- TUI Mode (default): Interactive terminal UI with real-time monitoring (auto-enabled in terminals)\n- Stream Mode (--stream): Real-time output with [node] prefixes\n- File Mode (--output-dir): Save per-node output to timestamped files\n- Normal Mode: Traditional output after all nodes complete\n\nSSH Configuration Support:\n- Reads standard SSH config files (defaulting to ~/.ssh/config)\n- Supports Host patterns, HostName, User, Port, IdentityFile, StrictHostKeyChecking\n- ProxyJump, and many other SSH configuration directives\n- CLI arguments override SSH config values following SSH precedence rules", - after_help = "EXAMPLES:\n SSH Mode:\n bssh user@host # Interactive shell\n bssh admin@server.com \"uptime\" # Execute command\n bssh -p 2222 -i ~/.ssh/key user@host # Custom port and key\n bssh -F ~/.ssh/myconfig webserver # Use custom SSH config\n\n Port Forwarding:\n bssh -L 8080:example.com:80 user@host # Local forward: localhost:8080 → example.com:80\n bssh -R 8080:localhost:80 user@host # Remote forward: remote:8080 → localhost:80\n bssh -D 1080 user@host # SOCKS5 proxy on localhost:1080\n bssh -L 3306:db:3306 -R 80:web:80 user@host # Multiple forwards\n bssh -D *:1080/4 user@host # SOCKS4 proxy on all interfaces\n\n Multi-Server Mode:\n bssh -C production \"systemctl status\" # Execute on cluster (TUI mode auto-enabled)\n bssh -H \"web1,web2,web3\" \"df -h\" # Execute on multiple hosts\n bssh -H \"web1,web2,web3\" -f \"web1\" \"df -h\" # Filter to web1 only\n bssh -C production -f \"web*\" \"uptime\" # Filter cluster nodes\n bssh --parallel 20 -H web* \"apt update\" # Increase parallelism\n\n Host Exclusion (--exclude):\n bssh -H \"node1,node2,node3\" --exclude \"node2\" \"uptime\" # Exclude single host\n bssh -C production --exclude \"web1,web2\" \"apt update\" # Exclude multiple hosts\n bssh -C production --exclude \"db*\" \"systemctl restart\" # Exclude with wildcard pattern\n bssh -C production --exclude \"*-backup\" \"df -h\" # Exclude backup nodes\n\n Output Modes:\n bssh -C prod \"apt-get update\" # TUI mode (default, interactive monitoring)\n bssh -C prod --stream \"tail -f log\" # Stream mode (real-time with [node] prefixes)\n bssh -C prod --output-dir ./logs \"ps\" # File mode (save to timestamped files)\n bssh -C prod \"uptime\" | tee log.txt # Normal mode (auto-detected when piped)\n\n Batch Mode (Ctrl+C Handling):\n bssh -C prod \"long-running-command\" # Default: first Ctrl+C shows status, second terminates\n bssh -C prod -b \"long-command\" # Batch mode: single Ctrl+C terminates immediately\n bssh -H nodes --batch --stream \"cmd\" # Useful for CI/CD and non-interactive scripts\n\n TUI Mode Controls (when in TUI):\n 1-9 Jump to node detail view\n s Enter split view (2-4 nodes)\n d Enter diff view (compare nodes)\n f Toggle auto-scroll\n ↑/↓ Scroll output\n ←/→ Switch nodes\n Esc Return to summary\n ? Show help\n q Quit\n\n File Operations:\n bssh -C staging upload file.txt /tmp/ # Upload to cluster\n bssh -H host1,host2 download /etc/hosts ./backups/\n\n Other Commands:\n bssh list # List configured clusters\n bssh -C production ping # Test connectivity\n bssh -H hosts interactive # Interactive mode\n\n SSH Config Example (~/.ssh/config):\n Host web*\n HostName web.example.com\n User webuser\n Port 2222\n IdentityFile ~/.ssh/web_key\n StrictHostKeyChecking yes\n\nDeveloped and maintained as part of the Backend.AI project.\nFor more information: https://github.com/lablup/bssh" + after_help = "EXAMPLES:\n SSH Mode:\n bssh user@host # Interactive shell\n bssh admin@server.com \"uptime\" # Execute command\n bssh -p 2222 -i ~/.ssh/key user@host # Custom port and key\n bssh -F ~/.ssh/myconfig webserver # Use custom SSH config\n\n Port Forwarding:\n bssh -L 8080:example.com:80 user@host # Local forward: localhost:8080 → example.com:80\n bssh -R 8080:localhost:80 user@host # Remote forward: remote:8080 → localhost:80\n bssh -D 1080 user@host # SOCKS5 proxy on localhost:1080\n bssh -L 3306:db:3306 -R 80:web:80 user@host # Multiple forwards\n bssh -D *:1080/4 user@host # SOCKS4 proxy on all interfaces\n\n Multi-Server Mode:\n bssh -C production \"systemctl status\" # Execute on cluster (TUI mode auto-enabled)\n bssh -H \"web1,web2,web3\" \"df -h\" # Execute on multiple hosts\n bssh -H \"web1,web2,web3\" -f \"web1\" \"df -h\" # Filter to web1 only\n bssh -C production -f \"web*\" \"uptime\" # Filter cluster nodes\n bssh --parallel 20 -H web* \"apt update\" # Increase parallelism\n\n Host Exclusion (--exclude):\n bssh -H \"node1,node2,node3\" --exclude \"node2\" \"uptime\" # Exclude single host\n bssh -C production --exclude \"web1,web2\" \"apt update\" # Exclude multiple hosts\n bssh -C production --exclude \"db*\" \"systemctl restart\" # Exclude with wildcard pattern\n bssh -C production --exclude \"*-backup\" \"df -h\" # Exclude backup nodes\n\n Fail-Fast Mode (pdsh -k compatible):\n bssh -k -H \"web1,web2,web3\" \"deploy.sh\" # Stop on first failure\n bssh --fail-fast -C prod \"apt upgrade\" # Critical deployment - stop if any node fails\n bssh -k --require-all-success -C prod cmd # Fail-fast + require all success\n\n Output Modes:\n bssh -C prod \"apt-get update\" # TUI mode (default, interactive monitoring)\n bssh -C prod --stream \"tail -f log\" # Stream mode (real-time with [node] prefixes)\n bssh -C prod --output-dir ./logs \"ps\" # File mode (save to timestamped files)\n bssh -C prod \"uptime\" | tee log.txt # Normal mode (auto-detected when piped)\n\n Batch Mode (Ctrl+C Handling):\n bssh -C prod \"long-running-command\" # Default: first Ctrl+C shows status, second terminates\n bssh -C prod -b \"long-command\" # Batch mode: single Ctrl+C terminates immediately\n bssh -H nodes --batch --stream \"cmd\" # Useful for CI/CD and non-interactive scripts\n\n TUI Mode Controls (when in TUI):\n 1-9 Jump to node detail view\n s Enter split view (2-4 nodes)\n d Enter diff view (compare nodes)\n f Toggle auto-scroll\n ↑/↓ Scroll output\n ←/→ Switch nodes\n Esc Return to summary\n ? Show help\n q Quit\n\n File Operations:\n bssh -C staging upload file.txt /tmp/ # Upload to cluster\n bssh -H host1,host2 download /etc/hosts ./backups/\n\n Other Commands:\n bssh list # List configured clusters\n bssh -C production ping # Test connectivity\n bssh -H hosts interactive # Interactive mode\n\n SSH Config Example (~/.ssh/config):\n Host web*\n HostName web.example.com\n User webuser\n Port 2222\n IdentityFile ~/.ssh/web_key\n StrictHostKeyChecking yes\n\nDeveloped and maintained as part of the Backend.AI project.\nFor more information: https://github.com/lablup/bssh" )] pub struct Cli { /// SSH destination in format: [user@]hostname[:port] or ssh://[user@]hostname[:port] @@ -196,6 +196,13 @@ pub struct Cli { )] pub check_all_nodes: bool, + #[arg( + short = 'k', + long = "fail-fast", + help = "Stop execution immediately on first failure (pdsh -k compatible)\nCancels pending commands when any node fails (connection error or non-zero exit)\nUseful for critical operations where partial execution is unacceptable" + )] + pub fail_fast: bool, + #[arg( trailing_var_arg = true, help = "Command to execute on remote hosts", diff --git a/src/commands/exec.rs b/src/commands/exec.rs index 7512f63f..afb03621 100644 --- a/src/commands/exec.rs +++ b/src/commands/exec.rs @@ -46,6 +46,7 @@ pub struct ExecuteCommandParams<'a> { pub check_all_nodes: bool, pub sudo_password: Option>, pub batch: bool, + pub fail_fast: bool, } pub async fn execute_command(params: ExecuteCommandParams<'_>) -> Result<()> { @@ -212,7 +213,8 @@ async fn execute_command_without_forwarding(params: ExecuteCommandParams<'_>) -> .with_connect_timeout(params.connect_timeout) .with_jump_hosts(params.jump_hosts.map(|s| s.to_string())) .with_sudo_password(params.sudo_password) - .with_batch_mode(params.batch); + .with_batch_mode(params.batch) + .with_fail_fast(params.fail_fast); // Set keychain usage if on macOS #[cfg(target_os = "macos")] diff --git a/src/executor/parallel.rs b/src/executor/parallel.rs index df776165..48c1be62 100644 --- a/src/executor/parallel.rs +++ b/src/executor/parallel.rs @@ -48,6 +48,7 @@ pub struct ParallelExecutor { pub(crate) jump_hosts: Option, pub(crate) sudo_password: Option>, pub(crate) batch: bool, + pub(crate) fail_fast: bool, } impl ParallelExecutor { @@ -82,6 +83,7 @@ impl ParallelExecutor { jump_hosts: None, sudo_password: None, batch: false, + fail_fast: false, } } @@ -107,6 +109,7 @@ impl ParallelExecutor { jump_hosts: None, sudo_password: None, batch: false, + fail_fast: false, } } @@ -133,6 +136,7 @@ impl ParallelExecutor { jump_hosts: None, sudo_password: None, batch: false, + fail_fast: false, } } @@ -179,11 +183,28 @@ impl ParallelExecutor { self } + /// Enable fail-fast mode (pdsh -k compatibility). + /// + /// When enabled, execution stops immediately when any node fails: + /// - Connection failure to any host + /// - Non-zero exit code from any command + /// + /// Pending commands are cancelled and running commands are terminated gracefully. + pub fn with_fail_fast(mut self, fail_fast: bool) -> Self { + self.fail_fast = fail_fast; + self + } + /// Execute a command on all nodes in parallel. pub async fn execute(&self, command: &str) -> Result> { use std::time::Duration; use tokio::signal; + // Use fail-fast execution if enabled + if self.fail_fast { + return self.execute_with_fail_fast(command).await; + } + let semaphore = Arc::new(Semaphore::new(self.max_parallel)); let multi_progress = MultiProgress::new(); let style = create_progress_style()?; @@ -291,6 +312,240 @@ impl ParallelExecutor { } } + /// Execute a command with fail-fast mode enabled. + /// + /// Stops execution immediately when any node fails (connection error or non-zero exit code). + /// Cancels pending commands and terminates running commands gracefully. + async fn execute_with_fail_fast(&self, command: &str) -> Result> { + use tokio::sync::watch; + + let semaphore = Arc::new(Semaphore::new(self.max_parallel)); + let multi_progress = MultiProgress::new(); + let style = create_progress_style()?; + + // Cancellation token: false = continue, true = cancelled + let (cancel_tx, cancel_rx) = watch::channel(false); + let cancel_tx = Arc::new(cancel_tx); + + // Track all spawned task handles + let mut handles: Vec> = + Vec::with_capacity(self.nodes.len()); + + // Spawn tasks for each node + for node in &self.nodes { + let node = node.clone(); + let command = command.to_string(); + let key_path = self.key_path.clone(); + let strict_mode = self.strict_mode; + let use_agent = self.use_agent; + let use_password = self.use_password; + #[cfg(target_os = "macos")] + let use_keychain = self.use_keychain; + let timeout = self.timeout; + let connect_timeout = self.connect_timeout; + let jump_hosts = self.jump_hosts.clone(); + let sudo_password = self.sudo_password.clone(); + let semaphore = Arc::clone(&semaphore); + let pb = setup_progress_bar(&multi_progress, &node, style.clone(), "Connecting..."); + let mut cancel_rx = cancel_rx.clone(); + + let handle = tokio::spawn(async move { + // Check if already cancelled before acquiring semaphore + if *cancel_rx.borrow() { + pb.finish_with_message("○ Cancelled".to_string()); + return ExecutionResult { + node, + result: Err(anyhow::anyhow!("Execution cancelled due to fail-fast")), + is_main_rank: false, + }; + } + + // Race between semaphore acquisition and cancellation + let permit = tokio::select! { + biased; + _ = cancel_rx.changed() => { + pb.finish_with_message("○ Cancelled".to_string()); + return ExecutionResult { + node, + result: Err(anyhow::anyhow!("Execution cancelled due to fail-fast")), + is_main_rank: false, + }; + } + permit = semaphore.acquire() => { + match permit { + Ok(p) => p, + Err(e) => { + pb.finish_with_message("● Semaphore closed".to_string()); + return ExecutionResult { + node, + result: Err(anyhow::anyhow!("Semaphore acquisition failed: {e}")), + is_main_rank: false, + }; + } + } + } + }; + + // Check cancellation again after acquiring semaphore + if *cancel_rx.borrow() { + drop(permit); + pb.finish_with_message("○ Cancelled".to_string()); + return ExecutionResult { + node, + result: Err(anyhow::anyhow!("Execution cancelled due to fail-fast")), + is_main_rank: false, + }; + } + + pb.set_message("Executing...".to_string()); + + let config = ExecutionConfig { + key_path: key_path.as_deref(), + strict_mode, + use_agent, + use_password, + #[cfg(target_os = "macos")] + use_keychain, + timeout, + connect_timeout, + jump_hosts: jump_hosts.as_deref(), + sudo_password: sudo_password.clone(), + }; + + // Execute the command (keeping the permit alive) + let result = super::connection_manager::execute_on_node_with_jump_hosts( + node.clone(), + &command, + &config, + ) + .await; + + // Release the permit explicitly + drop(permit); + + // Format result + match &result { + Ok(cmd_result) => { + if cmd_result.is_success() { + pb.finish_with_message("● Success".to_string()); + } else { + pb.finish_with_message(format!( + "● Exit code: {}", + cmd_result.exit_status + )); + } + } + Err(e) => { + let error_msg = format!("{e:#}"); + let first_line = error_msg.lines().next().unwrap_or("Unknown error"); + let short_error = if first_line.len() > 50 { + format!("{}...", &first_line[..first_line.floor_char_boundary(47)]) + } else { + first_line.to_string() + }; + pb.finish_with_message(format!("● {short_error}")); + } + } + + ExecutionResult { + node, + result, + is_main_rank: false, + } + }); + + handles.push(handle); + } + + // Collect results, checking for failures as they complete + let mut results: Vec = Vec::with_capacity(handles.len()); + let mut first_failure: Option<(Node, String)> = None; + + // Process tasks as they complete + while !handles.is_empty() { + // Wait for any task to complete + let (completed_result, index, remaining) = futures::future::select_all(handles).await; + handles = remaining; + + match completed_result { + Ok(exec_result) => { + // Check if this is a failure + let is_failure = !exec_result.is_success(); + let node_info = exec_result.node.clone(); + + // Store failure info if this is the first failure + if is_failure && first_failure.is_none() { + let error_msg = match &exec_result.result { + Ok(cmd_result) => format!("exit code {}", cmd_result.exit_status), + Err(e) => format!("{e}"), + }; + first_failure = Some((node_info.clone(), error_msg)); + + // Signal cancellation to remaining tasks + let _ = cancel_tx.send(true); + + // Report failure + eprintln!( + "\n[fail-fast] Execution stopped: {} failed ({})", + node_info, + first_failure + .as_ref() + .map(|(_, e)| e.as_str()) + .unwrap_or("unknown error") + ); + } + + results.push(exec_result); + } + Err(e) => { + // Task panicked + tracing::error!("Task panicked at index {}: {}", index, e); + if let Some(node) = self.nodes.get(index) { + let error_msg = format!("Task panicked: {e}"); + if first_failure.is_none() { + first_failure = Some((node.clone(), error_msg.clone())); + let _ = cancel_tx.send(true); + eprintln!("\n[fail-fast] Execution stopped: {} panicked", node); + } + results.push(ExecutionResult { + node: node.clone(), + result: Err(anyhow::anyhow!("{}", error_msg)), + is_main_rank: false, + }); + } + } + } + } + + // Reorder results to match original node order + let mut ordered_results: Vec> = + (0..self.nodes.len()).map(|_| None).collect(); + for result in results { + if let Some(idx) = self + .nodes + .iter() + .position(|n| n.host == result.node.host && n.port == result.node.port) + { + ordered_results[idx] = Some(result); + } + } + + // Convert to final result vector, filling in missing results for cancelled tasks + let final_results: Vec = ordered_results + .into_iter() + .enumerate() + .map(|(idx, opt_result)| { + opt_result.unwrap_or_else(|| ExecutionResult { + node: self.nodes[idx].clone(), + result: Err(anyhow::anyhow!("Task did not complete (cancelled)")), + is_main_rank: false, + }) + }) + .collect(); + + self.collect_results(final_results.into_iter().map(Ok).collect()) + } + /// Upload a file to all nodes in parallel. pub async fn upload_file( &self, diff --git a/tests/connect_timeout_test.rs b/tests/connect_timeout_test.rs index f0a19da8..cad73d8b 100644 --- a/tests/connect_timeout_test.rs +++ b/tests/connect_timeout_test.rs @@ -21,14 +21,15 @@ use std::process::Command; /// Helper to get bssh binary path fn bssh_binary() -> String { - // Try release first, then debug - let release = std::env::current_dir().unwrap().join("target/release/bssh"); + // Prefer debug binary since `cargo test` builds debug binaries + // This avoids using stale release binaries in CI let debug = std::env::current_dir().unwrap().join("target/debug/bssh"); + let release = std::env::current_dir().unwrap().join("target/release/bssh"); - if release.exists() { - release.to_string_lossy().to_string() - } else if debug.exists() { + if debug.exists() { debug.to_string_lossy().to_string() + } else if release.exists() { + release.to_string_lossy().to_string() } else { panic!("bssh binary not found. Run 'cargo build' first."); } diff --git a/tests/fail_fast_test.rs b/tests/fail_fast_test.rs new file mode 100644 index 00000000..b0f06d81 --- /dev/null +++ b/tests/fail_fast_test.rs @@ -0,0 +1,336 @@ +// Copyright 2025 Lablup Inc. and Jeongkyu Shin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Tests for fail-fast (--fail-fast / -k) functionality. +//! +//! These tests verify the fail-fast execution mode that stops immediately +//! when any node fails (connection error or non-zero exit code). + +use bssh::executor::{ExecutionResult, ParallelExecutor}; +use bssh::node::Node; +use bssh::ssh::client::CommandResult; +use bssh::ssh::known_hosts::StrictHostKeyChecking; +use serial_test::serial; + +/// Helper to create a success result +fn success_result(host: &str) -> ExecutionResult { + ExecutionResult { + node: Node::new(host.to_string(), 22, "user".to_string()), + result: Ok(CommandResult { + host: host.to_string(), + output: Vec::new(), + stderr: Vec::new(), + exit_status: 0, + }), + is_main_rank: false, + } +} + +/// Helper to create a failure result with specific exit code +fn failure_result(host: &str, exit_code: u32) -> ExecutionResult { + ExecutionResult { + node: Node::new(host.to_string(), 22, "user".to_string()), + result: Ok(CommandResult { + host: host.to_string(), + output: Vec::new(), + stderr: Vec::new(), + exit_status: exit_code, + }), + is_main_rank: false, + } +} + +/// Helper to create an error result (connection failure) +fn error_result(host: &str, error_msg: &str) -> ExecutionResult { + ExecutionResult { + node: Node::new(host.to_string(), 22, "user".to_string()), + result: Err(anyhow::anyhow!("{}", error_msg)), + is_main_rank: false, + } +} + +#[test] +#[serial] +fn test_fail_fast_builder_method() { + // Test that the builder method can be called (compile-time check) + // Note: fail_fast is pub(crate), so we can only verify the builder API works + let nodes = vec![ + Node::new("host1".to_string(), 22, "user".to_string()), + Node::new("host2".to_string(), 22, "user".to_string()), + ]; + + // Create executor without fail-fast (default) + let _executor = ParallelExecutor::new(nodes.clone(), 10, None); + + // Create executor with fail-fast enabled + let _executor = ParallelExecutor::new(nodes.clone(), 10, None).with_fail_fast(true); + + // Create executor with fail-fast disabled explicitly + let _executor = ParallelExecutor::new(nodes, 10, None).with_fail_fast(false); + + // If we reach here, the builder API compiles correctly +} + +#[test] +#[serial] +fn test_fail_fast_all_options_constructor() { + // Test that all constructor variants can be chained with fail_fast + let nodes = vec![Node::new("host1".to_string(), 22, "user".to_string())]; + + // Test that with_fail_fast can be chained with any constructor + let _executor = ParallelExecutor::new(nodes.clone(), 10, None).with_fail_fast(true); + + let _executor = + ParallelExecutor::new_with_strict_mode(nodes.clone(), 10, None, StrictHostKeyChecking::Yes) + .with_fail_fast(true); + + let _executor = ParallelExecutor::new_with_strict_mode_and_agent( + nodes.clone(), + 10, + None, + StrictHostKeyChecking::Yes, + true, + ) + .with_fail_fast(true); + + let _executor = ParallelExecutor::new_with_all_options( + nodes, + 10, + None, + StrictHostKeyChecking::Yes, + true, + false, + ) + .with_fail_fast(true); + + // If we reach here, all constructors work with with_fail_fast() +} + +#[test] +#[serial] +fn test_fail_fast_result_classification() { + // Test that results are correctly classified as success/failure + + // Success case + let result = success_result("host1"); + assert!(result.is_success(), "Exit code 0 should be success"); + + // Non-zero exit code cases + let result = failure_result("host1", 1); + assert!(!result.is_success(), "Exit code 1 should be failure"); + + let result = failure_result("host1", 137); + assert!( + !result.is_success(), + "Exit code 137 (OOM) should be failure" + ); + + let result = failure_result("host1", 139); + assert!( + !result.is_success(), + "Exit code 139 (SIGSEGV) should be failure" + ); + + // Connection error case + let result = error_result("host1", "Connection refused"); + assert!( + !result.is_success(), + "Connection error should be classified as failure" + ); +} + +#[test] +#[serial] +fn test_fail_fast_exit_code_extraction() { + // Test exit code extraction from results + + let result = success_result("host1"); + assert_eq!( + result.get_exit_code(), + 0, + "Success should return exit code 0" + ); + + let result = failure_result("host1", 42); + assert_eq!(result.get_exit_code(), 42, "Should return actual exit code"); + + let result = error_result("host1", "Connection error"); + assert_eq!( + result.get_exit_code(), + 1, + "Connection error should return exit code 1" + ); +} + +#[test] +#[serial] +fn test_fail_fast_with_require_all_success_interaction() { + // Test that fail-fast can be combined with require_all_success + // They complement each other: fail-fast stops early, require_all_success affects exit code + + // Scenario: fail-fast stops on first failure, but final exit code is determined by strategy + let results = [ + success_result("host1"), + failure_result("host2", 1), + // host3 would be cancelled by fail-fast + error_result("host3", "Execution cancelled due to fail-fast"), + ]; + + // All three should be considered for final exit code determination + let has_any_failure = results.iter().any(|r| !r.is_success()); + assert!( + has_any_failure, + "Should detect failure in results even with cancelled tasks" + ); +} + +#[test] +#[serial] +fn test_cancellation_error_message() { + // Verify the specific error message for cancelled tasks + let result = error_result("host1", "Execution cancelled due to fail-fast"); + + // Check that the error message is preserved + if let Err(e) = &result.result { + let msg = format!("{}", e); + assert!( + msg.contains("fail-fast"), + "Cancellation error should mention fail-fast: {msg}" + ); + } else { + panic!("Expected error result for cancelled task"); + } +} + +/// Test that the --fail-fast flag is correctly parsed +#[test] +#[serial] +fn test_cli_fail_fast_flag_parsing() { + use bssh::cli::Cli; + use clap::Parser; + + // Test short form -k + let args = ["bssh", "-H", "host1,host2", "-k", "echo test"]; + let cli = Cli::try_parse_from(args).expect("Should parse with -k flag"); + assert!(cli.fail_fast, "Short flag -k should set fail_fast=true"); + + // Test long form --fail-fast + let args = ["bssh", "-H", "host1,host2", "--fail-fast", "echo test"]; + let cli = Cli::try_parse_from(args).expect("Should parse with --fail-fast flag"); + assert!( + cli.fail_fast, + "Long flag --fail-fast should set fail_fast=true" + ); + + // Test without flag (default) + let args = ["bssh", "-H", "host1,host2", "echo test"]; + let cli = Cli::try_parse_from(args).expect("Should parse without fail-fast flag"); + assert!(!cli.fail_fast, "Default should be fail_fast=false"); +} + +/// Test that fail-fast works with different parallelism settings +#[test] +#[serial] +fn test_fail_fast_with_parallelism() { + let nodes = vec![ + Node::new("host1".to_string(), 22, "user".to_string()), + Node::new("host2".to_string(), 22, "user".to_string()), + Node::new("host3".to_string(), 22, "user".to_string()), + Node::new("host4".to_string(), 22, "user".to_string()), + Node::new("host5".to_string(), 22, "user".to_string()), + ]; + + // Test with high parallelism (all tasks start immediately) + let _executor = ParallelExecutor::new(nodes.clone(), 10, None).with_fail_fast(true); + + // Test with low parallelism (some tasks wait in queue) + let _executor = ParallelExecutor::new(nodes.clone(), 2, None).with_fail_fast(true); + + // Test with parallelism of 1 (sequential) + let _executor = ParallelExecutor::new(nodes, 1, None).with_fail_fast(true); + + // If we reach here, fail_fast can be combined with any parallelism setting +} + +/// Test fail-fast interaction with other flags +#[test] +#[serial] +fn test_fail_fast_flag_combinations() { + use bssh::cli::Cli; + use clap::Parser; + + // fail-fast + require-all-success + let args = [ + "bssh", + "-H", + "host1,host2", + "--fail-fast", + "--require-all-success", + "echo test", + ]; + let cli = Cli::try_parse_from(args).expect("Should parse with both flags"); + assert!(cli.fail_fast); + assert!(cli.require_all_success); + + // fail-fast + check-all-nodes + let args = [ + "bssh", + "-H", + "host1,host2", + "--fail-fast", + "--check-all-nodes", + "echo test", + ]; + let cli = Cli::try_parse_from(args).expect("Should parse with fail-fast and check-all-nodes"); + assert!(cli.fail_fast); + assert!(cli.check_all_nodes); + + // fail-fast + verbose + let args = ["bssh", "-H", "host1,host2", "-k", "-v", "echo test"]; + let cli = Cli::try_parse_from(args).expect("Should parse with fail-fast and verbose"); + assert!(cli.fail_fast); + assert_eq!(cli.verbose, 1); + + // fail-fast + timeout + let args = [ + "bssh", + "-H", + "host1,host2", + "-k", + "--timeout", + "60", + "echo test", + ]; + let cli = Cli::try_parse_from(args).expect("Should parse with fail-fast and timeout"); + assert!(cli.fail_fast); + assert_eq!(cli.timeout, 60); +} + +/// Test that -k doesn't conflict with existing short options +#[test] +#[serial] +fn test_k_flag_no_conflict() { + use bssh::cli::Cli; + use clap::Parser; + + // Verify -k is distinct from other flags + // The -k flag is now assigned to fail-fast (pdsh compatibility) + + let args = ["bssh", "-H", "host1", "-k", "uptime"]; + let result = Cli::try_parse_from(args); + assert!(result.is_ok(), "-k should be a valid flag"); + + let cli = result.unwrap(); + assert!(cli.fail_fast, "-k should set fail_fast=true"); +}