From 4fc629fdfdd1171af86b327d0c8c7f442dc1bd0d Mon Sep 17 00:00:00 2001 From: Jeongkyu Shin Date: Tue, 16 Dec 2025 23:34:10 +0900 Subject: [PATCH 1/3] feat: Add --batch option for single Ctrl+C termination (pdsh -b compatibility) Add --batch / -b option that changes Ctrl+C behavior to immediately terminate all parallel jobs with a single press. This improves automation and CI/CD integration by providing immediate termination without requiring confirmation. Changes: - Add --batch / -b CLI flag to Cli struct - Pass batch flag through ExecuteCommandParams to ParallelExecutor - Implement two-stage Ctrl+C handling in stream mode: * Default: First Ctrl+C shows status, second terminates (within 1s) * Batch mode: Single Ctrl+C immediately terminates all jobs - Update TUI mode signature to accept batch parameter (reserved for future use) - Add CLI help examples for batch mode usage - Update README.md with Batch Mode section and examples - Document signal handling implementation in ARCHITECTURE.md The default behavior provides visibility into execution progress before termination, while batch mode is optimized for scripts and non-interactive environments. Implements: #95 --- ARCHITECTURE.md | 52 ++++++++++++++++++++++++++ README.md | 26 +++++++++++++ src/app/dispatcher.rs | 1 + src/cli.rs | 9 ++++- src/commands/exec.rs | 5 ++- src/executor/parallel.rs | 79 +++++++++++++++++++++++++++++++++++++++- src/ui/tui/mod.rs | 1 + 7 files changed, 169 insertions(+), 4 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 9ea17a34..fe1bdb26 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -300,6 +300,58 @@ let tasks: Vec>> = nodes - Buffered I/O for output collection - Early termination on critical failures +**Signal Handling (Added 2025-12-16, Issue #95):** + +The executor supports two modes for handling Ctrl+C (SIGINT) signals during parallel execution: + +1. 
**Default Mode (Two-Stage)**: + - First Ctrl+C: Displays status (running/completed job counts) + - Second Ctrl+C (within 1 second): Terminates all jobs immediately + - Provides users visibility into execution progress before termination + +2. **Batch Mode (`--batch` / `-b`)**: + - Single Ctrl+C: Immediately terminates all jobs + - Optimized for non-interactive environments (CI/CD, scripts) + - Compatible with pdsh `-b` option for tool compatibility + +Implementation is in `executor/parallel.rs` using `tokio::select!` to handle signals alongside normal execution: + +```rust +loop { + tokio::select! { + _ = signal::ctrl_c() => { + if self.batch { + // Batch mode: terminate immediately + for handle in pending_handles.drain(..) { + handle.abort(); + } + break; + } else { + // Two-stage mode: first shows status, second terminates + if !first_ctrl_c { + first_ctrl_c = true; + // Show status + eprintln!("Status: {} running, {} completed", ...); + } else if within_time_window() { + // Second Ctrl+C within 1 second + for handle in pending_handles.drain(..) { + handle.abort(); + } + break; + } + } + } + // Regular execution polling + _ = execution_loop() => { ... } + } +} +``` + +The batch flag is passed through the executor chain: +- CLI `--batch` flag → `ExecuteCommandParams.batch` → `ParallelExecutor.batch` +- Applied in stream mode and normal mode execution paths +- TUI mode maintains its own quit handling (reserved for future use) + ### 4. 
SSH Client (`ssh/client/*`, `ssh/tokio_client/*`) **SSH Client Module Structure (Refactored 2025-10-17):** diff --git a/README.md b/README.md index e501b360..3d8762d7 100644 --- a/README.md +++ b/README.md @@ -330,6 +330,32 @@ bssh -C production "df -h" > disk-usage.log CI=true bssh -C production "command" ``` +### Batch Mode (Ctrl+C Handling) + +bssh provides two modes for handling Ctrl+C during parallel execution: + +**Default (Two-Stage)**: +- First Ctrl+C: Shows status (running/completed counts) +- Second Ctrl+C (within 1 second): Terminates all jobs + +**Batch Mode (`-b` / `--batch`)**: +- Single Ctrl+C: Immediately terminates all jobs +- Useful for non-interactive scripts and CI/CD pipelines + +```bash +# Default behavior (two-stage Ctrl+C) +bssh -C production "long-running-command" +# Ctrl+C once: shows status +# Ctrl+C again (within 1s): terminates + +# Batch mode (immediate termination) +bssh -C production -b "long-running-command" +# Ctrl+C once: immediately terminates all jobs + +# Useful for automation +bssh -H nodes --batch --stream "deployment-script.sh" +``` + ### Built-in Commands ```bash # Test connectivity to hosts diff --git a/src/app/dispatcher.rs b/src/app/dispatcher.rs index e7990d8b..41d161a5 100644 --- a/src/app/dispatcher.rs +++ b/src/app/dispatcher.rs @@ -403,6 +403,7 @@ async fn handle_exec_command(cli: &Cli, ctx: &AppContext, command: &str) -> Resu require_all_success: cli.require_all_success, check_all_nodes: cli.check_all_nodes, sudo_password, + batch: cli.batch, }; execute_command(params).await } diff --git a/src/cli.rs b/src/cli.rs index ef1f23e7..df24f9a3 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -23,7 +23,7 @@ use std::path::PathBuf; before_help = "\n\nBroadcast SSH - Parallel command execution across cluster nodes", about = "Broadcast SSH - SSH-compatible parallel command execution tool", long_about = "bssh is a high-performance SSH client with parallel execution capabilities.\nIt can be used as a drop-in replacement for SSH 
(single host) or as a powerful cluster management tool (multiple hosts).\n\nThe tool provides secure file transfer using SFTP and supports SSH keys, SSH agent, and password authentication.\nIt automatically detects Backend.AI multi-node session environments.\n\nOutput Modes:\n- TUI Mode (default): Interactive terminal UI with real-time monitoring (auto-enabled in terminals)\n- Stream Mode (--stream): Real-time output with [node] prefixes\n- File Mode (--output-dir): Save per-node output to timestamped files\n- Normal Mode: Traditional output after all nodes complete\n\nSSH Configuration Support:\n- Reads standard SSH config files (defaulting to ~/.ssh/config)\n- Supports Host patterns, HostName, User, Port, IdentityFile, StrictHostKeyChecking\n- ProxyJump, and many other SSH configuration directives\n- CLI arguments override SSH config values following SSH precedence rules", - after_help = "EXAMPLES:\n SSH Mode:\n bssh user@host # Interactive shell\n bssh admin@server.com \"uptime\" # Execute command\n bssh -p 2222 -i ~/.ssh/key user@host # Custom port and key\n bssh -F ~/.ssh/myconfig webserver # Use custom SSH config\n\n Port Forwarding:\n bssh -L 8080:example.com:80 user@host # Local forward: localhost:8080 → example.com:80\n bssh -R 8080:localhost:80 user@host # Remote forward: remote:8080 → localhost:80\n bssh -D 1080 user@host # SOCKS5 proxy on localhost:1080\n bssh -L 3306:db:3306 -R 80:web:80 user@host # Multiple forwards\n bssh -D *:1080/4 user@host # SOCKS4 proxy on all interfaces\n\n Multi-Server Mode:\n bssh -C production \"systemctl status\" # Execute on cluster (TUI mode auto-enabled)\n bssh -H \"web1,web2,web3\" \"df -h\" # Execute on multiple hosts\n bssh -H \"web1,web2,web3\" -f \"web1\" \"df -h\" # Filter to web1 only\n bssh -C production -f \"web*\" \"uptime\" # Filter cluster nodes\n bssh --parallel 20 -H web* \"apt update\" # Increase parallelism\n\n Host Exclusion (--exclude):\n bssh -H \"node1,node2,node3\" --exclude \"node2\" \"uptime\" # 
Exclude single host\n bssh -C production --exclude \"web1,web2\" \"apt update\" # Exclude multiple hosts\n bssh -C production --exclude \"db*\" \"systemctl restart\" # Exclude with wildcard pattern\n bssh -C production --exclude \"*-backup\" \"df -h\" # Exclude backup nodes\n\n Output Modes:\n bssh -C prod \"apt-get update\" # TUI mode (default, interactive monitoring)\n bssh -C prod --stream \"tail -f log\" # Stream mode (real-time with [node] prefixes)\n bssh -C prod --output-dir ./logs \"ps\" # File mode (save to timestamped files)\n bssh -C prod \"uptime\" | tee log.txt # Normal mode (auto-detected when piped)\n\n TUI Mode Controls (when in TUI):\n 1-9 Jump to node detail view\n s Enter split view (2-4 nodes)\n d Enter diff view (compare nodes)\n f Toggle auto-scroll\n ↑/↓ Scroll output\n ←/→ Switch nodes\n Esc Return to summary\n ? Show help\n q Quit\n\n File Operations:\n bssh -C staging upload file.txt /tmp/ # Upload to cluster\n bssh -H host1,host2 download /etc/hosts ./backups/\n\n Other Commands:\n bssh list # List configured clusters\n bssh -C production ping # Test connectivity\n bssh -H hosts interactive # Interactive mode\n\n SSH Config Example (~/.ssh/config):\n Host web*\n HostName web.example.com\n User webuser\n Port 2222\n IdentityFile ~/.ssh/web_key\n StrictHostKeyChecking yes\n\nDeveloped and maintained as part of the Backend.AI project.\nFor more information: https://github.com/lablup/bssh" + after_help = "EXAMPLES:\n SSH Mode:\n bssh user@host # Interactive shell\n bssh admin@server.com \"uptime\" # Execute command\n bssh -p 2222 -i ~/.ssh/key user@host # Custom port and key\n bssh -F ~/.ssh/myconfig webserver # Use custom SSH config\n\n Port Forwarding:\n bssh -L 8080:example.com:80 user@host # Local forward: localhost:8080 → example.com:80\n bssh -R 8080:localhost:80 user@host # Remote forward: remote:8080 → localhost:80\n bssh -D 1080 user@host # SOCKS5 proxy on localhost:1080\n bssh -L 3306:db:3306 -R 80:web:80 user@host # Multiple 
forwards\n bssh -D *:1080/4 user@host # SOCKS4 proxy on all interfaces\n\n Multi-Server Mode:\n bssh -C production \"systemctl status\" # Execute on cluster (TUI mode auto-enabled)\n bssh -H \"web1,web2,web3\" \"df -h\" # Execute on multiple hosts\n bssh -H \"web1,web2,web3\" -f \"web1\" \"df -h\" # Filter to web1 only\n bssh -C production -f \"web*\" \"uptime\" # Filter cluster nodes\n bssh --parallel 20 -H web* \"apt update\" # Increase parallelism\n\n Host Exclusion (--exclude):\n bssh -H \"node1,node2,node3\" --exclude \"node2\" \"uptime\" # Exclude single host\n bssh -C production --exclude \"web1,web2\" \"apt update\" # Exclude multiple hosts\n bssh -C production --exclude \"db*\" \"systemctl restart\" # Exclude with wildcard pattern\n bssh -C production --exclude \"*-backup\" \"df -h\" # Exclude backup nodes\n\n Output Modes:\n bssh -C prod \"apt-get update\" # TUI mode (default, interactive monitoring)\n bssh -C prod --stream \"tail -f log\" # Stream mode (real-time with [node] prefixes)\n bssh -C prod --output-dir ./logs \"ps\" # File mode (save to timestamped files)\n bssh -C prod \"uptime\" | tee log.txt # Normal mode (auto-detected when piped)\n\n Batch Mode (Ctrl+C Handling):\n bssh -C prod \"long-running-command\" # Default: first Ctrl+C shows status, second terminates\n bssh -C prod -b \"long-command\" # Batch mode: single Ctrl+C terminates immediately\n bssh -H nodes --batch --stream \"cmd\" # Useful for CI/CD and non-interactive scripts\n\n TUI Mode Controls (when in TUI):\n 1-9 Jump to node detail view\n s Enter split view (2-4 nodes)\n d Enter diff view (compare nodes)\n f Toggle auto-scroll\n ↑/↓ Scroll output\n ←/→ Switch nodes\n Esc Return to summary\n ? 
Show help\n q Quit\n\n File Operations:\n bssh -C staging upload file.txt /tmp/ # Upload to cluster\n bssh -H host1,host2 download /etc/hosts ./backups/\n\n Other Commands:\n bssh list # List configured clusters\n bssh -C production ping # Test connectivity\n bssh -H hosts interactive # Interactive mode\n\n SSH Config Example (~/.ssh/config):\n Host web*\n HostName web.example.com\n User webuser\n Port 2222\n IdentityFile ~/.ssh/web_key\n StrictHostKeyChecking yes\n\nDeveloped and maintained as part of the Backend.AI project.\nFor more information: https://github.com/lablup/bssh" )] pub struct Cli { /// SSH destination in format: [user@]hostname[:port] or ssh://[user@]hostname[:port] @@ -104,6 +104,13 @@ pub struct Cli { )] pub sudo_password: bool, + #[arg( + short = 'b', + long = "batch", + help = "Batch mode: single Ctrl+C immediately terminates all jobs\nDisables two-stage Ctrl+C handling (status display on first press)\nUseful for non-interactive scripts and CI/CD pipelines" + )] + pub batch: bool, + #[arg( short = 'J', long = "jump-host", diff --git a/src/commands/exec.rs b/src/commands/exec.rs index 63fff6c6..7512f63f 100644 --- a/src/commands/exec.rs +++ b/src/commands/exec.rs @@ -45,6 +45,7 @@ pub struct ExecuteCommandParams<'a> { pub require_all_success: bool, pub check_all_nodes: bool, pub sudo_password: Option>, + pub batch: bool, } pub async fn execute_command(params: ExecuteCommandParams<'_>) -> Result<()> { @@ -174,6 +175,7 @@ async fn execute_command_with_forwarding(params: ExecuteCommandParams<'_>) -> Re // Execute the actual command let result = execute_command_without_forwarding(ExecuteCommandParams { port_forwards: None, // Remove forwarding from params to avoid recursion + batch: params.batch, ..params }) .await; @@ -209,7 +211,8 @@ async fn execute_command_without_forwarding(params: ExecuteCommandParams<'_>) -> .with_timeout(params.timeout) .with_connect_timeout(params.connect_timeout) .with_jump_hosts(params.jump_hosts.map(|s| s.to_string())) 
- .with_sudo_password(params.sudo_password); + .with_sudo_password(params.sudo_password) + .with_batch_mode(params.batch); // Set keychain usage if on macOS #[cfg(target_os = "macos")] diff --git a/src/executor/parallel.rs b/src/executor/parallel.rs index 8a068a56..02952249 100644 --- a/src/executor/parallel.rs +++ b/src/executor/parallel.rs @@ -47,6 +47,7 @@ pub struct ParallelExecutor { pub(crate) connect_timeout: Option, pub(crate) jump_hosts: Option, pub(crate) sudo_password: Option>, + pub(crate) batch: bool, } impl ParallelExecutor { @@ -80,6 +81,7 @@ impl ParallelExecutor { connect_timeout: None, jump_hosts: None, sudo_password: None, + batch: false, } } @@ -104,6 +106,7 @@ impl ParallelExecutor { connect_timeout: None, jump_hosts: None, sudo_password: None, + batch: false, } } @@ -129,6 +132,7 @@ impl ParallelExecutor { connect_timeout: None, jump_hosts: None, sudo_password: None, + batch: false, } } @@ -166,6 +170,15 @@ impl ParallelExecutor { self } + /// Set batch mode for signal handling. + /// + /// When set to true, a single Ctrl+C will immediately terminate all jobs. + /// When false (default), first Ctrl+C shows status, second terminates. + pub fn with_batch_mode(mut self, batch: bool) -> Self { + self.batch = batch; + self + } + /// Execute a command on all nodes in parallel. pub async fn execute(&self, command: &str) -> Result> { let semaphore = Arc::new(Semaphore::new(self.max_parallel)); @@ -665,12 +678,74 @@ impl ParallelExecutor { ) -> Result> { use super::output_sync::NodeOutputWriter; use std::time::Duration; + use tokio::signal; let mut pending_handles = handles; let mut results = Vec::new(); + let mut first_ctrl_c = false; + let mut ctrl_c_time: Option = None; // Poll until all tasks complete - while !pending_handles.is_empty() || !manager.all_complete() { + loop { + tokio::select! 
{ + // Handle Ctrl+C signal + _ = signal::ctrl_c() => { + if self.batch { + // Batch mode: immediately terminate on first Ctrl+C + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Set a termination flag + break; + } else { + // Non-batch mode: two-stage Ctrl+C + if !first_ctrl_c { + // First Ctrl+C: show status + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status + let running_count = pending_handles.len(); + let completed_count = manager.streams().iter() + .filter(|s| matches!(s.status(), super::stream_manager::ExecutionStatus::Completed)) + .count(); + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: terminate + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + eprintln!("Received second Ctrl+C. Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + break; + } else { + // Too much time passed, reset + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. 
Press Ctrl+C again within 1 second to terminate."); + } + } + } + } + } + _ = async { + if pending_handles.is_empty() && manager.all_complete() { + return; + } + + tokio::time::sleep(Duration::from_millis(1)).await; + } => { + // Check if all tasks are done + if pending_handles.is_empty() && manager.all_complete() { + break; + } + } + } + + // Continue with regular processing // Poll all streams for new output manager.poll_all(); @@ -774,7 +849,7 @@ impl ParallelExecutor { // Run TUI event loop - this will block until user quits or all complete // The TUI itself will handle polling the manager - let user_quit = match tui::run_tui(manager, cluster_name, command).await { + let user_quit = match tui::run_tui(manager, cluster_name, command, self.batch).await { Ok(tui::TuiExitReason::UserQuit) => true, Ok(tui::TuiExitReason::AllTasksCompleted) => false, Err(e) => { diff --git a/src/ui/tui/mod.rs b/src/ui/tui/mod.rs index c5e1f1dd..859fef52 100644 --- a/src/ui/tui/mod.rs +++ b/src/ui/tui/mod.rs @@ -52,6 +52,7 @@ pub async fn run_tui( manager: &mut MultiNodeStreamManager, cluster_name: &str, command: &str, + _batch_mode: bool, // Reserved for future use; TUI has its own quit handling ) -> Result { // Setup terminal with automatic cleanup guard let _terminal_guard = TerminalGuard::new()?; From 813bfe78fb55f7272ad8b109a50acef4ce566fcc Mon Sep 17 00:00:00 2001 From: Jeongkyu Shin Date: Tue, 16 Dec 2025 23:44:21 +0900 Subject: [PATCH 2/3] fix: Address PR #102 review issues for signal handling This commit fixes all HIGH and MEDIUM severity issues identified in PR #102 review: HIGH SEVERITY FIXES: 1. Signal Handler Race Condition - Time Window Reset Logic Bug - Fixed: When time window expires (>1 second) and user presses Ctrl+C again, the code now displays running/completed status in the reset path - Location: src/executor/parallel.rs lines 701-713 (stream mode) - Impact: Consistent user experience across all Ctrl+C press scenarios 2. 
Inconsistent Signal Handling Across Execution Modes - Fixed: Added complete signal handling to normal execute() method - Previously only handle_stream_mode() had signal handling - Location: src/executor/parallel.rs lines 172-280 (execute method) - Impact: Batch mode and two-stage Ctrl+C now work in all execution modes MEDIUM SEVERITY FIXES: 3. Missing Exit Code Handling After Signal Termination - Fixed: All signal terminations now exit with code 130 (SIGINT standard) - Applied to both batch and non-batch modes in all execution paths - Location: Multiple locations in src/executor/parallel.rs - Impact: Scripts can now distinguish user interruption from command failure 4. No Documentation Conflict Warning for TUI Mode - Fixed: Updated CLI help text to clarify TUI mode ignores batch flag - Location: src/cli.rs line 103 - Impact: Clear user expectations for TUI mode behavior 5. Documentation Mismatch in ARCHITECTURE.md - Fixed: Updated pseudocode and documentation to match actual implementation - Added exit code handling details and implementation coverage notes - Location: ARCHITECTURE.md lines 303-394 - Impact: Accurate documentation for future maintainers TESTING: - All tests pass (cargo test) - No clippy warnings (cargo clippy -- -D warnings) - Code properly formatted (cargo fmt --check) Changes: - ARCHITECTURE.md: Updated signal handling documentation with accurate pseudocode - src/cli.rs: Added TUI mode note to batch flag help text - src/executor/parallel.rs: Added signal handling to execute(), fixed reset path status display, added exit code 130 --- ARCHITECTURE.md | 69 ++++++++++++++++++++++++++------- src/cli.rs | 2 +- src/executor/parallel.rs | 82 +++++++++++++++++++++++++++++++++++++--- 3 files changed, 133 insertions(+), 20 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index fe1bdb26..f11cfc2a 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -300,20 +300,32 @@ let tasks: Vec>> = nodes - Buffered I/O for output collection - Early 
termination on critical failures -**Signal Handling (Added 2025-12-16, Issue #95):** +**Signal Handling (Added 2025-12-16, Issue #95; Updated 2025-12-16, PR #102):** The executor supports two modes for handling Ctrl+C (SIGINT) signals during parallel execution: 1. **Default Mode (Two-Stage)**: - First Ctrl+C: Displays status (running/completed job counts) - - Second Ctrl+C (within 1 second): Terminates all jobs immediately + - Second Ctrl+C (within 1 second): Terminates all jobs immediately with exit code 130 + - Time window reset: If >1 second passes, next Ctrl+C restarts the sequence and shows status again - Provides users visibility into execution progress before termination 2. **Batch Mode (`--batch` / `-b`)**: - - Single Ctrl+C: Immediately terminates all jobs + - Single Ctrl+C: Immediately terminates all jobs with exit code 130 - Optimized for non-interactive environments (CI/CD, scripts) - Compatible with pdsh `-b` option for tool compatibility +**Exit Code Handling:** +- Normal completion: Exit code determined by ExitCodeStrategy (MainRank/RequireAllSuccess/etc.) +- Signal termination (Ctrl+C): Always exits with code 130 (standard SIGINT exit code) +- This ensures scripts can detect user interruption vs. command failure + +**Implementation Coverage:** +Signal handling is implemented in both execution modes: +- `execute()` method (normal/progress bar mode) +- `handle_stream_mode()` method (stream mode) +- TUI mode has its own quit handling (q or Ctrl+C) and ignores the batch flag + Implementation is in `executor/parallel.rs` using `tokio::select!` to handle signals alongside normal execution: ```rust @@ -322,35 +334,64 @@ loop { _ = signal::ctrl_c() => { if self.batch { // Batch mode: terminate immediately + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all jobs..."); for handle in pending_handles.drain(..) 
{ handle.abort(); } - break; + // Exit with SIGINT exit code (130) + std::process::exit(130); } else { // Two-stage mode: first shows status, second terminates if !first_ctrl_c { first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + // Show status - eprintln!("Status: {} running, {} completed", ...); - } else if within_time_window() { - // Second Ctrl+C within 1 second - for handle in pending_handles.drain(..) { - handle.abort(); + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: check time window + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + // Within time window: terminate + eprintln!("Received second Ctrl+C. Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Time window expired: reset and show status again + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } } - break; } } } - // Regular execution polling - _ = execution_loop() => { ... 
} + // Wait for all tasks to complete + results = join_all(pending_handles.iter_mut()) => { + return self.collect_results(results); + } } + + // Small sleep to avoid busy waiting + tokio::time::sleep(Duration::from_millis(50)).await; } ``` The batch flag is passed through the executor chain: - CLI `--batch` flag → `ExecuteCommandParams.batch` → `ParallelExecutor.batch` -- Applied in stream mode and normal mode execution paths -- TUI mode maintains its own quit handling (reserved for future use) +- Applied in both normal mode (`execute()`) and stream mode (`handle_stream_mode()`) +- TUI mode maintains its own quit handling and ignores this flag ### 4. SSH Client (`ssh/client/*`, `ssh/tokio_client/*`) diff --git a/src/cli.rs b/src/cli.rs index df24f9a3..3a0219f5 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -107,7 +107,7 @@ pub struct Cli { #[arg( short = 'b', long = "batch", - help = "Batch mode: single Ctrl+C immediately terminates all jobs\nDisables two-stage Ctrl+C handling (status display on first press)\nUseful for non-interactive scripts and CI/CD pipelines" + help = "Batch mode: single Ctrl+C immediately terminates all jobs\nDisables two-stage Ctrl+C handling (status display on first press)\nUseful for non-interactive scripts and CI/CD pipelines\nNote: TUI mode has its own quit handling (q or Ctrl+C) and ignores this flag" )] pub batch: bool, diff --git a/src/executor/parallel.rs b/src/executor/parallel.rs index 02952249..ea59f4b0 100644 --- a/src/executor/parallel.rs +++ b/src/executor/parallel.rs @@ -181,6 +181,9 @@ impl ParallelExecutor { /// Execute a command on all nodes in parallel. 
pub async fn execute(&self, command: &str) -> Result> { + use std::time::Duration; + use tokio::signal; + let semaphore = Arc::new(Semaphore::new(self.max_parallel)); let multi_progress = MultiProgress::new(); let style = create_progress_style()?; @@ -223,8 +226,69 @@ impl ParallelExecutor { }) .collect(); - let results = join_all(tasks).await; - self.collect_results(results) + // Handle signal interruption with batch mode support + let mut first_ctrl_c = false; + let mut ctrl_c_time: Option = None; + let mut pending_handles = tasks; + + loop { + tokio::select! { + // Handle Ctrl+C signal + _ = signal::ctrl_c() => { + if self.batch { + // Batch mode: immediately terminate on first Ctrl+C + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Non-batch mode: two-stage Ctrl+C + if !first_ctrl_c { + // First Ctrl+C: show status + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Count pending handles as running + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: terminate + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + eprintln!("Received second Ctrl+C. Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Too much time passed, reset + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. 
Press Ctrl+C again within 1 second to terminate."); + + // Show current status (matching behavior of first Ctrl+C) + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } + } + } + } + } + // Wait for all tasks to complete + results = join_all(pending_handles.iter_mut()) => { + return self.collect_results(results); + } + } + + // Small sleep to avoid busy waiting + tokio::time::sleep(Duration::from_millis(50)).await; + } } /// Upload a file to all nodes in parallel. @@ -696,8 +760,8 @@ impl ParallelExecutor { for handle in pending_handles.drain(..) { handle.abort(); } - // Set a termination flag - break; + // Exit with SIGINT exit code (130) + std::process::exit(130); } else { // Non-batch mode: two-stage Ctrl+C if !first_ctrl_c { @@ -720,12 +784,20 @@ impl ParallelExecutor { for handle in pending_handles.drain(..) { handle.abort(); } - break; + // Exit with SIGINT exit code (130) + std::process::exit(130); } else { // Too much time passed, reset first_ctrl_c = true; ctrl_c_time = Some(std::time::Instant::now()); eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status (matching behavior of first Ctrl+C) + let running_count = pending_handles.len(); + let completed_count = manager.streams().iter() + .filter(|s| matches!(s.status(), super::stream_manager::ExecutionStatus::Completed)) + .count(); + eprintln!("Status: {} running, {} completed", running_count, completed_count); } } } From c0fe7042cf7a2936a71cf53e70105094de429ddf Mon Sep 17 00:00:00 2001 From: Jeongkyu Shin Date: Tue, 16 Dec 2025 23:55:18 +0900 Subject: [PATCH 3/3] fix: Add signal handling to file transfer and file output mode operations This commit adds proper Ctrl+C signal handling to all file transfer methods and the file output mode handler: - upload_file: Added tokio::select! 
with two-stage Ctrl+C handling - download_file: Added tokio::select! with two-stage Ctrl+C handling - download_files: Added tokio::select! with two-stage Ctrl+C handling - handle_file_mode: Refactored from simple while loop to tokio::select! pattern All methods now support: - Batch mode: Single Ctrl+C immediately terminates with exit code 130 - Non-batch mode: First Ctrl+C shows status, second terminates - Proper abort of pending task handles on termination - Status reporting showing running/completed task counts These changes complete the signal handling improvements from PR #102, ensuring all parallel execution paths can be gracefully interrupted. --- src/executor/parallel.rs | 290 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 276 insertions(+), 14 deletions(-) diff --git a/src/executor/parallel.rs b/src/executor/parallel.rs index ea59f4b0..df776165 100644 --- a/src/executor/parallel.rs +++ b/src/executor/parallel.rs @@ -297,6 +297,9 @@ impl ParallelExecutor { local_path: &Path, remote_path: &str, ) -> Result> { + use std::time::Duration; + use tokio::signal; + let semaphore = Arc::new(Semaphore::new(self.max_parallel)); let multi_progress = MultiProgress::new(); let style = create_progress_style()?; @@ -333,8 +336,69 @@ impl ParallelExecutor { }) .collect(); - let results = join_all(tasks).await; - self.collect_upload_results(results) + // Handle signal interruption with batch mode support + let mut first_ctrl_c = false; + let mut ctrl_c_time: Option = None; + let mut pending_handles = tasks; + + loop { + tokio::select! { + // Handle Ctrl+C signal + _ = signal::ctrl_c() => { + if self.batch { + // Batch mode: immediately terminate on first Ctrl+C + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all uploads..."); + for handle in pending_handles.drain(..) 
{ + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Non-batch mode: two-stage Ctrl+C + if !first_ctrl_c { + // First Ctrl+C: show status + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Count pending handles as running + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: terminate + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + eprintln!("Received second Ctrl+C. Terminating all uploads..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Too much time passed, reset + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status (matching behavior of first Ctrl+C) + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } + } + } + } + } + // Wait for all tasks to complete + results = join_all(pending_handles.iter_mut()) => { + return self.collect_upload_results(results); + } + } + + // Small sleep to avoid busy waiting + tokio::time::sleep(Duration::from_millis(50)).await; + } } /// Download a file from all nodes in parallel. 
@@ -343,6 +407,9 @@ impl ParallelExecutor { remote_path: &str, local_dir: &Path, ) -> Result> { + use std::time::Duration; + use tokio::signal; + let semaphore = Arc::new(Semaphore::new(self.max_parallel)); let multi_progress = MultiProgress::new(); let style = create_progress_style()?; @@ -379,8 +446,69 @@ impl ParallelExecutor { }) .collect(); - let results = join_all(tasks).await; - self.collect_download_results(results) + // Handle signal interruption with batch mode support + let mut first_ctrl_c = false; + let mut ctrl_c_time: Option = None; + let mut pending_handles = tasks; + + loop { + tokio::select! { + // Handle Ctrl+C signal + _ = signal::ctrl_c() => { + if self.batch { + // Batch mode: immediately terminate on first Ctrl+C + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all downloads..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Non-batch mode: two-stage Ctrl+C + if !first_ctrl_c { + // First Ctrl+C: show status + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Count pending handles as running + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: terminate + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + eprintln!("Received second Ctrl+C. Terminating all downloads..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Too much time passed, reset + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. 
Press Ctrl+C again within 1 second to terminate."); + + // Show current status (matching behavior of first Ctrl+C) + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } + } + } + } + } + // Wait for all tasks to complete + results = join_all(pending_handles.iter_mut()) => { + return self.collect_download_results(results); + } + } + + // Small sleep to avoid busy waiting + tokio::time::sleep(Duration::from_millis(50)).await; + } } /// Download multiple files from all nodes. @@ -389,6 +517,9 @@ impl ParallelExecutor { remote_paths: Vec, local_dir: &Path, ) -> Result> { + use std::time::Duration; + use tokio::signal; + let semaphore = Arc::new(Semaphore::new(self.max_parallel)); let multi_progress = MultiProgress::new(); let style = create_progress_style()?; @@ -474,16 +605,77 @@ impl ParallelExecutor { }) .collect(); - let results = join_all(tasks).await; - - // Collect results for this file - for result in results { - match result { - Ok(download_result) => all_results.push(download_result), - Err(e) => { - tracing::error!("Task failed: {}", e); + // Handle signal interruption with batch mode support + let mut first_ctrl_c = false; + let mut ctrl_c_time: Option = None; + let mut pending_handles = tasks; + + loop { + tokio::select! { + // Handle Ctrl+C signal + _ = signal::ctrl_c() => { + if self.batch { + // Batch mode: immediately terminate on first Ctrl+C + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all downloads..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Non-batch mode: two-stage Ctrl+C + if !first_ctrl_c { + // First Ctrl+C: show status + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. 
Press Ctrl+C again within 1 second to terminate."); + + // Count pending handles as running + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: terminate + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + eprintln!("Received second Ctrl+C. Terminating all downloads..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Too much time passed, reset + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status (matching behavior of first Ctrl+C) + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } + } + } + } + } + // Wait for all tasks to complete + results = join_all(pending_handles.iter_mut()) => { + // Collect results for this file + for result in results { + match result { + Ok(download_result) => all_results.push(download_result), + Err(e) => { + tracing::error!("Task failed: {}", e); + } + } + } + break; } } + + // Small sleep to avoid busy waiting + tokio::time::sleep(Duration::from_millis(50)).await; } } @@ -1034,14 +1226,84 @@ impl ParallelExecutor { let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S"); let mut pending_handles = handles; + let mut first_ctrl_c = false; + let mut ctrl_c_time: Option = None; - // Poll until all tasks complete - while !pending_handles.is_empty() || !manager.all_complete() { + // Poll until all tasks complete with signal handling + loop { + tokio::select! 
{ + // Handle Ctrl+C signal + _ = tokio::signal::ctrl_c() => { + if self.batch { + // Batch mode: immediately terminate on first Ctrl+C + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Non-batch mode: two-stage Ctrl+C + if !first_ctrl_c { + // First Ctrl+C: show status + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status + let running_count = pending_handles.len(); + let completed_count = manager.streams().iter() + .filter(|s| matches!(s.status(), super::stream_manager::ExecutionStatus::Completed)) + .count(); + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: terminate + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + eprintln!("Received second Ctrl+C. Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Too much time passed, reset + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. 
Press Ctrl+C again within 1 second to terminate."); + + // Show current status (matching behavior of first Ctrl+C) + let running_count = pending_handles.len(); + let completed_count = manager.streams().iter() + .filter(|s| matches!(s.status(), super::stream_manager::ExecutionStatus::Completed)) + .count(); + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } + } + } + } + } + _ = async { + if pending_handles.is_empty() && manager.all_complete() { + return; + } + + tokio::time::sleep(Duration::from_millis(1)).await; + } => { + // Check if all tasks are done + if pending_handles.is_empty() && manager.all_complete() { + break; + } + } + } + + // Continue with regular processing manager.poll_all(); // Check for completed tasks pending_handles.retain_mut(|handle| !handle.is_finished()); + // Small sleep to avoid busy waiting tokio::time::sleep(Duration::from_millis(50)).await; }