Skip to content

Implement file-based audit exporter (JSON Lines) #135

@inureyes

Description

@inureyes

Summary

Implement a file-based audit exporter that writes events in JSON Lines format for easy parsing and analysis.

Parent Epic

Implementation Details

// src/server/audit/file.rs
use std::path::{Path, PathBuf};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Mutex;

/// File-based audit exporter (JSON Lines format).
///
/// Appends one JSON object per line to `path`. Writes go through a buffered
/// async writer guarded by a tokio `Mutex`, so a single exporter can be shared
/// across tasks. Size-based rotation is optional via `rotate_config`.
pub struct FileExporter {
    // Canonical log path; rotated backups get numeric suffixes (`.1`, `.2`, ...).
    path: PathBuf,
    // Buffered async writer; tokio's Mutex allows holding the guard across `.await`.
    writer: Mutex<BufWriter<File>>,
    // `None` means the log file grows without bound (no rotation).
    rotate_config: Option<RotateConfig>,
}

/// Size-based rotation policy for `FileExporter`.
#[derive(Debug, Clone)]
pub struct RotateConfig {
    /// Maximum file size in bytes before rotation is triggered.
    pub max_size: u64,
    /// Maximum number of rotated backup files to keep (`.1` is the newest).
    pub max_backups: usize,
    /// Gzip-compress rotated files (backup becomes `<name>.N.gz`).
    pub compress: bool,
}

impl FileExporter {
    /// Open (or create) the audit log at `path` in append mode.
    ///
    /// Existing contents are preserved; new events are appended as JSON Lines.
    ///
    /// # Errors
    /// Returns an error if the file cannot be created or opened.
    pub fn new(path: &Path) -> Result<Self> {
        // Append mode so restarting the server never truncates prior audit data.
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)?;

        Ok(Self {
            path: path.to_path_buf(),
            writer: Mutex::new(BufWriter::new(File::from_std(file))),
            rotate_config: None,
        })
    }

    /// Enable size-based rotation (builder style).
    pub fn with_rotation(mut self, config: RotateConfig) -> Self {
        self.rotate_config = Some(config);
        self
    }

    /// Rotate the log if it has grown past `max_size`. No-op without a config.
    async fn check_rotation(&self) -> Result<()> {
        if let Some(ref config) = self.rotate_config {
            // Flush buffered bytes first so the on-disk size reflects everything
            // written so far. The original stat'ed the file without flushing, so
            // data sitting in the BufWriter was not counted and rotation lagged
            // the configured threshold.
            {
                let mut writer = self.writer.lock().await;
                writer.flush().await?;
            }
            let metadata = tokio::fs::metadata(&self.path).await?;
            if metadata.len() >= config.max_size {
                self.rotate(config).await?;
            }
        }
        Ok(())
    }

    /// Path of backup number `i`, with or without the `.gz` suffix.
    fn backup_name(&self, i: usize, compressed: bool) -> String {
        if compressed {
            format!("{}.{}.gz", self.path.display(), i)
        } else {
            format!("{}.{}", self.path.display(), i)
        }
    }

    /// Rename backup `i` to `i + 1`, for whichever of the plain / compressed
    /// variants exists on disk. A missing backup is silently skipped.
    async fn shift_backup(&self, i: usize) -> Result<()> {
        for compressed in [false, true] {
            let old = self.backup_name(i, compressed);
            if tokio::fs::metadata(&old).await.is_ok() {
                let new = self.backup_name(i + 1, compressed);
                tokio::fs::rename(&old, &new).await?;
            }
        }
        Ok(())
    }

    /// Perform one rotation: current log becomes `.1`, older backups shift up,
    /// and the oldest is dropped so at most `max_backups` backups remain.
    async fn rotate(&self, config: &RotateConfig) -> Result<()> {
        // Flush pending bytes so the backup contains every event written so far.
        {
            let mut writer = self.writer.lock().await;
            writer.flush().await?;
        }

        // Drop the oldest backup FIRST. The original deleted `.max_backups`
        // after the shift below, which destroyed the backup that had just been
        // renamed into that slot and silently capped retention at
        // `max_backups - 1` files.
        for compressed in [false, true] {
            let _ = tokio::fs::remove_file(self.backup_name(config.max_backups, compressed)).await;
        }

        // Shift remaining backups up by one: file.log.N -> file.log.N+1.
        // Also handles `.gz` names: with compression enabled, older backups
        // live at `file.log.N.gz`, which the original plain-name rename missed,
        // so compressed backups were never rotated and leaked indefinitely.
        for i in (1..config.max_backups).rev() {
            self.shift_backup(i).await?;
        }

        // Current file becomes backup number 1.
        let backup_path = self.backup_name(1, false);
        tokio::fs::rename(&self.path, &backup_path).await?;

        // Optionally gzip the fresh backup (replaces it with `<backup>.gz`).
        if config.compress {
            self.compress_file(&backup_path).await?;
        }

        // Reopen a fresh, empty log at the canonical path.
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)
            .await?;

        let mut writer = self.writer.lock().await;
        *writer = BufWriter::new(file);

        Ok(())
    }

    /// Gzip `path` into `path.gz`, then remove the uncompressed original.
    async fn compress_file(&self, path: &str) -> Result<()> {
        use async_compression::tokio::write::GzipEncoder;

        // NOTE(review): reads the whole backup into memory; fine for typical
        // rotation sizes, but consider a streaming copy for very large max_size.
        let input = tokio::fs::read(path).await?;
        let compressed_path = format!("{}.gz", path);

        let file = tokio::fs::File::create(&compressed_path).await?;
        let mut encoder = GzipEncoder::new(file);
        encoder.write_all(&input).await?;
        // shutdown() finalizes the gzip trailer; skipping it truncates the archive.
        encoder.shutdown().await?;

        tokio::fs::remove_file(path).await?;
        Ok(())
    }
}

