diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 44f78123..abdbb9f5 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -213,6 +213,64 @@ Security features for the SSH server (`src/server/security/`): - Thread-safe with fail-closed behavior on lock contention - Configuration via `allowed_ips` and `blocked_ips` in server config +### Audit Logging Module + +Comprehensive audit logging infrastructure for the SSH server (`src/server/audit/`): + +**Structure**: +- `mod.rs` - `AuditManager` for collecting and distributing audit events +- `event.rs` - `AuditEvent` type definitions and builder pattern +- `exporter.rs` - `AuditExporter` trait and `NullExporter` implementation + +**Key Components**: + +- **AuditEvent**: Represents discrete auditable actions with fields for: + - Unique event ID (UUID v4) + - Timestamp (UTC) + - Event type, session ID, username, client IP + - File paths, bytes transferred, operation result + - Protocol and additional details + +- **EventType**: Categorizes security and operational events: + - Authentication: `AuthSuccess`, `AuthFailure`, `AuthRateLimited` + - Sessions: `SessionStart`, `SessionEnd` + - Commands: `CommandExecuted`, `CommandBlocked` + - File operations: `FileOpenRead`, `FileOpenWrite`, `FileRead`, `FileWrite`, `FileClose`, `FileUploaded`, `FileDownloaded`, `FileDeleted`, `FileRenamed` + - Directory operations: `DirectoryCreated`, `DirectoryDeleted`, `DirectoryListed` + - Filters: `TransferDenied`, `TransferAllowed` + - Security: `IpBlocked`, `IpUnblocked`, `SuspiciousActivity` + +- **EventResult**: Operation outcomes (`Success`, `Failure`, `Denied`, `Error`) + +- **AuditExporter Trait**: Interface for audit event destinations + - `export()` - Export single event + - `export_batch()` - Export multiple events (optimizable) + - `flush()` - Ensure pending events are written + - `close()` - Clean up resources + +- **NullExporter**: No-op exporter for testing and disabled audit logging + +- **AuditManager**: Central manager with 
async processing + - Background worker for non-blocking event processing + - Configurable buffering (buffer size, batch size) + - Periodic flush intervals + - Multiple exporter support + - Graceful shutdown with event flush + +**Configuration**: +```rust +let config = AuditConfig::new() + .with_enabled(true) + .with_buffer_size(1000) + .with_batch_size(100) + .with_flush_interval(5); +``` + +**Future Exporters** (planned): +- File exporter for local audit logs +- OpenTelemetry exporter for distributed tracing +- Logstash exporter for centralized logging + ### Server CLI Binary **Binary**: `bssh-server` @@ -274,6 +332,7 @@ SSH server implementation using the russh library for accepting incoming connect - `exec.rs` - Command execution for SSH exec requests - `sftp.rs` - SFTP subsystem handler with path traversal prevention - `auth/` - Authentication provider infrastructure +- `audit/` - Audit logging infrastructure (event types, exporters, manager) **Key Components**: diff --git a/Cargo.lock b/Cargo.lock index ecca1f76..95e4a96c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -419,6 +419,7 @@ dependencies = [ "secrecy", "security-framework", "serde", + "serde_json", "serde_yaml", "serial_test", "shell-words", @@ -594,6 +595,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-link", ] diff --git a/Cargo.toml b/Cargo.toml index 0e4320c5..a0f6d34f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ indicatif = "0.18.3" rpassword = "7.4.0" directories = "6.0.0" dirs = "6.0" -chrono = "0.4.42" +chrono = { version = "0.4.42", features = ["serde"] } glob = "0.3.3" whoami = "2.0.1" owo-colors = "4.2.3" @@ -80,6 +80,7 @@ serial_test = "3.2" insta = "1.44" criterion = { version = "0.8", features = ["html_reports"] } mockall = "0.14" +serde_json = "1.0" [[bench]] name = "large_output_benchmark" diff --git a/docs/architecture/README.md b/docs/architecture/README.md index eabf3c25..19e5c812 100644 --- 
a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -37,6 +37,7 @@ bssh is a high-performance parallel SSH command execution tool with SSH-compatib - **SSH Server Module** - SSH server implementation using russh (see main ARCHITECTURE.md) - **Server Authentication** - Authentication providers including public key verification (see main ARCHITECTURE.md) - **SFTP Handler** - SFTP subsystem with path traversal prevention and chroot-like isolation (see main ARCHITECTURE.md) +- **Audit Logging** - Audit event types, exporters, and async event processing (see main ARCHITECTURE.md) ## Navigation @@ -83,7 +84,7 @@ src/ ├── interactive/ → Interactive Mode ├── jump/ → Jump Host Support ├── forward/ → Port Forwarding -├── server/ → SSH Server (handler, session, config/, auth/) +├── server/ → SSH Server (handler, session, config/, auth/, audit/) ├── shared/ → Shared utilities (validation, rate limiting, auth types, errors) ├── security/ → Security utilities (re-exports from shared for compatibility) └── commands/ → Command Implementations diff --git a/src/server/audit/event.rs b/src/server/audit/event.rs new file mode 100644 index 00000000..2a81ff07 --- /dev/null +++ b/src/server/audit/event.rs @@ -0,0 +1,351 @@ +// Copyright 2025 Lablup Inc. and Jeongkyu Shin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Audit event types for logging security and operational events. +//! +//! 
This module defines the core audit event types used throughout the SSH server +//! to track authentication, file operations, and other security-relevant activities. + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::net::IpAddr; +use std::path::PathBuf; + +/// Audit event for logging security and operational events. +/// +/// Each audit event represents a single discrete action or occurrence +/// that should be tracked for compliance, security monitoring, or debugging. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AuditEvent { + /// Unique event identifier + pub id: String, + + /// Timestamp when the event occurred + pub timestamp: DateTime<Utc>, + + /// Type of event + pub event_type: EventType, + + /// Session ID associated with this event + pub session_id: String, + + /// Username associated with this event + pub user: String, + + /// Client IP address (if available) + pub client_ip: Option<IpAddr>, + + /// File path for file operations + pub path: Option<PathBuf>, + + /// Destination path for rename/copy operations + pub dest_path: Option<PathBuf>, + + /// Number of bytes transferred + pub bytes: Option<u64>, + + /// Result of the operation + pub result: EventResult, + + /// Additional details about the event + /// + /// # Security Warning + /// + /// This field may contain sensitive information such as: + /// - Error messages with file paths or system information + /// - Command arguments that may include passwords or tokens + /// - User-supplied data that hasn't been sanitized + /// + /// When implementing exporters, ensure this field is handled securely: + /// - Apply appropriate access controls to audit logs + /// - Consider redacting or filtering sensitive patterns + /// - Use encryption when transmitting over networks + /// - Comply with data retention and privacy policies + pub details: Option<String>, + + /// Protocol used (ssh, sftp, scp) + pub protocol: Option<String>, +} + +/// Type of audit event. 
+/// +/// This enum categorizes different types of security and operational events +/// that can occur in the SSH server. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum EventType { + // Authentication events + /// Successful authentication + AuthSuccess, + /// Failed authentication attempt + AuthFailure, + /// Authentication rate limited + AuthRateLimited, + + // Session events + /// Session started + SessionStart, + /// Session ended + SessionEnd, + + // Command execution + /// Command executed + CommandExecuted, + /// Command blocked by policy + CommandBlocked, + + // File operations + /// File opened for reading + FileOpenRead, + /// File opened for writing + FileOpenWrite, + /// File read operation + FileRead, + /// File write operation + FileWrite, + /// File closed + FileClose, + /// File uploaded + FileUploaded, + /// File downloaded + FileDownloaded, + /// File deleted + FileDeleted, + /// File renamed + FileRenamed, + + // Directory operations + /// Directory created + DirectoryCreated, + /// Directory deleted + DirectoryDeleted, + /// Directory listed + DirectoryListed, + + // Filter events + /// Transfer denied by filter + TransferDenied, + /// Transfer allowed + TransferAllowed, + + // Security events + /// IP address blocked + IpBlocked, + /// IP address unblocked + IpUnblocked, + /// Suspicious activity detected + SuspiciousActivity, +} + +/// Result of an audit event. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum EventResult { + /// Operation succeeded + Success, + /// Operation failed + Failure, + /// Operation denied by policy + Denied, + /// Operation resulted in error + Error, +} + +impl AuditEvent { + /// Create a new audit event with the minimum required fields. 
+ /// + /// # Arguments + /// + /// * `event_type` - Type of event + /// * `user` - Username associated with the event + /// * `session_id` - Session ID + /// + /// # Example + /// + /// ``` + /// use bssh::server::audit::event::{AuditEvent, EventType}; + /// + /// let event = AuditEvent::new( + /// EventType::AuthSuccess, + /// "alice".to_string(), + /// "session-123".to_string(), + /// ); + /// ``` + pub fn new(event_type: EventType, user: String, session_id: String) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + timestamp: Utc::now(), + event_type, + session_id, + user, + client_ip: None, + path: None, + dest_path: None, + bytes: None, + result: EventResult::Success, + details: None, + protocol: None, + } + } + + /// Set the client IP address. + pub fn with_client_ip(mut self, ip: IpAddr) -> Self { + self.client_ip = Some(ip); + self + } + + /// Set the file path. + pub fn with_path(mut self, path: PathBuf) -> Self { + self.path = Some(path); + self + } + + /// Set the destination path (for rename/copy operations). + pub fn with_dest_path(mut self, dest_path: PathBuf) -> Self { + self.dest_path = Some(dest_path); + self + } + + /// Set the number of bytes transferred. + pub fn with_bytes(mut self, bytes: u64) -> Self { + self.bytes = Some(bytes); + self + } + + /// Set the operation result. + pub fn with_result(mut self, result: EventResult) -> Self { + self.result = result; + self + } + + /// Set additional details. + /// + /// # Security Warning + /// + /// Be cautious when including sensitive information in this field. + /// See the `details` field documentation for security considerations. + pub fn with_details(mut self, details: String) -> Self { + self.details = Some(details); + self + } + + /// Set the protocol. 
+ pub fn with_protocol(mut self, protocol: &str) -> Self { + self.protocol = Some(protocol.to_string()); + self + } +} + +// Note: Default implementation removed as it creates sentinel values with empty +// user and session_id fields, which are semantically invalid for audit events. +// Use AuditEvent::new() to create audit events with meaningful values. + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_audit_event_creation() { + let event = AuditEvent::new( + EventType::AuthSuccess, + "alice".to_string(), + "session-123".to_string(), + ); + + assert_eq!(event.event_type, EventType::AuthSuccess); + assert_eq!(event.user, "alice"); + assert_eq!(event.session_id, "session-123"); + assert_eq!(event.result, EventResult::Success); + assert!(event.client_ip.is_none()); + assert!(event.path.is_none()); + assert!(!event.id.is_empty()); + } + + #[test] + fn test_audit_event_builder() { + let ip: IpAddr = "192.168.1.100".parse().unwrap(); + let event = AuditEvent::new( + EventType::FileUploaded, + "bob".to_string(), + "session-456".to_string(), + ) + .with_client_ip(ip) + .with_path(PathBuf::from("/home/bob/file.txt")) + .with_bytes(1024) + .with_result(EventResult::Success) + .with_protocol("sftp") + .with_details("Upload completed".to_string()); + + assert_eq!(event.client_ip, Some(ip)); + assert_eq!(event.path, Some(PathBuf::from("/home/bob/file.txt"))); + assert_eq!(event.bytes, Some(1024)); + assert_eq!(event.result, EventResult::Success); + assert_eq!(event.protocol, Some("sftp".to_string())); + assert_eq!(event.details, Some("Upload completed".to_string())); + } + + #[test] + fn test_event_type_serialization() { + let event_type = EventType::AuthSuccess; + let serialized = serde_json::to_string(&event_type).unwrap(); + assert_eq!(serialized, r#""auth_success""#); + + let deserialized: EventType = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized, EventType::AuthSuccess); + } + + #[test] + fn test_event_result_serialization() { + 
let result = EventResult::Denied; + let serialized = serde_json::to_string(&result).unwrap(); + assert_eq!(serialized, r#""denied""#); + + let deserialized: EventResult = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized, EventResult::Denied); + } + + #[test] + fn test_full_event_serialization() { + let event = AuditEvent::new( + EventType::SessionStart, + "charlie".to_string(), + "session-789".to_string(), + ) + .with_client_ip("10.0.0.1".parse().unwrap()) + .with_protocol("ssh"); + + let serialized = serde_json::to_string(&event).unwrap(); + let deserialized: AuditEvent = serde_json::from_str(&serialized).unwrap(); + + assert_eq!(deserialized.event_type, event.event_type); + assert_eq!(deserialized.user, event.user); + assert_eq!(deserialized.session_id, event.session_id); + assert_eq!(deserialized.client_ip, event.client_ip); + assert_eq!(deserialized.protocol, event.protocol); + } + + #[test] + fn test_event_with_dest_path() { + let event = AuditEvent::new( + EventType::FileRenamed, + "dave".to_string(), + "session-101".to_string(), + ) + .with_path(PathBuf::from("/home/dave/old.txt")) + .with_dest_path(PathBuf::from("/home/dave/new.txt")); + + assert_eq!(event.path, Some(PathBuf::from("/home/dave/old.txt"))); + assert_eq!(event.dest_path, Some(PathBuf::from("/home/dave/new.txt"))); + } +} diff --git a/src/server/audit/exporter.rs b/src/server/audit/exporter.rs new file mode 100644 index 00000000..e9cf2e4e --- /dev/null +++ b/src/server/audit/exporter.rs @@ -0,0 +1,194 @@ +// Copyright 2025 Lablup Inc. and Jeongkyu Shin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Audit event exporters for sending events to various destinations. +//! +//! This module defines the trait that all audit exporters must implement, +//! as well as built-in exporters like the null exporter. + +use super::event::AuditEvent; +use anyhow::Result; +use async_trait::async_trait; + +/// Trait for audit log exporters. +/// +/// Exporters are responsible for taking audit events and sending them +/// to their destination (file, network, etc.). Exporters must be thread-safe +/// and should handle errors gracefully. +#[async_trait] +pub trait AuditExporter: Send + Sync { + /// Export a single audit event. + /// + /// # Arguments + /// + /// * `event` - The audit event to export + /// + /// # Errors + /// + /// Returns an error if the event cannot be exported. + async fn export(&self, event: AuditEvent) -> Result<()>; + + /// Export multiple events in a batch. + /// + /// The default implementation calls `export()` for each event, + /// but exporters can override this for more efficient batch processing. + /// + /// # Arguments + /// + /// * `events` - Slice of audit events to export + /// + /// # Errors + /// + /// Returns an error if any event fails to export. + async fn export_batch(&self, events: &[AuditEvent]) -> Result<()> { + for event in events { + self.export(event.clone()).await?; + } + Ok(()) + } + + /// Flush any buffered events. + /// + /// This should ensure all pending events are written to their destination. + /// + /// # Errors + /// + /// Returns an error if the flush operation fails. 
+ async fn flush(&self) -> Result<()>; + + /// Close the exporter and release resources. + /// + /// After calling close, no more events should be exported. + /// + /// # Errors + /// + /// Returns an error if cleanup fails. + async fn close(&self) -> Result<()>; +} + +/// Null exporter that discards all events. +/// +/// This is useful for testing or when audit logging is disabled. +/// All operations succeed immediately without doing any work. +#[derive(Debug, Clone, Default)] +pub struct NullExporter; + +impl NullExporter { + /// Create a new null exporter. + pub fn new() -> Self { + Self + } +} + +#[async_trait] +impl AuditExporter for NullExporter { + async fn export(&self, _event: AuditEvent) -> Result<()> { + // Discard the event + Ok(()) + } + + async fn export_batch(&self, _events: &[AuditEvent]) -> Result<()> { + // Discard all events + Ok(()) + } + + async fn flush(&self) -> Result<()> { + // Nothing to flush + Ok(()) + } + + async fn close(&self) -> Result<()> { + // Nothing to close + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::server::audit::event::{EventResult, EventType}; + + #[tokio::test] + async fn test_null_exporter_export() { + let exporter = NullExporter::new(); + let event = AuditEvent::new( + EventType::AuthSuccess, + "test".to_string(), + "session-1".to_string(), + ); + + let result = exporter.export(event).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_null_exporter_batch() { + let exporter = NullExporter::new(); + let events = vec![ + AuditEvent::new( + EventType::AuthSuccess, + "user1".to_string(), + "session-1".to_string(), + ), + AuditEvent::new( + EventType::AuthFailure, + "user2".to_string(), + "session-2".to_string(), + ) + .with_result(EventResult::Failure), + ]; + + let result = exporter.export_batch(&events).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_null_exporter_flush() { + let exporter = NullExporter::new(); + let result = 
exporter.flush().await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_null_exporter_close() { + let exporter = NullExporter::new(); + let result = exporter.close().await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_null_exporter_multiple_operations() { + let exporter = NullExporter::new(); + + // Export single event + let event1 = AuditEvent::new( + EventType::SessionStart, + "alice".to_string(), + "session-123".to_string(), + ); + exporter.export(event1).await.unwrap(); + + // Export batch + let events = vec![AuditEvent::new( + EventType::FileUploaded, + "bob".to_string(), + "session-456".to_string(), + )]; + exporter.export_batch(&events).await.unwrap(); + + // Flush and close + exporter.flush().await.unwrap(); + exporter.close().await.unwrap(); + } +} diff --git a/src/server/audit/mod.rs b/src/server/audit/mod.rs new file mode 100644 index 00000000..f2c1b5f0 --- /dev/null +++ b/src/server/audit/mod.rs @@ -0,0 +1,565 @@ +// Copyright 2025 Lablup Inc. and Jeongkyu Shin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Audit logging infrastructure for the SSH server. +//! +//! This module provides comprehensive audit logging capabilities for tracking +//! security-relevant events, file transfers, and user activities. +//! +//! # Overview +//! +//! The audit system consists of: +//! +//! - [`AuditEvent`]: Event types representing various auditable actions +//! 
- [`AuditExporter`]: Trait for implementing audit event destinations +//! - [`AuditManager`]: Central manager for collecting and distributing events +//! - [`NullExporter`]: No-op exporter for testing and disabled audit logging +//! +//! # Example +//! +//! ```no_run +//! use bssh::server::audit::{AuditManager, AuditConfig, event::{AuditEvent, EventType}}; +//! +//! # async fn example() -> anyhow::Result<()> { +//! let config = AuditConfig::default(); +//! let manager = AuditManager::new(&config)?; +//! +//! let event = AuditEvent::new( +//! EventType::AuthSuccess, +//! "alice".to_string(), +//! "session-123".to_string(), +//! ); +//! +//! manager.log(event).await; +//! # Ok(()) +//! # } +//! ``` + +pub mod event; +pub mod exporter; + +use anyhow::Result; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +pub use event::{AuditEvent, EventResult, EventType}; +pub use exporter::{AuditExporter, NullExporter}; + +/// Configuration for the audit system. +#[derive(Debug, Clone)] +pub struct AuditConfig { + /// Whether audit logging is enabled + pub enabled: bool, + + /// Buffer size for the event channel + pub buffer_size: usize, + + /// Maximum events to buffer before flushing + pub batch_size: usize, + + /// Interval for automatic flush of buffered events + pub flush_interval_secs: u64, + + /// Exporters to use + pub exporters: Vec<AuditExporterConfig>, +} + +/// Configuration for an audit exporter. 
+#[derive(Debug, Clone)] +pub enum AuditExporterConfig { + /// Null exporter (discards events) + Null, + /// File exporter (future implementation) + #[allow(dead_code)] + File { + /// Path to the audit log file + path: String, + }, + /// OpenTelemetry exporter (future implementation) + #[allow(dead_code)] + Otel { + /// OTLP endpoint URL + endpoint: String, + }, + /// Logstash exporter (future implementation) + #[allow(dead_code)] + Logstash { + /// Logstash host + host: String, + /// Logstash port + port: u16, + }, +} + +impl Default for AuditConfig { + fn default() -> Self { + Self { + enabled: false, + buffer_size: 1000, + batch_size: 100, + flush_interval_secs: 5, + exporters: vec![AuditExporterConfig::Null], + } + } +} + +impl AuditConfig { + /// Create a new audit configuration. + pub fn new() -> Self { + Self::default() + } + + /// Enable audit logging. + pub fn with_enabled(mut self, enabled: bool) -> Self { + self.enabled = enabled; + self + } + + /// Set the buffer size. + /// + /// # Panics + /// + /// Panics if size is 0. + pub fn with_buffer_size(mut self, size: usize) -> Self { + assert!(size >= 1, "buffer_size must be at least 1"); + self.buffer_size = size; + self + } + + /// Set the batch size. + /// + /// # Panics + /// + /// Panics if size is 0. + pub fn with_batch_size(mut self, size: usize) -> Self { + assert!(size >= 1, "batch_size must be at least 1"); + self.batch_size = size; + self + } + + /// Set the flush interval. + /// + /// # Panics + /// + /// Panics if secs is 0. + pub fn with_flush_interval(mut self, secs: u64) -> Self { + assert!(secs >= 1, "flush_interval_secs must be at least 1"); + self.flush_interval_secs = secs; + self + } + + /// Set the exporters. + pub fn with_exporters(mut self, exporters: Vec<AuditExporterConfig>) -> Self { + self.exporters = exporters; + self + } +} + +/// Manages audit logging with multiple exporters. +/// +/// The audit manager collects events from the application and distributes +/// them to configured exporters. 
It uses a background worker for async +/// processing and buffering to improve performance. +pub struct AuditManager { + /// Configured exporters + exporters: Vec<Arc<dyn AuditExporter>>, + + /// Channel sender for audit events + sender: mpsc::Sender<AuditEvent>, + + /// Whether audit logging is enabled + enabled: bool, + + /// Handle to the background worker task + worker_handle: Option<JoinHandle<()>>, +} + +impl AuditManager { + /// Create a new audit manager with the given configuration. + /// + /// This starts a background worker task that processes events + /// asynchronously. + /// + /// # Arguments + /// + /// * `config` - Audit configuration + /// + /// # Errors + /// + /// Returns an error if any exporter fails to initialize. + pub fn new(config: &AuditConfig) -> Result<Self> { + let (sender, receiver) = mpsc::channel(config.buffer_size); + + let mut exporters: Vec<Arc<dyn AuditExporter>> = Vec::new(); + + for exporter_config in &config.exporters { + let exporter: Arc<dyn AuditExporter> = match exporter_config { + AuditExporterConfig::Null => Arc::new(NullExporter::new()), + AuditExporterConfig::File { .. } => { + // Future implementation + tracing::warn!("File exporter not yet implemented, using null exporter"); + Arc::new(NullExporter::new()) + } + AuditExporterConfig::Otel { .. } => { + // Future implementation + tracing::warn!("OTEL exporter not yet implemented, using null exporter"); + Arc::new(NullExporter::new()) + } + AuditExporterConfig::Logstash { .. 
} => { + // Future implementation + tracing::warn!("Logstash exporter not yet implemented, using null exporter"); + Arc::new(NullExporter::new()) + } + }; + exporters.push(exporter); + } + + // Start background worker + let worker_handle = if config.enabled { + let batch_size = config.batch_size; + let flush_interval = Duration::from_secs(config.flush_interval_secs); + Some(tokio::spawn(Self::worker( + receiver, + exporters.clone(), + batch_size, + flush_interval, + ))) + } else { + None + }; + + let manager = Self { + exporters, + sender, + enabled: config.enabled, + worker_handle, + }; + + Ok(manager) + } + + /// Log an audit event. + /// + /// If auditing is disabled, this is a no-op. Events are sent to the + /// background worker for processing. + /// + /// # Arguments + /// + /// * `event` - The audit event to log + pub async fn log(&self, event: AuditEvent) { + if !self.enabled { + return; + } + + if let Err(e) = self.sender.send(event).await { + tracing::warn!("Failed to send audit event: {}", e); + } + } + + /// Background worker for async event processing. + /// + /// This task receives events from the channel, buffers them, and + /// periodically flushes them to all configured exporters. + async fn worker( + mut receiver: mpsc::Receiver<AuditEvent>, + exporters: Vec<Arc<dyn AuditExporter>>, + batch_size: usize, + flush_interval: Duration, + ) { + let mut buffer = Vec::with_capacity(batch_size); + let mut flush_timer = tokio::time::interval(flush_interval); + flush_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! 
{ + biased; + + event_opt = receiver.recv() => { + match event_opt { + Some(event) => { + buffer.push(event); + + // Flush if buffer is full + if buffer.len() >= batch_size { + Self::flush_buffer(&exporters, &mut buffer).await; + } + } + None => { + // Channel closed, flush remaining events and exit + if !buffer.is_empty() { + Self::flush_buffer(&exporters, &mut buffer).await; + } + break; + } + } + } + _ = flush_timer.tick() => { + if !buffer.is_empty() { + Self::flush_buffer(&exporters, &mut buffer).await; + } + } + } + } + + // Close all exporters + for exporter in &exporters { + if let Err(e) = exporter.close().await { + tracing::error!("Failed to close exporter: {}", e); + } + } + } + + /// Flush the event buffer to all exporters. + async fn flush_buffer(exporters: &[Arc<dyn AuditExporter>], buffer: &mut Vec<AuditEvent>) { + for exporter in exporters { + if let Err(e) = exporter.export_batch(buffer).await { + tracing::error!("Audit export failed: {}", e); + } + } + buffer.clear(); + } + + /// Flush all pending events immediately. + /// + /// This waits for all exporters to complete their flush operations. + pub async fn flush(&self) { + for exporter in &self.exporters { + if let Err(e) = exporter.flush().await { + tracing::error!("Audit flush failed: {}", e); + } + } + } + + /// Check if audit logging is enabled. + pub fn is_enabled(&self) -> bool { + self.enabled + } + + /// Gracefully shut down the audit manager. + /// + /// This method: + /// 1. Drops the sender to signal the worker to stop accepting new events + /// 2. Waits for the worker to finish processing buffered events + /// 3. Ensures all exporters are properly closed + /// + /// After calling this method, the AuditManager should not be used. + /// + /// # Errors + /// + /// Returns an error if the worker task panicked or if there was an issue + /// waiting for the worker to complete. 
+ pub async fn shutdown(mut self) -> Result<()> { + // Drop the sender to signal the worker to exit + drop(self.sender); + + // Wait for the worker to finish + if let Some(handle) = self.worker_handle.take() { + handle + .await + .map_err(|e| anyhow::anyhow!("Worker task panicked: {}", e))?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_audit_manager_creation() { + let config = AuditConfig::default(); + let manager = AuditManager::new(&config); + assert!(manager.is_ok()); + } + + #[tokio::test] + async fn test_audit_manager_disabled() { + let config = AuditConfig::new().with_enabled(false); + let manager = AuditManager::new(&config).unwrap(); + + let event = AuditEvent::new( + EventType::AuthSuccess, + "test".to_string(), + "session-1".to_string(), + ); + + // Should not panic when disabled + manager.log(event).await; + assert!(!manager.is_enabled()); + } + + #[tokio::test] + async fn test_audit_manager_enabled() { + let config = AuditConfig::new() + .with_enabled(true) + .with_buffer_size(10) + .with_batch_size(5); + + let manager = AuditManager::new(&config).unwrap(); + assert!(manager.is_enabled()); + + let event = AuditEvent::new( + EventType::SessionStart, + "alice".to_string(), + "session-123".to_string(), + ); + + manager.log(event).await; + // Give the worker time to process + tokio::time::sleep(Duration::from_millis(100)).await; + } + + #[tokio::test] + async fn test_audit_manager_batch() { + let config = AuditConfig::new() + .with_enabled(true) + .with_batch_size(3) + .with_buffer_size(100); + + let manager = AuditManager::new(&config).unwrap(); + + // Send multiple events + for i in 0..5 { + let event = AuditEvent::new( + EventType::FileUploaded, + format!("user{}", i), + format!("session-{}", i), + ); + manager.log(event).await; + } + + // Give the worker time to process + tokio::time::sleep(Duration::from_millis(100)).await; + manager.flush().await; + } + + #[tokio::test] + async fn 
test_audit_config_builder() { + let config = AuditConfig::new() + .with_enabled(true) + .with_buffer_size(500) + .with_batch_size(50) + .with_flush_interval(10); + + assert!(config.enabled); + assert_eq!(config.buffer_size, 500); + assert_eq!(config.batch_size, 50); + assert_eq!(config.flush_interval_secs, 10); + } + + #[tokio::test] + async fn test_audit_manager_with_null_exporter() { + let config = AuditConfig::new() + .with_enabled(true) + .with_exporters(vec![AuditExporterConfig::Null]); + + let manager = AuditManager::new(&config).unwrap(); + + let event = AuditEvent::new( + EventType::CommandExecuted, + "bob".to_string(), + "session-456".to_string(), + ); + + manager.log(event).await; + tokio::time::sleep(Duration::from_millis(50)).await; + manager.flush().await; + } + + #[tokio::test] + async fn test_audit_manager_flush_on_interval() { + let config = AuditConfig::new() + .with_enabled(true) + .with_batch_size(100) // Large batch to avoid early flush + .with_flush_interval(1); // 1 second interval + + let manager = AuditManager::new(&config).unwrap(); + + // Send a few events + for i in 0..3 { + let event = AuditEvent::new( + EventType::DirectoryListed, + format!("user{}", i), + format!("session-{}", i), + ); + manager.log(event).await; + } + + // Wait for flush interval + tokio::time::sleep(Duration::from_millis(1100)).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_audit_manager_shutdown() { + let config = AuditConfig::new() + .with_enabled(true) + .with_buffer_size(10) + .with_batch_size(5) + .with_flush_interval(1); + + let manager = AuditManager::new(&config).unwrap(); + + // Send some events + for i in 0..3 { + let event = AuditEvent::new( + EventType::FileUploaded, + format!("user{}", i), + format!("session-{}", i), + ); + manager.log(event).await; + } + + // Give a small amount of time for events to be queued + tokio::time::sleep(Duration::from_millis(50)).await; + + // Shutdown should wait for all events to be processed + let 
result = tokio::time::timeout(Duration::from_secs(10), manager.shutdown()).await; + assert!(result.is_ok(), "Shutdown timed out"); + assert!(result.unwrap().is_ok(), "Shutdown failed"); + } + + #[test] + #[should_panic(expected = "buffer_size must be at least 1")] + fn test_audit_config_invalid_buffer_size() { + let _config = AuditConfig::new().with_buffer_size(0); + } + + #[test] + #[should_panic(expected = "batch_size must be at least 1")] + fn test_audit_config_invalid_batch_size() { + let _config = AuditConfig::new().with_batch_size(0); + } + + #[test] + #[should_panic(expected = "flush_interval_secs must be at least 1")] + fn test_audit_config_invalid_flush_interval() { + let _config = AuditConfig::new().with_flush_interval(0); + } + + #[test] + fn test_audit_config_valid_minimum_values() { + let config = AuditConfig::new() + .with_buffer_size(1) + .with_batch_size(1) + .with_flush_interval(1); + + assert_eq!(config.buffer_size, 1); + assert_eq!(config.batch_size, 1); + assert_eq!(config.flush_interval_secs, 1); + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 1818d341..822907ef 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -44,6 +44,7 @@ //! } //! ``` +pub mod audit; pub mod auth; pub mod config; pub mod exec;