diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 9ea17a34..f11cfc2a 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -300,6 +300,99 @@ let tasks: Vec>> = nodes - Buffered I/O for output collection - Early termination on critical failures +**Signal Handling (Added 2025-12-16, Issue #95; Updated 2025-12-16, PR #102):** + +The executor supports two modes for handling Ctrl+C (SIGINT) signals during parallel execution: + +1. **Default Mode (Two-Stage)**: + - First Ctrl+C: Displays status (running/completed job counts) + - Second Ctrl+C (within 1 second): Terminates all jobs immediately with exit code 130 + - Time window reset: If >1 second passes, next Ctrl+C restarts the sequence and shows status again + - Provides users visibility into execution progress before termination + +2. **Batch Mode (`--batch` / `-b`)**: + - Single Ctrl+C: Immediately terminates all jobs with exit code 130 + - Optimized for non-interactive environments (CI/CD, scripts) + - Compatible with pdsh `-b` option for tool compatibility + +**Exit Code Handling:** +- Normal completion: Exit code determined by ExitCodeStrategy (MainRank/RequireAllSuccess/etc.) +- Signal termination (Ctrl+C): Always exits with code 130 (standard SIGINT exit code) +- This ensures scripts can detect user interruption vs. command failure + +**Implementation Coverage:** +Signal handling is implemented in both execution modes: +- `execute()` method (normal/progress bar mode) - lines 172-280 +- `handle_stream_mode()` method (stream mode) - lines 714-838 +- TUI mode has its own quit handling (q or Ctrl+C) and ignores the batch flag + +Implementation is in `executor/parallel.rs` using `tokio::select!` to handle signals alongside normal execution: + +```rust +loop { + tokio::select! { + _ = signal::ctrl_c() => { + if self.batch { + // Batch mode: terminate immediately + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Two-stage mode: first shows status, second terminates + if !first_ctrl_c { + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show status + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: check time window + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + // Within time window: terminate + eprintln!("Received second Ctrl+C. Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Time window expired: reset and show status again + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } + } + } + } + } + // Wait for all tasks to complete + results = join_all(pending_handles.iter_mut()) => { + return self.collect_results(results); + } + } + + // Small sleep to avoid busy waiting + tokio::time::sleep(Duration::from_millis(50)).await; +} +``` + +The batch flag is passed through the executor chain: +- CLI `--batch` flag → `ExecuteCommandParams.batch` → `ParallelExecutor.batch` +- Applied in both normal mode (`execute()`) and stream mode (`handle_stream_mode()`) +- TUI mode maintains its own quit handling and ignores this flag + ### 4. SSH Client (`ssh/client/*`, `ssh/tokio_client/*`) **SSH Client Module Structure (Refactored 2025-10-17):** diff --git a/README.md b/README.md index e501b360..3d8762d7 100644 --- a/README.md +++ b/README.md @@ -330,6 +330,32 @@ bssh -C production "df -h" > disk-usage.log CI=true bssh -C production "command" ``` +### Batch Mode (Ctrl+C Handling) + +bssh provides two modes for handling Ctrl+C during parallel execution: + +**Default (Two-Stage)**: +- First Ctrl+C: Shows status (running/completed counts) +- Second Ctrl+C (within 1 second): Terminates all jobs + +**Batch Mode (`-b` / `--batch`)**: +- Single Ctrl+C: Immediately terminates all jobs +- Useful for non-interactive scripts and CI/CD pipelines + +```bash +# Default behavior (two-stage Ctrl+C) +bssh -C production "long-running-command" +# Ctrl+C once: shows status +# Ctrl+C again (within 1s): terminates + +# Batch mode (immediate termination) +bssh -C production -b "long-running-command" +# Ctrl+C once: immediately terminates all jobs + +# Useful for automation +bssh -H nodes --batch --stream "deployment-script.sh" +``` + ### Built-in Commands ```bash # Test connectivity to hosts diff --git a/src/app/dispatcher.rs b/src/app/dispatcher.rs index e7990d8b..41d161a5 100644 --- a/src/app/dispatcher.rs +++ b/src/app/dispatcher.rs @@ -403,6 +403,7 @@ async fn handle_exec_command(cli: &Cli, ctx: &AppContext, command: &str) -> Resu require_all_success: cli.require_all_success, check_all_nodes: cli.check_all_nodes, sudo_password, + batch: cli.batch, }; execute_command(params).await } diff --git a/src/cli.rs b/src/cli.rs index ef1f23e7..3a0219f5 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -23,7 +23,7 @@ use std::path::PathBuf; before_help = "\n\nBroadcast SSH - Parallel command execution across cluster nodes", about = "Broadcast SSH - SSH-compatible parallel command execution tool", long_about = "bssh is a high-performance SSH client with parallel execution capabilities.\nIt can be used as a drop-in replacement for SSH (single host) or as a powerful cluster management tool (multiple hosts).\n\nThe tool provides secure file transfer using SFTP and supports SSH keys, SSH agent, and password authentication.\nIt automatically detects Backend.AI multi-node session environments.\n\nOutput Modes:\n- TUI Mode (default): Interactive terminal UI with real-time monitoring (auto-enabled in terminals)\n- Stream Mode (--stream): Real-time output with [node] prefixes\n- File Mode (--output-dir): Save per-node output to timestamped files\n- Normal Mode: Traditional output after all nodes complete\n\nSSH Configuration Support:\n- Reads standard SSH config files (defaulting to ~/.ssh/config)\n- Supports Host patterns, HostName, User, Port, IdentityFile, StrictHostKeyChecking\n- ProxyJump, and many other SSH configuration directives\n- CLI arguments override SSH config values following SSH precedence rules", - after_help = "EXAMPLES:\n SSH Mode:\n bssh user@host # Interactive shell\n bssh admin@server.com \"uptime\" # Execute command\n bssh -p 2222 -i ~/.ssh/key user@host # Custom port and key\n bssh -F ~/.ssh/myconfig webserver # Use custom SSH config\n\n Port Forwarding:\n bssh -L 8080:example.com:80 user@host # Local forward: localhost:8080 → example.com:80\n bssh -R 8080:localhost:80 user@host # Remote forward: remote:8080 → localhost:80\n bssh -D 1080 user@host # SOCKS5 proxy on localhost:1080\n bssh -L 3306:db:3306 -R 80:web:80 user@host # Multiple forwards\n bssh -D *:1080/4 user@host # SOCKS4 proxy on all interfaces\n\n Multi-Server Mode:\n bssh -C production \"systemctl status\" # Execute on cluster (TUI mode auto-enabled)\n bssh -H \"web1,web2,web3\" \"df -h\" # Execute on multiple hosts\n bssh -H \"web1,web2,web3\" -f \"web1\" \"df -h\" # Filter to web1 only\n bssh -C production -f \"web*\" \"uptime\" # Filter cluster nodes\n bssh --parallel 20 -H web* \"apt update\" # Increase parallelism\n\n Host Exclusion (--exclude):\n bssh -H \"node1,node2,node3\" --exclude \"node2\" \"uptime\" # Exclude single host\n bssh -C production --exclude \"web1,web2\" \"apt update\" # Exclude multiple hosts\n bssh -C production --exclude \"db*\" \"systemctl restart\" # Exclude with wildcard pattern\n bssh -C production --exclude \"*-backup\" \"df -h\" # Exclude backup nodes\n\n Output Modes:\n bssh -C prod \"apt-get update\" # TUI mode (default, interactive monitoring)\n bssh -C prod --stream \"tail -f log\" # Stream mode (real-time with [node] prefixes)\n bssh -C prod --output-dir ./logs \"ps\" # File mode (save to timestamped files)\n bssh -C prod \"uptime\" | tee log.txt # Normal mode (auto-detected when piped)\n\n TUI Mode Controls (when in TUI):\n 1-9 Jump to node detail view\n s Enter split view (2-4 nodes)\n d Enter diff view (compare nodes)\n f Toggle auto-scroll\n ↑/↓ Scroll output\n ←/→ Switch nodes\n Esc Return to summary\n ? Show help\n q Quit\n\n File Operations:\n bssh -C staging upload file.txt /tmp/ # Upload to cluster\n bssh -H host1,host2 download /etc/hosts ./backups/\n\n Other Commands:\n bssh list # List configured clusters\n bssh -C production ping # Test connectivity\n bssh -H hosts interactive # Interactive mode\n\n SSH Config Example (~/.ssh/config):\n Host web*\n HostName web.example.com\n User webuser\n Port 2222\n IdentityFile ~/.ssh/web_key\n StrictHostKeyChecking yes\n\nDeveloped and maintained as part of the Backend.AI project.\nFor more information: https://github.com/lablup/bssh" + after_help = "EXAMPLES:\n SSH Mode:\n bssh user@host # Interactive shell\n bssh admin@server.com \"uptime\" # Execute command\n bssh -p 2222 -i ~/.ssh/key user@host # Custom port and key\n bssh -F ~/.ssh/myconfig webserver # Use custom SSH config\n\n Port Forwarding:\n bssh -L 8080:example.com:80 user@host # Local forward: localhost:8080 → example.com:80\n bssh -R 8080:localhost:80 user@host # Remote forward: remote:8080 → localhost:80\n bssh -D 1080 user@host # SOCKS5 proxy on localhost:1080\n bssh -L 3306:db:3306 -R 80:web:80 user@host # Multiple forwards\n bssh -D *:1080/4 user@host # SOCKS4 proxy on all interfaces\n\n Multi-Server Mode:\n bssh -C production \"systemctl status\" # Execute on cluster (TUI mode auto-enabled)\n bssh -H \"web1,web2,web3\" \"df -h\" # Execute on multiple hosts\n bssh -H \"web1,web2,web3\" -f \"web1\" \"df -h\" # Filter to web1 only\n bssh -C production -f \"web*\" \"uptime\" # Filter cluster nodes\n bssh --parallel 20 -H web* \"apt update\" # Increase parallelism\n\n Host Exclusion (--exclude):\n bssh -H \"node1,node2,node3\" --exclude \"node2\" \"uptime\" # Exclude single host\n bssh -C production --exclude \"web1,web2\" \"apt update\" # Exclude multiple hosts\n bssh -C production --exclude \"db*\" \"systemctl restart\" # Exclude with wildcard pattern\n bssh -C production --exclude \"*-backup\" \"df -h\" # Exclude backup nodes\n\n Output Modes:\n bssh -C prod \"apt-get update\" # TUI mode (default, interactive monitoring)\n bssh -C prod --stream \"tail -f log\" # Stream mode (real-time with [node] prefixes)\n bssh -C prod --output-dir ./logs \"ps\" # File mode (save to timestamped files)\n bssh -C prod \"uptime\" | tee log.txt # Normal mode (auto-detected when piped)\n\n Batch Mode (Ctrl+C Handling):\n bssh -C prod \"long-running-command\" # Default: first Ctrl+C shows status, second terminates\n bssh -C prod -b \"long-command\" # Batch mode: single Ctrl+C terminates immediately\n bssh -H nodes --batch --stream \"cmd\" # Useful for CI/CD and non-interactive scripts\n\n TUI Mode Controls (when in TUI):\n 1-9 Jump to node detail view\n s Enter split view (2-4 nodes)\n d Enter diff view (compare nodes)\n f Toggle auto-scroll\n ↑/↓ Scroll output\n ←/→ Switch nodes\n Esc Return to summary\n ? Show help\n q Quit\n\n File Operations:\n bssh -C staging upload file.txt /tmp/ # Upload to cluster\n bssh -H host1,host2 download /etc/hosts ./backups/\n\n Other Commands:\n bssh list # List configured clusters\n bssh -C production ping # Test connectivity\n bssh -H hosts interactive # Interactive mode\n\n SSH Config Example (~/.ssh/config):\n Host web*\n HostName web.example.com\n User webuser\n Port 2222\n IdentityFile ~/.ssh/web_key\n StrictHostKeyChecking yes\n\nDeveloped and maintained as part of the Backend.AI project.\nFor more information: https://github.com/lablup/bssh" )] pub struct Cli { /// SSH destination in format: [user@]hostname[:port] or ssh://[user@]hostname[:port] @@ -104,6 +104,13 @@ pub struct Cli { )] pub sudo_password: bool, + #[arg( + short = 'b', + long = "batch", + help = "Batch mode: single Ctrl+C immediately terminates all jobs\nDisables two-stage Ctrl+C handling (status display on first press)\nUseful for non-interactive scripts and CI/CD pipelines\nNote: TUI mode has its own quit handling (q or Ctrl+C) and ignores this flag" + )] + pub batch: bool, + #[arg( short = 'J', long = "jump-host", diff --git a/src/commands/exec.rs b/src/commands/exec.rs index 63fff6c6..7512f63f 100644 --- a/src/commands/exec.rs +++ b/src/commands/exec.rs @@ -45,6 +45,7 @@ pub struct ExecuteCommandParams<'a> { pub require_all_success: bool, pub check_all_nodes: bool, pub sudo_password: Option>, + pub batch: bool, } pub async fn execute_command(params: ExecuteCommandParams<'_>) -> Result<()> { @@ -174,6 +175,7 @@ async fn execute_command_with_forwarding(params: ExecuteCommandParams<'_>) -> Re // Execute the actual command let result = execute_command_without_forwarding(ExecuteCommandParams { port_forwards: None, // Remove forwarding from params to avoid recursion + batch: params.batch, ..params }) .await; @@ -209,7 +211,8 @@ async fn execute_command_without_forwarding(params: ExecuteCommandParams<'_>) -> .with_timeout(params.timeout) .with_connect_timeout(params.connect_timeout) .with_jump_hosts(params.jump_hosts.map(|s| s.to_string())) - .with_sudo_password(params.sudo_password); + .with_sudo_password(params.sudo_password) + .with_batch_mode(params.batch); // Set keychain usage if on macOS #[cfg(target_os = "macos")] diff --git a/src/executor/parallel.rs b/src/executor/parallel.rs index 8a068a56..df776165 100644 --- a/src/executor/parallel.rs +++ b/src/executor/parallel.rs @@ -47,6 +47,7 @@ pub struct ParallelExecutor { pub(crate) connect_timeout: Option, pub(crate) jump_hosts: Option, pub(crate) sudo_password: Option>, + pub(crate) batch: bool, } impl ParallelExecutor { @@ -80,6 +81,7 @@ impl ParallelExecutor { connect_timeout: None, jump_hosts: None, sudo_password: None, + batch: false, } } @@ -104,6 +106,7 @@ impl ParallelExecutor { connect_timeout: None, jump_hosts: None, sudo_password: None, + batch: false, } } @@ -129,6 +132,7 @@ impl ParallelExecutor { connect_timeout: None, jump_hosts: None, sudo_password: None, + batch: false, } } @@ -166,8 +170,20 @@ impl ParallelExecutor { self } + /// Set batch mode for signal handling. + /// + /// When set to true, a single Ctrl+C will immediately terminate all jobs. + /// When false (default), first Ctrl+C shows status, second terminates. + pub fn with_batch_mode(mut self, batch: bool) -> Self { + self.batch = batch; + self + } + /// Execute a command on all nodes in parallel. pub async fn execute(&self, command: &str) -> Result> { + use std::time::Duration; + use tokio::signal; + let semaphore = Arc::new(Semaphore::new(self.max_parallel)); let multi_progress = MultiProgress::new(); let style = create_progress_style()?; @@ -210,8 +226,69 @@ impl ParallelExecutor { }) .collect(); - let results = join_all(tasks).await; - self.collect_results(results) + // Handle signal interruption with batch mode support + let mut first_ctrl_c = false; + let mut ctrl_c_time: Option = None; + let mut pending_handles = tasks; + + loop { + tokio::select! { + // Handle Ctrl+C signal + _ = signal::ctrl_c() => { + if self.batch { + // Batch mode: immediately terminate on first Ctrl+C + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Non-batch mode: two-stage Ctrl+C + if !first_ctrl_c { + // First Ctrl+C: show status + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Count pending handles as running + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: terminate + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + eprintln!("Received second Ctrl+C. Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Too much time passed, reset + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status (matching behavior of first Ctrl+C) + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } + } + } + } + } + // Wait for all tasks to complete + results = join_all(pending_handles.iter_mut()) => { + return self.collect_results(results); + } + } + + // Small sleep to avoid busy waiting + tokio::time::sleep(Duration::from_millis(50)).await; + } } /// Upload a file to all nodes in parallel. @@ -220,6 +297,9 @@ impl ParallelExecutor { local_path: &Path, remote_path: &str, ) -> Result> { + use std::time::Duration; + use tokio::signal; + let semaphore = Arc::new(Semaphore::new(self.max_parallel)); let multi_progress = MultiProgress::new(); let style = create_progress_style()?; @@ -256,8 +336,69 @@ impl ParallelExecutor { }) .collect(); - let results = join_all(tasks).await; - self.collect_upload_results(results) + // Handle signal interruption with batch mode support + let mut first_ctrl_c = false; + let mut ctrl_c_time: Option = None; + let mut pending_handles = tasks; + + loop { + tokio::select! { + // Handle Ctrl+C signal + _ = signal::ctrl_c() => { + if self.batch { + // Batch mode: immediately terminate on first Ctrl+C + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all uploads..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Non-batch mode: two-stage Ctrl+C + if !first_ctrl_c { + // First Ctrl+C: show status + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Count pending handles as running + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: terminate + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + eprintln!("Received second Ctrl+C. Terminating all uploads..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Too much time passed, reset + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status (matching behavior of first Ctrl+C) + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } + } + } + } + } + // Wait for all tasks to complete + results = join_all(pending_handles.iter_mut()) => { + return self.collect_upload_results(results); + } + } + + // Small sleep to avoid busy waiting + tokio::time::sleep(Duration::from_millis(50)).await; + } } /// Download a file from all nodes in parallel. @@ -266,6 +407,9 @@ impl ParallelExecutor { remote_path: &str, local_dir: &Path, ) -> Result> { + use std::time::Duration; + use tokio::signal; + let semaphore = Arc::new(Semaphore::new(self.max_parallel)); let multi_progress = MultiProgress::new(); let style = create_progress_style()?; @@ -302,8 +446,69 @@ impl ParallelExecutor { }) .collect(); - let results = join_all(tasks).await; - self.collect_download_results(results) + // Handle signal interruption with batch mode support + let mut first_ctrl_c = false; + let mut ctrl_c_time: Option = None; + let mut pending_handles = tasks; + + loop { + tokio::select! { + // Handle Ctrl+C signal + _ = signal::ctrl_c() => { + if self.batch { + // Batch mode: immediately terminate on first Ctrl+C + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all downloads..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Non-batch mode: two-stage Ctrl+C + if !first_ctrl_c { + // First Ctrl+C: show status + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Count pending handles as running + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: terminate + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + eprintln!("Received second Ctrl+C. Terminating all downloads..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Too much time passed, reset + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status (matching behavior of first Ctrl+C) + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } + } + } + } + } + // Wait for all tasks to complete + results = join_all(pending_handles.iter_mut()) => { + return self.collect_download_results(results); + } + } + + // Small sleep to avoid busy waiting + tokio::time::sleep(Duration::from_millis(50)).await; + } } /// Download multiple files from all nodes. @@ -312,6 +517,9 @@ impl ParallelExecutor { remote_paths: Vec, local_dir: &Path, ) -> Result> { + use std::time::Duration; + use tokio::signal; + let semaphore = Arc::new(Semaphore::new(self.max_parallel)); let multi_progress = MultiProgress::new(); let style = create_progress_style()?; @@ -397,16 +605,77 @@ impl ParallelExecutor { }) .collect(); - let results = join_all(tasks).await; - - // Collect results for this file - for result in results { - match result { - Ok(download_result) => all_results.push(download_result), - Err(e) => { - tracing::error!("Task failed: {}", e); + // Handle signal interruption with batch mode support + let mut first_ctrl_c = false; + let mut ctrl_c_time: Option = None; + let mut pending_handles = tasks; + + loop { + tokio::select! { + // Handle Ctrl+C signal + _ = signal::ctrl_c() => { + if self.batch { + // Batch mode: immediately terminate on first Ctrl+C + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all downloads..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Non-batch mode: two-stage Ctrl+C + if !first_ctrl_c { + // First Ctrl+C: show status + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Count pending handles as running + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: terminate + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + eprintln!("Received second Ctrl+C. Terminating all downloads..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Too much time passed, reset + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status (matching behavior of first Ctrl+C) + let running_count = pending_handles.len(); + let completed_count = self.nodes.len() - running_count; + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } + } + } + } + } + // Wait for all tasks to complete + results = join_all(pending_handles.iter_mut()) => { + // Collect results for this file + for result in results { + match result { + Ok(download_result) => all_results.push(download_result), + Err(e) => { + tracing::error!("Task failed: {}", e); + } + } + } + break; } } + + // Small sleep to avoid busy waiting + tokio::time::sleep(Duration::from_millis(50)).await; } } @@ -665,12 +934,82 @@ impl ParallelExecutor { ) -> Result> { use super::output_sync::NodeOutputWriter; use std::time::Duration; + use tokio::signal; let mut pending_handles = handles; let mut results = Vec::new(); + let mut first_ctrl_c = false; + let mut ctrl_c_time: Option = None; // Poll until all tasks complete - while !pending_handles.is_empty() || !manager.all_complete() { + loop { + tokio::select! { + // Handle Ctrl+C signal + _ = signal::ctrl_c() => { + if self.batch { + // Batch mode: immediately terminate on first Ctrl+C + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Non-batch mode: two-stage Ctrl+C + if !first_ctrl_c { + // First Ctrl+C: show status + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status + let running_count = pending_handles.len(); + let completed_count = manager.streams().iter() + .filter(|s| matches!(s.status(), super::stream_manager::ExecutionStatus::Completed)) + .count(); + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: terminate + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + eprintln!("Received second Ctrl+C. Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Too much time passed, reset + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status (matching behavior of first Ctrl+C) + let running_count = pending_handles.len(); + let completed_count = manager.streams().iter() + .filter(|s| matches!(s.status(), super::stream_manager::ExecutionStatus::Completed)) + .count(); + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } + } + } + } + } + _ = async { + if pending_handles.is_empty() && manager.all_complete() { + return; + } + + tokio::time::sleep(Duration::from_millis(1)).await; + } => { + // Check if all tasks are done + if pending_handles.is_empty() && manager.all_complete() { + break; + } + } + } + + // Continue with regular processing // Poll all streams for new output manager.poll_all(); @@ -774,7 +1113,7 @@ impl ParallelExecutor { // Run TUI event loop - this will block until user quits or all complete // The TUI itself will handle polling the manager - let user_quit = match tui::run_tui(manager, cluster_name, command).await { + let user_quit = match tui::run_tui(manager, cluster_name, command, self.batch).await { Ok(tui::TuiExitReason::UserQuit) => true, Ok(tui::TuiExitReason::AllTasksCompleted) => false, Err(e) => { @@ -887,14 +1226,84 @@ impl ParallelExecutor { let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S"); let mut pending_handles = handles; + let mut first_ctrl_c = false; + let mut ctrl_c_time: Option = None; + + // Poll until all tasks complete with signal handling + loop { + tokio::select! { + // Handle Ctrl+C signal + _ = tokio::signal::ctrl_c() => { + if self.batch { + // Batch mode: immediately terminate on first Ctrl+C + eprintln!("\nReceived Ctrl+C (batch mode). Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Non-batch mode: two-stage Ctrl+C + if !first_ctrl_c { + // First Ctrl+C: show status + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status + let running_count = pending_handles.len(); + let completed_count = manager.streams().iter() + .filter(|s| matches!(s.status(), super::stream_manager::ExecutionStatus::Completed)) + .count(); + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } else { + // Second Ctrl+C: terminate + if let Some(first_time) = ctrl_c_time { + if first_time.elapsed() <= Duration::from_secs(1) { + eprintln!("Received second Ctrl+C. Terminating all jobs..."); + for handle in pending_handles.drain(..) { + handle.abort(); + } + // Exit with SIGINT exit code (130) + std::process::exit(130); + } else { + // Too much time passed, reset + first_ctrl_c = true; + ctrl_c_time = Some(std::time::Instant::now()); + eprintln!("\nReceived Ctrl+C. Press Ctrl+C again within 1 second to terminate."); + + // Show current status (matching behavior of first Ctrl+C) + let running_count = pending_handles.len(); + let completed_count = manager.streams().iter() + .filter(|s| matches!(s.status(), super::stream_manager::ExecutionStatus::Completed)) + .count(); + eprintln!("Status: {} running, {} completed", running_count, completed_count); + } + } + } + } + } + _ = async { + if pending_handles.is_empty() && manager.all_complete() { + return; + } - // Poll until all tasks complete - while !pending_handles.is_empty() || !manager.all_complete() { + tokio::time::sleep(Duration::from_millis(1)).await; + } => { + // Check if all tasks are done + if pending_handles.is_empty() && manager.all_complete() { + break; + } + } + } + + // Continue with regular processing manager.poll_all(); // Check for completed tasks pending_handles.retain_mut(|handle| !handle.is_finished()); + // Small sleep to avoid busy waiting tokio::time::sleep(Duration::from_millis(50)).await; } diff --git a/src/ui/tui/mod.rs b/src/ui/tui/mod.rs index c5e1f1dd..859fef52 100644 --- a/src/ui/tui/mod.rs +++ b/src/ui/tui/mod.rs @@ -52,6 +52,7 @@ pub async fn run_tui( manager: &mut MultiNodeStreamManager, cluster_name: &str, command: &str, + _batch_mode: bool, // Reserved for future use; TUI has its own quit handling ) -> Result { // Setup terminal with automatic cleanup guard let _terminal_guard = TerminalGuard::new()?;