feat: Implement Logstash audit exporter#162
Conversation
Add LogstashExporter implementation that sends audit events to Logstash via TCP using JSON Lines protocol. Key features: - TCP connection with automatic reconnection on failure - JSON Lines protocol (newline-delimited JSON) - Batch support for efficient event transmission - Connection timeout handling (10 seconds) - Comprehensive test coverage Implementation details: - Create src/server/audit/logstash.rs with LogstashExporter struct - Wire up LogstashExporter in AuditManager - Implement AuditExporter trait methods: export, export_batch, flush, close - Add 9 unit tests covering all functionality including edge cases Resolves #137
Security & Performance ReviewAnalysis Summary
CRITICAL1. Unencrypted Transmission of Sensitive Audit DataFile: The LogstashExporter uses plain TCP without TLS encryption to transmit audit events. Audit logs often contain sensitive security information including:
Impact: An attacker with network access could intercept audit data in transit (man-in-the-middle attack), compromising security monitoring and potentially exposing sensitive information. Comparison: The if !endpoint.starts_with("https://") {
tracing::warn!(
endpoint = %endpoint,
"OpenTelemetry audit exporter is not using HTTPS. ..."
);
}Recommendation:
HIGH2. Missing Host Validation - Potential SSRF VectorFile: The
pub fn new(host: &str, port: u16) -> Result<Self> {
if host.is_empty() {
anyhow::bail!("Logstash host cannot be empty");
}
// No further validation of host formatRecommendation: Add validation similar to OtelExporter's URL validation:
3. Reconnect Delay Blocks Event ExportFile: When a connection fails during // Connection lost or didn't exist, reconnect and retry
tokio::time::sleep(self.reconnect_delay).await; // Blocks with lock held
let mut stream = self.connect().await?;Impact:
Recommendation:
MEDIUM4. Unbounded Batch Buffer GrowthFile: The async fn export_batch(&self, events: &[AuditEvent]) -> Result<()> {
self.ensure_connected().await?;
let mut batch = String::new();
for event in events {
batch.push_str(&self.format_event(event)?);
}
self.send(batch.as_bytes()).await
}Impact: For large batches, this could cause memory pressure or OOM in constrained environments. Recommendation:
5. Audit Event Loss on Partial Send FailureFile: If a write partially succeeds or the connection drops mid-batch, there's no mechanism to track which events were successfully sent: async fn send(&self, data: &[u8]) -> Result<()> {
// ... if write fails, reconnect and retry from beginning
stream.write_all(data).await // Retries entire bufferImpact: During network instability, events could be:
Recommendation:
LOW6. Missing Debug Trait ImplementationFile: The impl std::fmt::Debug for OtelExporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { ... }
}Impact: Makes debugging and logging the exporter state more difficult. Recommendation: Add Positive Observations
SummaryThe implementation is functionally correct and follows Rust best practices for async code. The main concerns are:
These issues should be addressed before using this exporter in production environments where audit log integrity and confidentiality are important. |
Mark LogstashExporter as implemented (no longer "planned") and add documentation with usage example.
PR Finalization ReportProject Structure Discovered
ChecklistTests
Documentation
Code Quality
Changes Made
Test SummaryAll 11 logstash tests pass:
Notes
|
Summary
Implements a Logstash exporter for audit events, enabling integration with ELK stack (Elasticsearch, Logstash, Kibana) for log analysis.
Changes
src/server/audit/logstash.rsImplementation Details
LogstashExporter
Tests
All tests passing (9/9):
Configuration Example
Testing
cargo build --lib cargo clippy --lib -- -D warnings cargo test --lib server::audit::logstash cargo fmtResolves #137