#[async_trait]
impl AuditExporter for FileExporter {
    /// Serialize one event and append it as a single JSON line.
    async fn export(&self, event: AuditEvent) -> Result<()> {
        // A single event is just a batch of one; keeps the write path in one place.
        self.export_batch(vec![event]).await
    }

    /// Serialize each event and append them all under one writer lock.
    async fn export_batch(&self, events: Vec<AuditEvent>) -> Result<()> {
        self.check_rotation().await?;

        let mut guard = self.writer.lock().await;
        for event in &events {
            // Build "json\n" in one buffer and issue a single write per event.
            let mut line = serde_json::to_string(event)?;
            line.push('\n');
            guard.write_all(line.as_bytes()).await?;
        }

        Ok(())
    }

    /// Push any buffered bytes through to the underlying file.
    async fn flush(&self) -> Result<()> {
        self.writer.lock().await.flush().await?;
        Ok(())
    }

    /// Flush pending data; the file handle itself is closed on drop.
    async fn close(&self) -> Result<()> {
        self.flush().await
    }
}

Output Format

JSON Lines format (one JSON object per line):

{"id":"123e4567-e89b-12d3-a456-426614174000","timestamp":"2024-01-15T10:30:00Z","event_type":"file_uploaded","session_id":"sess-001","user":"admin","client_ip":"192.168.1.100","path":"/data/report.pdf","bytes":1048576,"result":"success","protocol":"sftp"}
{"id":"123e4567-e89b-12d3-a456-426614174001","timestamp":"2024-01-15T10:30:05Z","event_type":"auth_failure","session_id":"sess-002","user":"unknown","client_ip":"10.0.0.50","result":"failure","details":"Invalid password"}

Dependencies to Add

[dependencies]
async-compression = { version = "0.4", features = ["tokio", "gzip"] }

Files to Create/Modify

File Action
src/server/audit/file.rs Create - File exporter
src/server/audit/mod.rs Modify - Add file module
Cargo.toml Modify - Add async-compression

Testing Requirements

  1. Unit test: Write events to file
  2. Unit test: JSON Lines format
  3. Unit test: File rotation
  4. Unit test: Compression

Acceptance Criteria

  • FileExporter writes JSON Lines format
  • Append mode (preserves existing data)
  • Log rotation support
  • Optional gzip compression
  • Async I/O
  • Thread-safe (Mutex)
  • Tests passing

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions