From afb1ca093168e9360128011ecec8aa411fb2d945 Mon Sep 17 00:00:00 2001 From: Jeongkyu Shin Date: Sat, 24 Jan 2026 13:21:34 +0900 Subject: [PATCH 1/3] feat: Implement file-based audit exporter with JSON Lines format Implement a file-based audit exporter that writes events in JSON Lines format for easy parsing and analysis. This completes the audit infrastructure by providing a production-ready exporter. Features: - JSON Lines format (one JSON object per line) - Append mode to preserve existing data - Log rotation based on file size - Optional gzip compression for rotated files - Thread-safe using async Mutex - Async I/O using tokio - Comprehensive test coverage (33 tests passing) Implementation: - Add FileExporter struct with BufWriter for efficient I/O - Add RotateConfig for configurable rotation behavior - Wire up FileExporter in AuditManager - Add async-compression and serde_json dependencies Closes #135 --- Cargo.lock | 30 +++ Cargo.toml | 3 +- src/server/audit/file.rs | 551 +++++++++++++++++++++++++++++++++++++++ src/server/audit/mod.rs | 12 +- 4 files changed, 589 insertions(+), 7 deletions(-) create mode 100644 src/server/audit/file.rs diff --git a/Cargo.lock b/Cargo.lock index 95e4a96c..1f09ac22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -179,6 +179,18 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-compression" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d10e4f991a553474232bc0a31799f6d24b034a84c0971d80d2e2f78b2e576e40" +dependencies = [ + "compression-codecs", + "compression-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -385,6 +397,7 @@ dependencies = [ "anyhow", "argon2", "arrayvec", + "async-compression", "async-trait", "atty", "bcrypt", @@ -730,6 +743,23 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "compression-codecs" +version = "0.4.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00828ba6fd27b45a448e57dbfe84f1029d4c9f26b368157e9a448a5f49a2ec2a" +dependencies = [ + "compression-core", + "flate2", + "memchr", +] + +[[package]] +name = "compression-core" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" + [[package]] name = "console" version = "0.15.11" diff --git a/Cargo.toml b/Cargo.toml index a0f6d34f..ae049643 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,8 @@ bcrypt = "0.16" argon2 = "0.5" rand = "0.8" ssh-key = { version = "0.6", features = ["std"] } +async-compression = { version = "0.4", features = ["tokio", "gzip"] } +serde_json = "1.0" [target.'cfg(target_os = "macos")'.dependencies] security-framework = "3.5.1" @@ -80,7 +82,6 @@ serial_test = "3.2" insta = "1.44" criterion = { version = "0.8", features = ["html_reports"] } mockall = "0.14" -serde_json = "1.0" [[bench]] name = "large_output_benchmark" diff --git a/src/server/audit/file.rs b/src/server/audit/file.rs new file mode 100644 index 00000000..92fbcc13 --- /dev/null +++ b/src/server/audit/file.rs @@ -0,0 +1,551 @@ +// Copyright 2025 Lablup Inc. and Jeongkyu Shin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! File-based audit exporter implementation. +//! +//! This module provides a file-based audit exporter that writes audit events +//! in JSON Lines format. It supports log rotation and optional gzip compression. + +use super::event::AuditEvent; +use super::exporter::AuditExporter; +use anyhow::Result; +use async_trait::async_trait; +use std::path::{Path, PathBuf}; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncWriteExt, BufWriter}; +use tokio::sync::Mutex; + +/// Configuration for file rotation. +#[derive(Debug, Clone)] +pub struct RotateConfig { + /// Maximum file size in bytes before rotation + pub max_size: u64, + /// Maximum number of backup files to keep + pub max_backups: usize, + /// Compress rotated files with gzip + pub compress: bool, +} + +impl Default for RotateConfig { + fn default() -> Self { + Self { + max_size: 100 * 1024 * 1024, // 100 MB + max_backups: 5, + compress: true, + } + } +} + +impl RotateConfig { + /// Create a new rotation configuration. + pub fn new() -> Self { + Self::default() + } + + /// Set the maximum file size before rotation. + pub fn with_max_size(mut self, max_size: u64) -> Self { + self.max_size = max_size; + self + } + + /// Set the maximum number of backup files to keep. + pub fn with_max_backups(mut self, max_backups: usize) -> Self { + self.max_backups = max_backups; + self + } + + /// Enable or disable gzip compression for rotated files. + pub fn with_compress(mut self, compress: bool) -> Self { + self.compress = compress; + self + } +} + +/// File-based audit exporter that writes events in JSON Lines format. +/// +/// Each event is written as a single JSON object on its own line, making it +/// easy to parse and process with standard tools. +/// +/// # Features +/// +/// - Append mode to preserve existing data +/// - Optional log rotation based on file size +/// - Optional gzip compression for rotated files +/// - Thread-safe using async Mutex +/// - Async I/O using tokio +/// +/// # Example +/// +/// ```no_run +/// use bssh::server::audit::file::{FileExporter, RotateConfig}; +/// use std::path::Path; +/// +/// # async fn example() -> anyhow::Result<()> { +/// let exporter = FileExporter::new(Path::new("/var/log/audit.log"))?; +/// +/// // With rotation +/// let rotate_config = RotateConfig::new() +/// .with_max_size(50 * 1024 * 1024) +/// .with_max_backups(10) +/// .with_compress(true); +/// +/// let exporter = FileExporter::new(Path::new("/var/log/audit.log"))? +/// .with_rotation(rotate_config); +/// # Ok(()) +/// # } +/// ``` +pub struct FileExporter { + path: PathBuf, + writer: Mutex>, + rotate_config: Option, +} + +impl FileExporter { + /// Create a new file exporter. + /// + /// The file is opened in append mode and created if it doesn't exist. + /// + /// # Arguments + /// + /// * `path` - Path to the audit log file + /// + /// # Errors + /// + /// Returns an error if the file cannot be opened or created. + pub fn new(path: &Path) -> Result { + // Create parent directory if it doesn't exist + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + + let file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(path)?; + + Ok(Self { + path: path.to_path_buf(), + writer: Mutex::new(BufWriter::new(File::from_std(file))), + rotate_config: None, + }) + } + + /// Enable log rotation with the given configuration. + pub fn with_rotation(mut self, config: RotateConfig) -> Self { + self.rotate_config = Some(config); + self + } + + /// Check if the file should be rotated and perform rotation if needed. + async fn check_rotation(&self) -> Result<()> { + if let Some(ref config) = self.rotate_config { + // Flush to ensure accurate size check + { + let mut writer = self.writer.lock().await; + writer.flush().await?; + } + + let metadata = tokio::fs::metadata(&self.path).await?; + if metadata.len() >= config.max_size { + self.rotate(config).await?; + } + } + Ok(()) + } + + /// Rotate the log file. + /// + /// This method: + /// 1. Flushes and closes the current writer + /// 2. Shifts existing backup files (file.log.N -> file.log.N+1) + /// 3. Renames current file to file.log.1 + /// 4. Optionally compresses the renamed file + /// 5. Deletes the oldest backup if exceeding max_backups + /// 6. Reopens the file for writing + async fn rotate(&self, config: &RotateConfig) -> Result<()> { + // Flush and close current writer + { + let mut writer = self.writer.lock().await; + writer.flush().await?; + } + + // Rotate existing backup files: file.log.N -> file.log.N+1 + for i in (1..config.max_backups).rev() { + let old_path = if config.compress { + format!("{}.{}.gz", self.path.display(), i) + } else { + format!("{}.{}", self.path.display(), i) + }; + + let new_path = if config.compress { + format!("{}.{}.gz", self.path.display(), i + 1) + } else { + format!("{}.{}", self.path.display(), i + 1) + }; + + if tokio::fs::metadata(&old_path).await.is_ok() { + tokio::fs::rename(&old_path, &new_path).await?; + } + } + + // Move current file to .1 + let backup_path = format!("{}.1", self.path.display()); + tokio::fs::rename(&self.path, &backup_path).await?; + + // Compress if configured + if config.compress { + self.compress_file(&backup_path).await?; + } + + // Delete oldest backup if it exceeds max_backups + let oldest = if config.compress { + format!("{}.{}.gz", self.path.display(), config.max_backups + 1) + } else { + format!("{}.{}", self.path.display(), config.max_backups + 1) + }; + let _ = tokio::fs::remove_file(&oldest).await; + + // Reopen file for writing + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&self.path) + .await?; + + let mut writer = self.writer.lock().await; + *writer = BufWriter::new(file); + + Ok(()) + } + + /// Compress a file using gzip and delete the original. + async fn compress_file(&self, path: &str) -> Result<()> { + use async_compression::tokio::write::GzipEncoder; + + let input = tokio::fs::read(path).await?; + let compressed_path = format!("{}.gz", path); + + let file = tokio::fs::File::create(&compressed_path).await?; + let mut encoder = GzipEncoder::new(file); + encoder.write_all(&input).await?; + encoder.shutdown().await?; + + tokio::fs::remove_file(path).await?; + Ok(()) + } +} + +#[async_trait] +impl AuditExporter for FileExporter { + async fn export(&self, event: AuditEvent) -> Result<()> { + self.check_rotation().await?; + + let json = serde_json::to_string(&event)?; + + let mut writer = self.writer.lock().await; + writer.write_all(json.as_bytes()).await?; + writer.write_all(b"\n").await?; + + Ok(()) + } + + async fn export_batch(&self, events: &[AuditEvent]) -> Result<()> { + self.check_rotation().await?; + + let mut writer = self.writer.lock().await; + + for event in events { + let json = serde_json::to_string(event)?; + writer.write_all(json.as_bytes()).await?; + writer.write_all(b"\n").await?; + } + + Ok(()) + } + + async fn flush(&self) -> Result<()> { + let mut writer = self.writer.lock().await; + writer.flush().await?; + Ok(()) + } + + async fn close(&self) -> Result<()> { + self.flush().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::server::audit::event::{EventResult, EventType}; + use std::net::IpAddr; + use tempfile::TempDir; + + async fn create_test_exporter() -> (FileExporter, TempDir, PathBuf) { + let temp_dir = TempDir::new().unwrap(); + let log_path = temp_dir.path().join("audit.log"); + let exporter = FileExporter::new(&log_path).unwrap(); + (exporter, temp_dir, log_path) + } + + #[tokio::test] + async fn test_file_exporter_creation() { + let temp_dir = TempDir::new().unwrap(); + let log_path = temp_dir.path().join("audit.log"); + let result = FileExporter::new(&log_path); + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_file_exporter_write_event() { + let (exporter, _temp_dir, log_path) = create_test_exporter().await; + + let event = AuditEvent::new( + EventType::AuthSuccess, + "test_user".to_string(), + "session-123".to_string(), + ); + + let result = exporter.export(event).await; + assert!(result.is_ok()); + + exporter.flush().await.unwrap(); + + let content = tokio::fs::read_to_string(&log_path).await.unwrap(); + assert!(content.contains("test_user")); + assert!(content.contains("session-123")); + assert!(content.contains("auth_success")); + } + + #[tokio::test] + async fn test_file_exporter_json_lines_format() { + let (exporter, _temp_dir, log_path) = create_test_exporter().await; + + let event1 = AuditEvent::new( + EventType::FileUploaded, + "alice".to_string(), + "sess-001".to_string(), + ) + .with_bytes(1024); + + let event2 = AuditEvent::new( + EventType::AuthFailure, + "bob".to_string(), + "sess-002".to_string(), + ) + .with_result(EventResult::Failure); + + exporter.export(event1).await.unwrap(); + exporter.export(event2).await.unwrap(); + exporter.flush().await.unwrap(); + + let content = tokio::fs::read_to_string(&log_path).await.unwrap(); + let lines: Vec<&str> = content.lines().collect(); + + assert_eq!(lines.len(), 2); + + // Verify each line is valid JSON + let json1: serde_json::Value = serde_json::from_str(lines[0]).unwrap(); + let json2: serde_json::Value = serde_json::from_str(lines[1]).unwrap(); + + assert_eq!(json1["user"], "alice"); + assert_eq!(json2["user"], "bob"); + } + + #[tokio::test] + async fn test_file_exporter_batch() { + let (exporter, _temp_dir, log_path) = create_test_exporter().await; + + let events = vec![ + AuditEvent::new( + EventType::SessionStart, + "user1".to_string(), + "session-1".to_string(), + ), + AuditEvent::new( + EventType::SessionEnd, + "user2".to_string(), + "session-2".to_string(), + ), + ]; + + exporter.export_batch(&events).await.unwrap(); + exporter.flush().await.unwrap(); + + let content = tokio::fs::read_to_string(&log_path).await.unwrap(); + let lines: Vec<&str> = content.lines().collect(); + + assert_eq!(lines.len(), 2); + } + + #[tokio::test] + async fn test_file_exporter_append_mode() { + let temp_dir = TempDir::new().unwrap(); + let log_path = temp_dir.path().join("audit.log"); + + { + let exporter = FileExporter::new(&log_path).unwrap(); + let event = AuditEvent::new( + EventType::FileUploaded, + "user1".to_string(), + "session-1".to_string(), + ); + exporter.export(event).await.unwrap(); + exporter.flush().await.unwrap(); + } + + { + let exporter = FileExporter::new(&log_path).unwrap(); + let event = AuditEvent::new( + EventType::FileDownloaded, + "user2".to_string(), + "session-2".to_string(), + ); + exporter.export(event).await.unwrap(); + exporter.flush().await.unwrap(); + } + + let content = tokio::fs::read_to_string(&log_path).await.unwrap(); + let lines: Vec<&str> = content.lines().collect(); + + assert_eq!(lines.len(), 2); + } + + #[tokio::test] + async fn test_file_exporter_rotation() { + let temp_dir = TempDir::new().unwrap(); + let log_path = temp_dir.path().join("audit.log"); + + let rotate_config = RotateConfig::new() + .with_max_size(100) // Small size to trigger rotation + .with_max_backups(3) + .with_compress(false); + + let exporter = FileExporter::new(&log_path) + .unwrap() + .with_rotation(rotate_config); + + // Write enough events to trigger rotation + for i in 0..10 { + let event = AuditEvent::new( + EventType::FileUploaded, + format!("user{}", i), + format!("session-{}", i), + ) + .with_client_ip("192.168.1.100".parse::().unwrap()) + .with_bytes(1024); + + exporter.export(event).await.unwrap(); + } + + exporter.flush().await.unwrap(); + + // Check that rotation happened + let backup_path = format!("{}.1", log_path.display()); + assert!(tokio::fs::metadata(&backup_path).await.is_ok()); + } + + #[tokio::test] + async fn test_file_exporter_rotation_with_compression() { + let temp_dir = TempDir::new().unwrap(); + let log_path = temp_dir.path().join("audit.log"); + + let rotate_config = RotateConfig::new() + .with_max_size(100) + .with_max_backups(3) + .with_compress(true); + + let exporter = FileExporter::new(&log_path) + .unwrap() + .with_rotation(rotate_config); + + // Write enough events to trigger rotation + for i in 0..10 { + let event = AuditEvent::new( + EventType::FileUploaded, + format!("user{}", i), + format!("session-{}", i), + ) + .with_bytes(1024); + + exporter.export(event).await.unwrap(); + } + + exporter.flush().await.unwrap(); + + // Check that compressed backup exists + let backup_path = format!("{}.1.gz", log_path.display()); + assert!(tokio::fs::metadata(&backup_path).await.is_ok()); + } + + #[tokio::test] + async fn test_file_exporter_max_backups() { + let temp_dir = TempDir::new().unwrap(); + let log_path = temp_dir.path().join("audit.log"); + + let rotate_config = RotateConfig::new() + .with_max_size(50) + .with_max_backups(2) + .with_compress(false); + + let exporter = FileExporter::new(&log_path) + .unwrap() + .with_rotation(rotate_config); + + // Write enough to trigger multiple rotations + for i in 0..30 { + let event = AuditEvent::new( + EventType::FileUploaded, + format!("user{}", i), + format!("session-{}", i), + ) + .with_bytes(1024); + + exporter.export(event).await.unwrap(); + } + + exporter.flush().await.unwrap(); + + // Should have backups .1 and .2 only + let backup1 = format!("{}.1", log_path.display()); + let backup2 = format!("{}.2", log_path.display()); + let backup3 = format!("{}.3", log_path.display()); + + assert!(tokio::fs::metadata(&backup1).await.is_ok()); + assert!(tokio::fs::metadata(&backup2).await.is_ok()); + assert!(tokio::fs::metadata(&backup3).await.is_err()); + } + + #[tokio::test] + async fn test_rotate_config_builder() { + let config = RotateConfig::new() + .with_max_size(50 * 1024 * 1024) + .with_max_backups(10) + .with_compress(true); + + assert_eq!(config.max_size, 50 * 1024 * 1024); + assert_eq!(config.max_backups, 10); + assert!(config.compress); + } + + #[tokio::test] + async fn test_file_exporter_creates_parent_dir() { + let temp_dir = TempDir::new().unwrap(); + let log_path = temp_dir.path().join("subdir").join("audit.log"); + + let result = FileExporter::new(&log_path); + assert!(result.is_ok()); + assert!(log_path.parent().unwrap().exists()); + } +} diff --git a/src/server/audit/mod.rs b/src/server/audit/mod.rs index f2c1b5f0..46369bff 100644 --- a/src/server/audit/mod.rs +++ b/src/server/audit/mod.rs @@ -48,6 +48,7 @@ pub mod event; pub mod exporter; +pub mod file; use anyhow::Result; use std::sync::Arc; @@ -57,6 +58,7 @@ use tokio::task::JoinHandle; pub use event::{AuditEvent, EventResult, EventType}; pub use exporter::{AuditExporter, NullExporter}; +pub use file::{FileExporter, RotateConfig}; /// Configuration for the audit system. #[derive(Debug, Clone)] @@ -82,8 +84,7 @@ pub struct AuditConfig { pub enum AuditExporterConfig { /// Null exporter (discards events) Null, - /// File exporter (future implementation) - #[allow(dead_code)] + /// File exporter File { /// Path to the audit log file path: String, @@ -208,10 +209,9 @@ impl AuditManager { for exporter_config in &config.exporters { let exporter: Arc = match exporter_config { AuditExporterConfig::Null => Arc::new(NullExporter::new()), - AuditExporterConfig::File { .. } => { - // Future implementation - tracing::warn!("File exporter not yet implemented, using null exporter"); - Arc::new(NullExporter::new()) + AuditExporterConfig::File { path } => { + let file_exporter = FileExporter::new(std::path::Path::new(path))?; + Arc::new(file_exporter) } AuditExporterConfig::Otel { .. } => { // Future implementation From 91f1e3fac30c07579f045d01b08f58854aa70126 Mon Sep 17 00:00:00 2001 From: Jeongkyu Shin Date: Sat, 24 Jan 2026 13:27:43 +0900 Subject: [PATCH 2/3] fix: Address security issues in file audit exporter --- src/server/audit/file.rs | 224 +++++++++++++++++++++++++++++++++++---- 1 file changed, 202 insertions(+), 22 deletions(-) diff --git a/src/server/audit/file.rs b/src/server/audit/file.rs index 92fbcc13..5b5c9945 100644 --- a/src/server/audit/file.rs +++ b/src/server/audit/file.rs @@ -26,6 +26,9 @@ use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::sync::Mutex; +#[cfg(unix)] +use std::os::unix::fs::{DirBuilderExt, OpenOptionsExt}; + /// Configuration for file rotation. #[derive(Debug, Clone)] pub struct RotateConfig { @@ -47,6 +50,25 @@ impl Default for RotateConfig { } } +impl RotateConfig { + /// Validate the configuration values. + /// + /// # Errors + /// + /// Returns an error if: + /// - max_size is 0 + /// - max_backups is 0 + pub fn validate(&self) -> Result<()> { + if self.max_size == 0 { + anyhow::bail!("max_size must be greater than 0"); + } + if self.max_backups == 0 { + anyhow::bail!("max_backups must be greater than 0"); + } + Ok(()) + } +} + impl RotateConfig { /// Create a new rotation configuration. pub fn new() -> Self { @@ -54,18 +76,21 @@ impl RotateConfig { } /// Set the maximum file size before rotation. + #[must_use] pub fn with_max_size(mut self, max_size: u64) -> Self { self.max_size = max_size; self } /// Set the maximum number of backup files to keep. + #[must_use] pub fn with_max_backups(mut self, max_backups: usize) -> Self { self.max_backups = max_backups; self } /// Enable or disable gzip compression for rotated files. + #[must_use] pub fn with_compress(mut self, compress: bool) -> Self { self.compress = compress; self @@ -115,6 +140,8 @@ impl FileExporter { /// Create a new file exporter. /// /// The file is opened in append mode and created if it doesn't exist. + /// On Unix systems, files are created with mode 0o600 (owner read/write only) + /// and directories with mode 0o700 (owner read/write/execute only). /// /// # Arguments /// @@ -124,11 +151,30 @@ impl FileExporter { /// /// Returns an error if the file cannot be opened or created. pub fn new(path: &Path) -> Result { - // Create parent directory if it doesn't exist + // Create parent directory if it doesn't exist with restrictive permissions if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent)?; + #[cfg(unix)] + { + std::fs::DirBuilder::new() + .recursive(true) + .mode(0o700) + .create(parent)?; + } + #[cfg(not(unix))] + { + std::fs::create_dir_all(parent)?; + } } + // Create file with restrictive permissions (0o600 on Unix) + #[cfg(unix)] + let file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .mode(0o600) + .open(path)?; + + #[cfg(not(unix))] let file = std::fs::OpenOptions::new() .create(true) .append(true) @@ -142,43 +188,56 @@ impl FileExporter { } /// Enable log rotation with the given configuration. + /// + /// # Panics + /// + /// Panics if the configuration is invalid (e.g., max_size or max_backups is 0). pub fn with_rotation(mut self, config: RotateConfig) -> Self { + config.validate().expect("invalid rotation configuration"); self.rotate_config = Some(config); self } /// Check if the file should be rotated and perform rotation if needed. + /// + /// This method holds the mutex lock during the entire size check and rotation + /// to prevent TOCTOU (time-of-check to time-of-use) race conditions. async fn check_rotation(&self) -> Result<()> { if let Some(ref config) = self.rotate_config { - // Flush to ensure accurate size check - { - let mut writer = self.writer.lock().await; - writer.flush().await?; - } + // Hold mutex lock during entire size check and rotation to prevent TOCTOU race + let mut writer = self.writer.lock().await; + writer.flush().await?; let metadata = tokio::fs::metadata(&self.path).await?; if metadata.len() >= config.max_size { - self.rotate(config).await?; + // Perform rotation while holding the lock + self.rotate_with_lock(config, &mut writer).await?; } } Ok(()) } - /// Rotate the log file. + /// Rotate the log file while holding the writer lock. /// /// This method: - /// 1. Flushes and closes the current writer + /// 1. Flushes the current writer (caller should have already flushed) /// 2. Shifts existing backup files (file.log.N -> file.log.N+1) /// 3. Renames current file to file.log.1 /// 4. Optionally compresses the renamed file /// 5. Deletes the oldest backup if exceeding max_backups /// 6. Reopens the file for writing - async fn rotate(&self, config: &RotateConfig) -> Result<()> { - // Flush and close current writer - { - let mut writer = self.writer.lock().await; - writer.flush().await?; - } + /// + /// # Arguments + /// + /// * `config` - Rotation configuration + /// * `writer` - Mutable reference to the locked writer (ensures we hold the lock) + async fn rotate_with_lock( + &self, + config: &RotateConfig, + writer: &mut BufWriter, + ) -> Result<()> { + // Flush current writer + writer.flush().await?; // Rotate existing backup files: file.log.N -> file.log.N+1 for i in (1..config.max_backups).rev() { @@ -216,29 +275,69 @@ impl FileExporter { }; let _ = tokio::fs::remove_file(&oldest).await; - // Reopen file for writing + // Reopen file for writing with restrictive permissions + #[cfg(unix)] + let file = OpenOptions::new() + .create(true) + .append(true) + .mode(0o600) + .open(&self.path) + .await?; + + #[cfg(not(unix))] let file = OpenOptions::new() .create(true) .append(true) .open(&self.path) .await?; - let mut writer = self.writer.lock().await; *writer = BufWriter::new(file); Ok(()) } /// Compress a file using gzip and delete the original. + /// + /// Uses streaming compression to avoid loading the entire file into memory. async fn compress_file(&self, path: &str) -> Result<()> { use async_compression::tokio::write::GzipEncoder; + use tokio::io::AsyncReadExt; - let input = tokio::fs::read(path).await?; let compressed_path = format!("{}.gz", path); - let file = tokio::fs::File::create(&compressed_path).await?; - let mut encoder = GzipEncoder::new(file); - encoder.write_all(&input).await?; + // Open input file for streaming + let mut input_file = tokio::fs::File::open(path).await?; + + // Create output file with restrictive permissions + #[cfg(unix)] + let output_file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .mode(0o600) + .open(&compressed_path) + .await?; + + #[cfg(not(unix))] + let output_file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&compressed_path) + .await?; + + let mut encoder = GzipEncoder::new(output_file); + + // Stream data through the encoder in chunks + let mut buffer = vec![0u8; 8192]; // 8KB buffer + loop { + let n = input_file.read(&mut buffer).await?; + if n == 0 { + break; + } + encoder.write_all(&buffer[..n]).await?; + } + encoder.shutdown().await?; tokio::fs::remove_file(path).await?; @@ -548,4 +647,85 @@ mod tests { assert!(result.is_ok()); assert!(log_path.parent().unwrap().exists()); } + + #[cfg(unix)] + #[tokio::test] + async fn test_file_permissions() { + use std::os::unix::fs::PermissionsExt; + + let temp_dir = TempDir::new().unwrap(); + let log_path = temp_dir.path().join("audit.log"); + + let _exporter = FileExporter::new(&log_path).unwrap(); + + let metadata = std::fs::metadata(&log_path).unwrap(); + let mode = metadata.permissions().mode(); + // Check that only owner has read/write permissions (0o600) + assert_eq!(mode & 0o777, 0o600); + } + + #[cfg(unix)] + #[tokio::test] + async fn test_directory_permissions() { + use std::os::unix::fs::PermissionsExt; + + let temp_dir = TempDir::new().unwrap(); + let log_path = temp_dir.path().join("subdir").join("audit.log"); + + let _exporter = FileExporter::new(&log_path).unwrap(); + + let dir_metadata = std::fs::metadata(log_path.parent().unwrap()).unwrap(); + let mode = dir_metadata.permissions().mode(); + // Check that only owner has read/write/execute permissions (0o700) + assert_eq!(mode & 0o777, 0o700); + } + + #[test] + fn test_rotate_config_validation() { + // Valid config + let config = RotateConfig::new(); + assert!(config.validate().is_ok()); + + // Invalid max_size + let config = RotateConfig { + max_size: 0, + max_backups: 5, + compress: true, + }; + assert!(config.validate().is_err()); + + // Invalid max_backups + let config = RotateConfig { + max_size: 1024, + max_backups: 0, + compress: true, + }; + assert!(config.validate().is_err()); + } + + #[test] + #[should_panic(expected = "invalid rotation configuration")] + fn test_with_rotation_panics_on_invalid_config() { + let temp_dir = TempDir::new().unwrap(); + let log_path = temp_dir.path().join("audit.log"); + + let invalid_config = RotateConfig { + max_size: 0, + max_backups: 5, + compress: true, + }; + + let _exporter = FileExporter::new(&log_path) + .unwrap() + .with_rotation(invalid_config); + } + + #[test] + fn test_rotate_config_must_use_attributes() { + let _config = RotateConfig::new() + .with_max_size(1024) + .with_max_backups(3) + .with_compress(false); + // If #[must_use] is not present, compiler won't warn about unused builder methods + } } From 90850da360cf3d4d6a53416025a982779dba53ef Mon Sep 17 00:00:00 2001 From: Jeongkyu Shin Date: Sat, 24 Jan 2026 13:29:47 +0900 Subject: [PATCH 3/3] docs: Update ARCHITECTURE.md for file-based audit exporter - Add file.rs to audit module structure listing - Document FileExporter features and capabilities - Add usage examples with rotation configuration - Document JSON Lines output format - Update future exporters list (file exporter is now implemented) --- ARCHITECTURE.md | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index abdbb9f5..8dda5e40 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -221,6 +221,7 @@ Comprehensive audit logging infrastructure for the SSH server (`src/server/audit - `mod.rs` - `AuditManager` for collecting and distributing audit events - `event.rs` - `AuditEvent` type definitions and builder pattern - `exporter.rs` - `AuditExporter` trait and `NullExporter` implementation +- `file.rs` - `FileExporter` for JSON Lines output with rotation support **Key Components**: @@ -250,6 +251,15 @@ Comprehensive audit logging infrastructure for the SSH server (`src/server/audit - **NullExporter**: No-op exporter for testing and disabled audit logging +- **FileExporter**: File-based exporter writing events in JSON Lines format + - Append mode to preserve existing data + - Optional log rotation based on file size (`RotateConfig`) + - Optional gzip compression for rotated files + - Thread-safe using async Mutex + - Async I/O using tokio + - Automatic parent directory creation + - Restrictive file permissions (0o600 on Unix) + - **AuditManager**: Central manager with async processing - Background worker for non-blocking event processing - Configurable buffering (buffer size, batch size) @@ -266,8 +276,30 @@ let config = AuditConfig::new() .with_flush_interval(5); ``` +**File Exporter Usage**: +```rust +use bssh::server::audit::file::{FileExporter, RotateConfig}; +use std::path::Path; + +// Simple file exporter +let exporter = FileExporter::new(Path::new("/var/log/audit.log"))?; + +// With rotation (50 MB, 10 backups, gzip compression) +let rotate_config = RotateConfig::new() + .with_max_size(50 * 1024 * 1024) + .with_max_backups(10) + .with_compress(true); + +let exporter = FileExporter::new(Path::new("/var/log/audit.log"))? + .with_rotation(rotate_config); +``` + +**Output Format** (JSON Lines - one JSON object per line): +```json +{"id":"uuid","timestamp":"2024-01-15T10:30:00Z","event_type":"file_uploaded","session_id":"sess-001","user":"admin","client_ip":"192.168.1.100","path":"/data/report.pdf","bytes":1048576,"result":"success","protocol":"sftp"} +``` + **Future Exporters** (planned): -- File exporter for local audit logs - OpenTelemetry exporter for distributed tracing - Logstash exporter for centralized logging