Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 0 additions & 85 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,91 +337,6 @@ let tasks: Vec<JoinHandle<Result<ExecutionResult>>> = nodes
- Uses system known_hosts file (~/.ssh/known_hosts)
- SSH agent authentication with auto-detection

### 4.0.1 Command Output Streaming Infrastructure

**Status:** Implemented (2025-10-29) as part of Phase 1 of Issue #68

**Design Motivation:**
Real-time command output streaming enables future UI features such as live progress bars, per-node output display, and streaming aggregation. The infrastructure provides the foundation for responsive UIs while maintaining full backward compatibility with existing synchronous APIs.

**Architecture:**

The streaming infrastructure consists of three key components:

1. **CommandOutput Enum** (`tokio_client/channel_manager.rs`)
```rust
pub enum CommandOutput {
StdOut(CryptoVec),
StdErr(CryptoVec),
}
```
- Represents streaming output events
- Separates stdout and stderr streams
- Uses russh's `CryptoVec` for zero-copy efficiency

2. **CommandOutputBuffer** (`tokio_client/channel_manager.rs`)
```rust
pub(crate) struct CommandOutputBuffer {
sender: Sender<CommandOutput>,
receiver_task: JoinHandle<(Vec<u8>, Vec<u8>)>,
}
```
- Internal buffer for collecting streaming output
- Background task aggregates stdout and stderr
- Channel capacity: 100 events (tunable)
- Used by synchronous `execute()` for backward compatibility

3. **Streaming API Methods**
- `Client::execute_streaming(command, sender)` - Low-level streaming API
- `SshClient::connect_and_execute_with_output_streaming()` - High-level streaming API
- Both respect timeout settings and handle errors consistently

**Implementation Pattern:**

```rust
// Streaming execution (new in Phase 1)
let (sender, receiver_task) = build_output_buffer();
let exit_status = client.execute_streaming("command", sender).await?;
let (stdout, stderr) = receiver_task.await?;

// Backward-compatible execution (refactored to use streaming)
let result = client.execute("command").await?;
// Internally uses execute_streaming() + CommandOutputBuffer
```

**Backward Compatibility:**

The existing `execute()` method was refactored to use `execute_streaming()` internally:
- Same function signature
- Same return type (`CommandExecutedResult`)
- Same error handling behavior
- Same timeout behavior
- Zero breaking changes to existing code

**Performance Characteristics:**
- Channel-based architecture with bounded buffer (100 events)
- Zero-copy transfer of SSH channel data via `CryptoVec`
- Background task for output aggregation (non-blocking)
- Memory overhead: ~16KB per streaming command (8KB stdout + 1KB stderr + buffer)
- Latency: Real-time streaming with minimal buffering delay

**Error Handling:**
- New `JoinError` variant in `tokio_client::Error`
- Handles task join failures gracefully
- Timeout handling preserved from original implementation
- Channel send errors handled silently (receiver may be dropped)

**Testing:**
- Integration tests cover streaming with stdout/stderr separation
- Backward compatibility test ensures no behavioral changes
- Tests use localhost SSH for reproducible validation
- All existing tests pass with zero modifications

**Future Phases (Issue #68):**
- Phase 2: Executor integration for parallel streaming
- Phase 3: UI components (progress bars, live updates)
- Phase 4: Advanced features (filtering, aggregation)

### 4.1 Authentication Module (`ssh/auth.rs`)

**Status:** Implemented (2025-10-17) as part of code deduplication refactoring (Issue #34)
Expand Down
106 changes: 0 additions & 106 deletions src/ssh/client/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ use super::config::ConnectionConfig;
use super::core::SshClient;
use super::result::CommandResult;
use crate::ssh::known_hosts::StrictHostKeyChecking;
use crate::ssh::tokio_client::CommandOutput;
use anyhow::{Context, Result};
use std::path::Path;
use std::time::Duration;
use tokio::sync::mpsc::Sender;

// SSH command execution timeout design:
// - 5 minutes (300s) handles long-running commands
Expand Down Expand Up @@ -162,108 +160,4 @@ impl SshClient {
.with_context(|| format!("Failed to execute command '{}' on {}:{}. The SSH connection was successful but the command could not be executed.", command, self.host, self.port))
}
}

/// Execute a command with streaming output support
///
/// This method provides real-time command output streaming through the provided sender channel.
/// Output is sent as `CommandOutput::StdOut` or `CommandOutput::StdErr` variants.
///
/// # Arguments
/// * `command` - The command to execute
/// * `config` - Connection configuration
/// * `output_sender` - Channel sender for streaming output
///
/// # Returns
/// The exit status of the command
pub async fn connect_and_execute_with_output_streaming(
&mut self,
command: &str,
config: &ConnectionConfig<'_>,
output_sender: Sender<CommandOutput>,
) -> Result<u32> {
tracing::debug!("Connecting to {}:{}", self.host, self.port);

// Determine authentication method based on parameters
let auth_method = self
.determine_auth_method(
config.key_path,
config.use_agent,
config.use_password,
#[cfg(target_os = "macos")]
config.use_keychain,
)
.await?;

let strict_mode = config
.strict_mode
.unwrap_or(StrictHostKeyChecking::AcceptNew);

// Create client connection - either direct or through jump hosts
let client = self
.establish_connection(
&auth_method,
strict_mode,
config.jump_hosts_spec,
config.key_path,
config.use_agent,
config.use_password,
)
.await?;

tracing::debug!("Connected and authenticated successfully");
tracing::debug!("Executing command with streaming: {}", command);

// Execute command with streaming and timeout
let exit_status = self
.execute_streaming_with_timeout(&client, command, config.timeout_seconds, output_sender)
.await?;

tracing::debug!("Command execution completed with status: {}", exit_status);

Ok(exit_status)
}

/// Execute a command with streaming output and the specified timeout
async fn execute_streaming_with_timeout(
&self,
client: &crate::ssh::tokio_client::Client,
command: &str,
timeout_seconds: Option<u64>,
output_sender: Sender<CommandOutput>,
) -> Result<u32> {
if let Some(timeout_secs) = timeout_seconds {
if timeout_secs == 0 {
// No timeout (unlimited)
tracing::debug!("Executing command with streaming, no timeout (unlimited)");
client.execute_streaming(command, output_sender)
.await
.with_context(|| format!("Failed to execute command '{}' on {}:{}. The SSH connection was successful but the command could not be executed.", command, self.host, self.port))
} else {
// With timeout
let command_timeout = Duration::from_secs(timeout_secs);
tracing::debug!(
"Executing command with streaming, timeout of {} seconds",
timeout_secs
);
tokio::time::timeout(
command_timeout,
client.execute_streaming(command, output_sender)
)
.await
.with_context(|| format!("Command execution timeout: The command '{}' did not complete within {} seconds on {}:{}", command, timeout_secs, self.host, self.port))?
.with_context(|| format!("Failed to execute command '{}' on {}:{}. The SSH connection was successful but the command could not be executed.", command, self.host, self.port))
}
} else {
// Default timeout if not specified
let command_timeout = Duration::from_secs(DEFAULT_COMMAND_TIMEOUT_SECS);
tracing::debug!("Executing command with streaming, default timeout of 300 seconds");
tokio::time::timeout(
command_timeout,
client.execute_streaming(command, output_sender)
)
.await
.with_context(|| format!("Command execution timeout: The command '{}' did not complete within 5 minutes on {}:{}", command, self.host, self.port))?
.with_context(|| format!("Failed to execute command '{}' on {}:{}. The SSH connection was successful but the command could not be executed.", command, self.host, self.port))
}
}
}
Loading
Loading