Skip to content

Implement OpenTelemetry audit exporter #136

@inureyes

Description

@inureyes

Summary

Implement an OpenTelemetry (OTLP) exporter for audit events, enabling integration with observability platforms like Jaeger, Grafana Tempo, and cloud monitoring services.

Parent Epic

Implementation Details

// src/server/audit/otel.rs
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::{ExportConfig, Protocol, WithExportConfig};
use opentelemetry_sdk::{
    logs::{LoggerProvider, Config},
    Resource,
};
use tracing_subscriber::layer::SubscriberExt;

/// OpenTelemetry audit exporter
///
/// Wraps an OpenTelemetry `LoggerProvider` so that audit events can be
/// converted to log records and sent to an OTLP endpoint.
pub struct OtelExporter {
    /// Logger provider built in `new` from the OTLP log exporter and the
    /// service resource attributes.
    logger: LoggerProvider,
    /// Owned copy of the endpoint string passed to `new`.
    // NOTE(review): not read anywhere in the code shown here — confirm it is
    // used elsewhere (e.g. diagnostics) or drop it.
    endpoint: String,
}

impl OtelExporter {
    pub fn new(endpoint: &str) -> Result<Self> {
        let export_config = ExportConfig {
            endpoint: endpoint.to_string(),
            protocol: Protocol::Grpc,
            ..Default::default()
        };

        let exporter = opentelemetry_otlp::new_exporter()
            .tonic()
            .with_export_config(export_config)
            .build_log_exporter()?;

        let resource = Resource::new(vec![
            KeyValue::new("service.name", "bssh-server"),
            KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
        ]);

        let logger = LoggerProvider::builder()
            .with_config(Config::default().with_resource(resource))
            .with_simple_exporter(exporter)
            .build();

        Ok(Self {
            logger,
            endpoint: endpoint.to_string(),
        })
    }

    fn event_to_log_record(&self, event: &AuditEvent) -> LogRecord {
        let mut attributes = vec![
            KeyValue::new("event.id", event.id.clone()),
            KeyValue::new("event.type", format!("{:?}", event.event_type)),
            KeyValue::new("session.id", event.session_id.clone()),
            KeyValue::new("user.name", event.user.clone()),
            KeyValue::new("result", format!("{:?}", event.result)),
        ];

        if let Some(ref ip) = event.client_ip {
            attributes.push(KeyValue::new("client.ip", ip.to_string()));
        }
        if let Some(ref path) = event.path {
            attributes.push(KeyValue::new("file.path", path.display().to_string()));
        }
        if let Some(bytes) = event.bytes {
            attributes.push(KeyValue::new("file.bytes", bytes as i64));
        }
        if let Some(ref protocol) = event.protocol {
            attributes.push(KeyValue::new("protocol", protocol.clone()));
        }
        if let Some(ref details) = event.details {
            attributes.push(KeyValue::new("details", details.clone()));
        }

        LogRecord {
            timestamp: Some(event.timestamp.into()),
            severity_number: self.event_to_severity(&event.event_type),
            severity_text: Some(format!("{:?}", event.event_type)),
            body: Some(format!("{:?}", event.event_type).into()),
            attributes,
            ..Default::default()
        }
    }

    fn event_to_severity(&self, event_type: &EventType) -> SeverityNumber {
        match event_type {
            EventType::AuthFailure | EventType::AuthRateLimited => SeverityNumber::Warn,
            EventType::TransferDenied | EventType::CommandBlocked => SeverityNumber::Warn,
            EventType::SuspiciousActivity | EventType::IpBlocked => SeverityNumber::Error,
            _ => SeverityNumber::Info,
        }
    }
}

#[async_trait]
impl AuditExporter for OtelExporter {
    /// Converts one audit event into a log record and hands it to the logger
    /// provider. Always returns `Ok(())`.
    async fn export(&self, event: AuditEvent) -> Result<()> {
        self.logger.log(&self.event_to_log_record(&event));
        Ok(())
    }

    /// Converts and logs each event in order, one record per event.
    /// Always returns `Ok(())`.
    async fn export_batch(&self, events: Vec<AuditEvent>) -> Result<()> {
        events.into_iter().for_each(|event| {
            let record = self.event_to_log_record(&event);
            self.logger.log(&record);
        });
        Ok(())
    }

    /// Asks the logger provider to flush anything it has buffered.
    /// Always returns `Ok(())`.
    async fn flush(&self) -> Result<()> {
        // NOTE(review): whatever `force_flush` returns is discarded here, so a
        // failed flush is not reported to the caller — confirm this is intended.
        let _ = self.logger.force_flush();
        Ok(())
    }

    /// Shuts the logger provider down, propagating any shutdown error.
    async fn close(&self) -> Result<()> {
        self.logger.shutdown()?;
        Ok(())
    }
}

Dependencies to Add

[dependencies]
opentelemetry = "0.21"
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio", "logs"] }
opentelemetry-otlp = { version = "0.14", features = ["grpc-tonic", "logs"] }

Configuration

audit:
  enabled: true
  exporters:
    - type: otel
      endpoint: http://otel-collector:4317

Files to Create/Modify

| File | Action |
| --- | --- |
| src/server/audit/otel.rs | Create - OTEL exporter |
| src/server/audit/mod.rs | Modify - Add otel module |
| Cargo.toml | Modify - Add opentelemetry deps |

Testing Requirements

  1. Unit test: Event to LogRecord conversion
  2. Integration test: Export to mock OTEL collector

Acceptance Criteria

  • OtelExporter implementation
  • OTLP/gRPC protocol support
  • Event to LogRecord mapping
  • Severity level mapping
  • Resource attributes (service name, version)
  • Graceful shutdown
  • Tests passing

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions