Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 171 additions & 1 deletion ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,10 +418,180 @@ The existing `execute()` method was refactored to use `execute_streaming()` inte
- All existing tests pass with zero modifications

**Future Phases (Issue #68):**
- Phase 2: Executor integration for parallel streaming
- ~~Phase 2: Executor integration for parallel streaming~~ ✓ Completed (2025-10-29)
- Phase 3: UI components (progress bars, live updates)
- Phase 4: Advanced features (filtering, aggregation)

### 4.0.2 Multi-Node Stream Management and Output Modes (Phase 2)

**Status:** Implemented (2025-10-29) as part of Phase 2 of Issue #68

**Design Motivation:**
Building on Phase 1's streaming infrastructure, Phase 2 adds independent stream management for multiple nodes and flexible output modes. This enables real-time monitoring of parallel command execution across clusters while maintaining full backward compatibility.

**Architecture:**

The Phase 2 implementation consists of four key components:

1. **NodeStream** (`executor/stream_manager.rs`)
```rust
pub struct NodeStream {
pub node: Node,
receiver: mpsc::Receiver<CommandOutput>,
stdout_buffer: Vec<u8>,
stderr_buffer: Vec<u8>,
status: ExecutionStatus,
exit_code: Option<u32>,
closed: bool,
}
```
- Independent output stream for each node
- Non-blocking polling of command output
- Separate buffers for stdout and stderr
- Tracks execution status and exit codes
- Can consume buffers incrementally for streaming

2. **MultiNodeStreamManager** (`executor/stream_manager.rs`)
```rust
pub struct MultiNodeStreamManager {
streams: Vec<NodeStream>,
}
```
- Coordinates multiple node streams
- Non-blocking poll of all streams
- Tracks completion status
- Provides access to all stream states

3. **OutputMode** (`executor/output_mode.rs`)
```rust
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum OutputMode {
#[default]
Normal, // Traditional batch mode
Stream, // Real-time with [node] prefixes
File(PathBuf), // Save to per-node files
}
```
- Three distinct output modes
- TTY detection for automatic mode selection
- Priority: `--output-dir` > `--stream` > default

4. **CLI Integration** (`cli.rs`)
- `--stream` flag: Enable real-time streaming output
- `--output-dir <DIR>`: Save per-node output to files
- Auto-detection of non-TTY environments (pipes, CI)

**Implementation Details:**

**Streaming Execution Flow:**
```rust
// In ParallelExecutor::execute_with_streaming()
1. Create MultiNodeStreamManager
2. Spawn task per node with streaming sender
3. Poll all streams in loop:
- Extract new output from each stream
- Process based on output mode:
* Stream: Print with [node] prefix
* File: Buffer until completion
* Normal: Use traditional execute()
4. Wait for all tasks to complete
5. Collect and return ExecutionResults
```

**Stream Mode Output:**
```
[host1] Starting process...
[host2] Starting process...
[host1] Processing data...
[host2] Processing data...
[host1] Complete
[host2] Complete
```

**File Mode Output:**
```
Output directory: ./results/
host1_20251029_143022.stdout
host1_20251029_143022.stderr
host2_20251029_143022.stdout
host2_20251029_143022.stderr
```

**Backward Compatibility:**

Phase 2 maintains full backward compatibility:
- Without `--stream` or `--output-dir`, uses traditional `execute()` method
- Existing CLI behavior unchanged
- All 396 existing tests pass without modification
- Exit code strategy and error handling preserved

**Performance Characteristics:**
- **Stream Mode:**
- 50ms polling interval for smooth output
- Minimal memory: only buffered lines in flight
- Real-time latency: <100ms from node to display

- **File Mode:**
- Buffers entire output in memory
- Async file writes (non-blocking)
- Timestamped filenames prevent collisions

**TTY Detection:**
- Auto-detects piped output (`stdout.is_terminal()`)
- Checks CI environment variables (CI, GITHUB_ACTIONS, etc.)
- Respects NO_COLOR convention
- Falls back gracefully when colors unavailable

**Error Handling:**
- Per-node failure tracking with ExecutionStatus
- Failed nodes still report in stream/file modes
- Exit code calculation respects user-specified strategy
- Graceful handling of channel closures

**Testing:**
- 10 unit tests for stream management
- 3 unit tests for output mode selection
- TTY detection tests
- All existing integration tests pass
- Total test coverage: 396 tests passing

**Code Organization:**
```
src/executor/
├── stream_manager.rs # NodeStream, MultiNodeStreamManager (252 lines)
├── output_mode.rs # OutputMode enum, TTY detection (171 lines)
├── parallel.rs # Updated with streaming methods (+264 lines)
└── mod.rs # Exports for new types
```

**Usage Examples:**

**Stream Mode:**
```bash
# Real-time streaming output
bssh -C production --stream "tail -f /var/log/app.log"

# With filtering
bssh -H "web*" --stream "systemctl status nginx"
```

**File Mode:**
```bash
# Save outputs to directory
bssh -C cluster --output-dir ./results "ps aux"

# Each node gets separate files with timestamps
ls ./results/
# web1_20251029_143022.stdout
# web2_20251029_143022.stdout
```

**Future Enhancements:**
- Phase 3: UI components (progress bars, spinners)
- Phase 4: Advanced filtering and aggregation
- Potential: Colored output per node
- Potential: Interactive stream control (pause/resume)

### 4.1 Authentication Module (`ssh/auth.rs`)

**Status:** Implemented (2025-10-17) as part of code deduplication refactoring (Issue #34)
Expand Down
1 change: 1 addition & 0 deletions src/app/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ async fn handle_exec_command(cli: &Cli, ctx: &AppContext, command: &str) -> Resu
#[cfg(target_os = "macos")]
use_keychain,
output_dir: cli.output_dir.as_deref(),
stream: cli.stream,
timeout,
jump_hosts: cli.jump_hosts.as_deref(),
port_forwards: if cli.has_port_forwards() {
Expand Down
6 changes: 6 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ pub struct Cli {
)]
pub port: Option<u16>,

#[arg(
long,
help = "Stream output in real-time with [node] prefixes\nEach line of output is prefixed with the node hostname and displayed as it arrives.\nUseful for monitoring long-running commands across multiple nodes.\nAutomatically disabled when output is piped or in CI environments."
)]
pub stream: bool,

#[arg(
long,
help = "Output directory for per-node command results\nCreates timestamped files:\n - hostname_TIMESTAMP.stdout (command output)\n - hostname_TIMESTAMP.stderr (error output)\n - hostname_TIMESTAMP.error (connection failures)\n - summary_TIMESTAMP.txt (execution summary)"
Expand Down
34 changes: 27 additions & 7 deletions src/commands/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use anyhow::Result;
use std::path::Path;

use crate::executor::{ExitCodeStrategy, ParallelExecutor, RankDetector};
use crate::executor::{ExitCodeStrategy, OutputMode, ParallelExecutor, RankDetector};
use crate::forwarding::ForwardingType;
use crate::node::Node;
use crate::ssh::known_hosts::StrictHostKeyChecking;
Expand All @@ -34,6 +34,7 @@ pub struct ExecuteCommandParams<'a> {
#[cfg(target_os = "macos")]
pub use_keychain: bool,
pub output_dir: Option<&'a Path>,
pub stream: bool,
pub timeout: Option<u64>,
pub jump_hosts: Option<&'a str>,
pub port_forwards: Option<Vec<ForwardingType>>,
Expand Down Expand Up @@ -207,16 +208,35 @@ async fn execute_command_without_forwarding(params: ExecuteCommandParams<'_>) ->
#[cfg(target_os = "macos")]
let executor = executor.with_keychain(params.use_keychain);

let results = executor.execute(params.command).await?;
// Determine output mode
let output_mode =
OutputMode::from_args(params.stream, params.output_dir.map(|p| p.to_path_buf()));

// Save outputs to files if output_dir is specified
// Execute with appropriate mode
let results = if output_mode.is_normal() {
// Use traditional execution for backward compatibility
executor.execute(params.command).await?
} else {
// Use streaming execution for --stream or --output-dir
executor
.execute_with_streaming(params.command, output_mode.clone())
.await?
};

// Save outputs to files if output_dir is specified and not already handled by file mode
// (File mode already saves outputs, so only save for normal mode with output_dir)
if let Some(dir) = params.output_dir {
save_outputs_to_files(&results, dir, params.command).await?;
if !params.stream {
// Only save if not in stream mode (file mode saves automatically)
save_outputs_to_files(&results, dir, params.command).await?;
}
}

// Print results
for result in &results {
result.print_output(params.verbose);
// Print results (skip if already printed in stream mode)
if !params.stream {
for result in &results {
result.print_output(params.verbose);
}
}

// Print summary
Expand Down
5 changes: 5 additions & 0 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@

mod connection_manager;
mod execution_strategy;
mod output_mode;
mod output_sync;
mod parallel;
mod result_types;
mod stream_manager;

pub mod exit_strategy;
pub mod rank_detector;

// Re-export public types
pub use connection_manager::download_dir_from_node;
pub use exit_strategy::ExitCodeStrategy;
pub use output_mode::{is_tty, should_use_colors, OutputMode};
pub use parallel::ParallelExecutor;
pub use rank_detector::RankDetector;
pub use result_types::{DownloadResult, ExecutionResult, UploadResult};
pub use stream_manager::{ExecutionStatus, MultiNodeStreamManager, NodeStream};
Loading