Skip to content

Implement Logstash audit exporter #137

@inureyes

Description

@inureyes

Summary

Implement a Logstash exporter for audit events, enabling integration with ELK stack (Elasticsearch, Logstash, Kibana) for log analysis.

Parent Epic

Implementation Details

// src/server/audit/logstash.rs
use tokio::net::TcpStream;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
use std::time::Duration;

/// Logstash audit exporter (TCP JSON)
pub struct LogstashExporter {
    host: String,
    port: u16,
    connection: Mutex<Option<TcpStream>>,
    reconnect_delay: Duration,
}

impl LogstashExporter {
    pub fn new(host: &str, port: u16) -> Result<Self> {
        Ok(Self {
            host: host.to_string(),
            port,
            connection: Mutex::new(None),
            reconnect_delay: Duration::from_secs(5),
        })
    }

    async fn ensure_connected(&self) -> Result<()> {
        let mut conn = self.connection.lock().await;
        
        if conn.is_none() {
            match self.connect().await {
                Ok(stream) => {
                    *conn = Some(stream);
                }
                Err(e) => {
                    tracing::warn!("Failed to connect to Logstash: {}", e);
                    return Err(e);
                }
            }
        }
        
        Ok(())
    }

    async fn connect(&self) -> Result<TcpStream> {
        let addr = format!("{}:{}", self.host, self.port);
        let stream = tokio::time::timeout(
            Duration::from_secs(10),
            TcpStream::connect(&addr),
        )
        .await
        .context("Connection timeout")?
        .context("Failed to connect")?;

        tracing::info!("Connected to Logstash at {}", addr);
        Ok(stream)
    }

    async fn send(&self, data: &[u8]) -> Result<()> {
        let mut conn = self.connection.lock().await;
        
        if let Some(ref mut stream) = *conn {
            match stream.write_all(data).await {
                Ok(_) => return Ok(()),
                Err(e) => {
                    tracing::warn!("Logstash write failed, reconnecting: {}", e);
                    *conn = None;
                }
            }
        }
        
        // Reconnect and retry
        *conn = Some(self.connect().await?);
        if let Some(ref mut stream) = *conn {
            stream.write_all(data).await?;
        }
        
        Ok(())
    }

    fn format_event(&self, event: &AuditEvent) -> Result<String> {
        // Logstash expects JSON with newline delimiter
        let mut json = serde_json::to_string(event)?;
        json.push('\n');
        Ok(json)
    }
}

#[async_trait]
impl AuditExporter for LogstashExporter {
    async fn export(&self, event: AuditEvent) -> Result<()> {
        self.ensure_connected().await?;
        let data = self.format_event(&event)?;
        self.send(data.as_bytes()).await
    }

    async fn export_batch(&self, events: Vec<AuditEvent>) -> Result<()> {
        self.ensure_connected().await?;
        
        let mut batch = String::new();
        for event in events {
            batch.push_str(&self.format_event(&event)?);
        }
        
        self.send(batch.as_bytes()).await
    }

    async fn flush(&self) -> Result<()> {
        let mut conn = self.connection.lock().await;
        if let Some(ref mut stream) = *conn {
            stream.flush().await?;
        }
        Ok(())
    }

    async fn close(&self) -> Result<()> {
        let mut conn = self.connection.lock().await;
        *conn = None;
        Ok(())
    }
}

Logstash Configuration

# /etc/logstash/conf.d/bssh.conf
input {
  tcp {
    port => 5044
    codec => json_lines
  }
}

filter {
  if [event_type] {
    mutate {
      add_field => { "[@metadata][index]" => "bssh-audit" }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index]}-%{+YYYY.MM.dd}"
  }
}

Configuration

audit:
  enabled: true
  exporters:
    - type: logstash
      host: logstash.example.com
      port: 5044

Files to Create/Modify

File Action
src/server/audit/logstash.rs Create - Logstash exporter
src/server/audit/mod.rs Modify - Add logstash module

Testing Requirements

  1. Unit test: JSON formatting
  2. Integration test: Connection and reconnection
  3. Integration test: Batch sending

Acceptance Criteria

  • LogstashExporter implementation
  • TCP connection with JSON Lines protocol
  • Auto-reconnection on failure
  • Batch support
  • Connection timeout handling
  • Tests passing

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions