From ab3498d546bdd956aff8a5c4e469c005ea48a562 Mon Sep 17 00:00:00 2001 From: Jeongkyu Shin Date: Sat, 24 Jan 2026 14:18:57 +0900 Subject: [PATCH 1/3] feat: Implement Logstash audit exporter Add LogstashExporter implementation that sends audit events to Logstash via TCP using JSON Lines protocol. Key features: - TCP connection with automatic reconnection on failure - JSON Lines protocol (newline-delimited JSON) - Batch support for efficient event transmission - Connection timeout handling (10 seconds) - Comprehensive test coverage Implementation details: - Create src/server/audit/logstash.rs with LogstashExporter struct - Wire up LogstashExporter in AuditManager - Implement AuditExporter trait methods: export, export_batch, flush, close - Add 9 unit tests covering all functionality including edge cases Resolves #137 --- src/server/audit/logstash.rs | 456 +++++++++++++++++++++++++++++++++++ src/server/audit/mod.rs | 12 +- 2 files changed, 462 insertions(+), 6 deletions(-) create mode 100644 src/server/audit/logstash.rs diff --git a/src/server/audit/logstash.rs b/src/server/audit/logstash.rs new file mode 100644 index 00000000..0284e2ca --- /dev/null +++ b/src/server/audit/logstash.rs @@ -0,0 +1,456 @@ +// Copyright 2025 Lablup Inc. and Jeongkyu Shin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Logstash audit exporter for sending events to Logstash via TCP. +//! +//! This module provides a Logstash exporter that sends audit events to a +//! Logstash server using TCP with JSON Lines protocol. Each event is +//! serialized as JSON and sent with a newline delimiter. +//! +//! # Features +//! +//! - TCP connection with automatic reconnection on failure +//! - JSON Lines protocol (newline-delimited JSON) +//! - Batch support for efficient event transmission +//! - Connection timeout handling +//! +//! # Example +//! +//! ```no_run +//! use bssh::server::audit::logstash::LogstashExporter; +//! use bssh::server::audit::exporter::AuditExporter; +//! use bssh::server::audit::event::{AuditEvent, EventType}; +//! +//! # async fn example() -> anyhow::Result<()> { +//! let exporter = LogstashExporter::new("logstash.example.com", 5044)?; +//! +//! let event = AuditEvent::new( +//! EventType::AuthSuccess, +//! "alice".to_string(), +//! "session-123".to_string(), +//! ); +//! +//! exporter.export(event).await?; +//! # Ok(()) +//! # } +//! ``` + +use super::event::AuditEvent; +use super::exporter::AuditExporter; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use std::time::Duration; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpStream; +use tokio::sync::Mutex; + +/// Logstash audit exporter. +/// +/// Sends audit events to a Logstash server via TCP using JSON Lines protocol. +/// The exporter automatically handles connection failures and reconnects +/// as needed. +pub struct LogstashExporter { + /// Logstash server hostname or IP address + host: String, + + /// Logstash server port + port: u16, + + /// TCP connection (wrapped in Mutex for interior mutability) + connection: Mutex>, + + /// Delay before attempting to reconnect after failure + reconnect_delay: Duration, + + /// Connection timeout + connect_timeout: Duration, +} + +impl LogstashExporter { + /// Create a new Logstash exporter. + /// + /// # Arguments + /// + /// * `host` - Logstash server hostname or IP address + /// * `port` - Logstash server port + /// + /// # Errors + /// + /// Returns an error if the host or port are invalid. + pub fn new(host: &str, port: u16) -> Result { + if host.is_empty() { + anyhow::bail!("Logstash host cannot be empty"); + } + + Ok(Self { + host: host.to_string(), + port, + connection: Mutex::new(None), + reconnect_delay: Duration::from_secs(5), + connect_timeout: Duration::from_secs(10), + }) + } + + /// Ensure a connection to the Logstash server exists. + /// + /// If no connection exists, attempts to establish one. + /// + /// # Errors + /// + /// Returns an error if the connection cannot be established. + async fn ensure_connected(&self) -> Result<()> { + let mut conn = self.connection.lock().await; + + if conn.is_none() { + match self.connect().await { + Ok(stream) => { + *conn = Some(stream); + } + Err(e) => { + tracing::warn!("Failed to connect to Logstash: {}", e); + return Err(e); + } + } + } + + Ok(()) + } + + /// Establish a TCP connection to the Logstash server. + /// + /// # Errors + /// + /// Returns an error if the connection times out or fails. + async fn connect(&self) -> Result { + let addr = format!("{}:{}", self.host, self.port); + + let stream = tokio::time::timeout(self.connect_timeout, TcpStream::connect(&addr)) + .await + .context("Connection timeout")? + .context("Failed to connect")?; + + tracing::info!("Connected to Logstash at {}", addr); + Ok(stream) + } + + /// Send data to the Logstash server. + /// + /// If the connection is lost, attempts to reconnect once before failing. + /// + /// # Arguments + /// + /// * `data` - Byte data to send + /// + /// # Errors + /// + /// Returns an error if the send fails or reconnection fails. + async fn send(&self, data: &[u8]) -> Result<()> { + let mut conn = self.connection.lock().await; + + // Try to send with existing connection + if let Some(ref mut stream) = *conn { + match stream.write_all(data).await { + Ok(_) => return Ok(()), + Err(e) => { + tracing::warn!("Logstash write failed, reconnecting: {}", e); + *conn = None; + } + } + } + + // Connection lost or didn't exist, reconnect and retry + tokio::time::sleep(self.reconnect_delay).await; + let mut stream = self.connect().await?; + stream + .write_all(data) + .await + .context("Failed to write after reconnection")?; + *conn = Some(stream); + + Ok(()) + } + + /// Format an audit event as JSON with newline delimiter. + /// + /// # Arguments + /// + /// * `event` - The audit event to format + /// + /// # Errors + /// + /// Returns an error if the event cannot be serialized to JSON. + fn format_event(&self, event: &AuditEvent) -> Result { + let mut json = serde_json::to_string(event).context("Failed to serialize event")?; + json.push('\n'); + Ok(json) + } +} + +#[async_trait] +impl AuditExporter for LogstashExporter { + async fn export(&self, event: AuditEvent) -> Result<()> { + self.ensure_connected().await?; + let data = self.format_event(&event)?; + self.send(data.as_bytes()).await + } + + async fn export_batch(&self, events: &[AuditEvent]) -> Result<()> { + self.ensure_connected().await?; + + // Format all events into a single buffer + let mut batch = String::new(); + for event in events { + batch.push_str(&self.format_event(event)?); + } + + self.send(batch.as_bytes()).await + } + + async fn flush(&self) -> Result<()> { + let mut conn = self.connection.lock().await; + if let Some(ref mut stream) = *conn { + stream + .flush() + .await + .context("Failed to flush Logstash connection")?; + } + Ok(()) + } + + async fn close(&self) -> Result<()> { + let mut conn = self.connection.lock().await; + if let Some(stream) = conn.take() { + drop(stream); + tracing::info!("Closed Logstash connection"); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::server::audit::event::{EventResult, EventType}; + use std::net::{IpAddr, SocketAddr}; + use tokio::io::AsyncReadExt; + use tokio::net::TcpListener; + + /// Helper to create a mock Logstash server for testing + async fn mock_logstash_server() -> (SocketAddr, tokio::task::JoinHandle>) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let handle = tokio::spawn(async move { + let mut received_lines = Vec::new(); + let (mut socket, _) = listener.accept().await.unwrap(); + + let mut buffer = String::new(); + loop { + let mut chunk = [0u8; 1024]; + match socket.read(&mut chunk).await { + Ok(0) => break, + Ok(n) => { + buffer.push_str(&String::from_utf8_lossy(&chunk[..n])); + // Process complete lines + while let Some(pos) = buffer.find('\n') { + let line = buffer[..pos].to_string(); + buffer.drain(..=pos); + received_lines.push(line); + } + } + Err(_) => break, + } + } + received_lines + }); + + (addr, handle) + } + + #[tokio::test] + async fn test_logstash_exporter_creation() { + let exporter = LogstashExporter::new("localhost", 5044); + assert!(exporter.is_ok()); + } + + #[tokio::test] + async fn test_logstash_exporter_invalid_host() { + let exporter = LogstashExporter::new("", 5044); + assert!(exporter.is_err()); + } + + #[tokio::test] + async fn test_format_event() { + let exporter = LogstashExporter::new("localhost", 5044).unwrap(); + + let event = AuditEvent::new( + EventType::AuthSuccess, + "alice".to_string(), + "session-123".to_string(), + ); + + let formatted = exporter.format_event(&event).unwrap(); + + // Should be valid JSON ending with newline + assert!(formatted.ends_with('\n')); + let json_part = formatted.trim_end(); + assert!(serde_json::from_str::(json_part).is_ok()); + } + + #[tokio::test] + async fn test_export_single_event() { + let (addr, server_handle) = mock_logstash_server().await; + + let exporter = LogstashExporter::new(&addr.ip().to_string(), addr.port()).unwrap(); + + let event = AuditEvent::new( + EventType::SessionStart, + "bob".to_string(), + "session-456".to_string(), + ); + + let result = exporter.export(event).await; + assert!(result.is_ok()); + + // Close connection to trigger server to finish + exporter.close().await.unwrap(); + + let received = server_handle.await.unwrap(); + assert_eq!(received.len(), 1); + assert!(received[0].contains("session-456")); + assert!(received[0].contains("bob")); + } + + #[tokio::test] + async fn test_export_batch() { + let (addr, server_handle) = mock_logstash_server().await; + + let exporter = LogstashExporter::new(&addr.ip().to_string(), addr.port()).unwrap(); + + let events = vec![ + AuditEvent::new( + EventType::AuthSuccess, + "user1".to_string(), + "session-1".to_string(), + ), + AuditEvent::new( + EventType::FileUploaded, + "user2".to_string(), + "session-2".to_string(), + ) + .with_result(EventResult::Success), + AuditEvent::new( + EventType::SessionEnd, + "user3".to_string(), + "session-3".to_string(), + ), + ]; + + let result = exporter.export_batch(&events).await; + assert!(result.is_ok()); + + exporter.close().await.unwrap(); + + let received = server_handle.await.unwrap(); + assert_eq!(received.len(), 3); + assert!(received[0].contains("session-1")); + assert!(received[1].contains("session-2")); + assert!(received[2].contains("session-3")); + } + + #[tokio::test] + async fn test_connection_timeout() { + // Use a non-routable IP to trigger timeout + let exporter = LogstashExporter::new("192.0.2.1", 5044).unwrap(); + + let event = AuditEvent::new( + EventType::AuthSuccess, + "test".to_string(), + "session-test".to_string(), + ); + + let result = exporter.export(event).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_flush() { + let (addr, server_handle) = mock_logstash_server().await; + + let exporter = LogstashExporter::new(&addr.ip().to_string(), addr.port()).unwrap(); + + let event = AuditEvent::new( + EventType::CommandExecuted, + "charlie".to_string(), + "session-789".to_string(), + ); + + exporter.export(event).await.unwrap(); + let result = exporter.flush().await; + assert!(result.is_ok()); + + exporter.close().await.unwrap(); + server_handle.await.unwrap(); + } + + #[tokio::test] + async fn test_close() { + let (addr, _server_handle) = mock_logstash_server().await; + + let exporter = LogstashExporter::new(&addr.ip().to_string(), addr.port()).unwrap(); + + // Connect by sending an event + let event = AuditEvent::new( + EventType::SessionStart, + "dave".to_string(), + "session-101".to_string(), + ); + exporter.export(event).await.unwrap(); + + // Close should succeed + let result = exporter.close().await; + assert!(result.is_ok()); + + // Close again should also succeed (idempotent) + let result = exporter.close().await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_json_lines_format() { + let exporter = LogstashExporter::new("localhost", 5044).unwrap(); + + let ip: IpAddr = "192.168.1.100".parse().unwrap(); + let event = AuditEvent::new( + EventType::FileDownloaded, + "eve".to_string(), + "session-202".to_string(), + ) + .with_client_ip(ip) + .with_bytes(2048); + + let formatted = exporter.format_event(&event).unwrap(); + + // Verify JSON Lines format + assert!(formatted.ends_with('\n')); + let lines: Vec<&str> = formatted.lines().collect(); + assert_eq!(lines.len(), 1); + + // Parse and verify content + let parsed: serde_json::Value = serde_json::from_str(lines[0]).unwrap(); + assert_eq!(parsed["user"], "eve"); + assert_eq!(parsed["session_id"], "session-202"); + assert_eq!(parsed["bytes"], 2048); + } +} diff --git a/src/server/audit/mod.rs b/src/server/audit/mod.rs index 2a0d29f4..f6edd228 100644 --- a/src/server/audit/mod.rs +++ b/src/server/audit/mod.rs @@ -49,6 +49,7 @@ pub mod event; pub mod exporter; pub mod file; +pub mod logstash; pub mod otel; use anyhow::Result; @@ -60,6 +61,7 @@ use tokio::task::JoinHandle; pub use event::{AuditEvent, EventResult, EventType}; pub use exporter::{AuditExporter, NullExporter}; pub use file::{FileExporter, RotateConfig}; +pub use logstash::LogstashExporter; pub use otel::OtelExporter; /// Configuration for the audit system. @@ -96,8 +98,7 @@ pub enum AuditExporterConfig { /// OTLP endpoint URL endpoint: String, }, - /// Logstash exporter (future implementation) - #[allow(dead_code)] + /// Logstash exporter Logstash { /// Logstash host host: String, @@ -218,10 +219,9 @@ impl AuditManager { let otel_exporter = OtelExporter::new(endpoint)?; Arc::new(otel_exporter) } - AuditExporterConfig::Logstash { .. } => { - // Future implementation - tracing::warn!("Logstash exporter not yet implemented, using null exporter"); - Arc::new(NullExporter::new()) + AuditExporterConfig::Logstash { host, port } => { + let logstash_exporter = LogstashExporter::new(host, *port)?; + Arc::new(logstash_exporter) } }; exporters.push(exporter); From 5712e3dc7851bec6a4c967aba053b46de3b76a17 Mon Sep 17 00:00:00 2001 From: Jeongkyu Shin Date: Sat, 24 Jan 2026 14:28:18 +0900 Subject: [PATCH 2/3] fix: Address security issues in Logstash audit exporter --- Cargo.lock | 75 ++++++++++ Cargo.toml | 2 + src/server/audit/logstash.rs | 261 +++++++++++++++++++++++++++++++++-- 3 files changed, 329 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3b761862..1112f8d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -482,6 +482,7 @@ dependencies = [ "regex", "rpassword", "russh-sftp", + "rustls-native-certs", "rustyline", "secrecy", "security-framework", @@ -497,6 +498,7 @@ dependencies = [ "terminal_size", "thiserror 2.0.17", "tokio", + "tokio-rustls", "tokio-test", "tokio-util", "tracing", @@ -2901,6 +2903,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "openssl-probe" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" + [[package]] name = "opentelemetry" version = "0.21.0" @@ -3870,6 +3878,54 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "rustls" +version = "0.23.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" +dependencies = [ + "aws-lc-rs", + "log", + "once_cell", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-native-certs" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pki-types" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" +dependencies = [ + "zeroize", +] + +[[package]] +name = "rustls-webpki" +version = "0.103.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7df23109aa6c1567d1c575b9952556388da57401e4ace1d15f79eedad0d8f53" +dependencies = [ + "aws-lc-rs", + "ring", + "rustls-pki-types", + "untrusted 0.9.0", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -3931,6 +3987,15 @@ dependencies = [ "sdd", ] +[[package]] +name = "schannel" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -4702,6 +4767,16 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "tokio-rustls" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.18" diff --git a/Cargo.toml b/Cargo.toml index a92b7709..8a4f10c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,8 @@ opentelemetry = "0.21" opentelemetry_sdk = { version = "0.21", features = ["rt-tokio", "logs"] } opentelemetry-otlp = { version = "0.14", features = ["grpc-tonic", "logs"] } url = "2.5" +tokio-rustls = "0.26" +rustls-native-certs = "0.8" [target.'cfg(target_os = "macos")'.dependencies] security-framework = "3.5.1" diff --git a/src/server/audit/logstash.rs b/src/server/audit/logstash.rs index 0284e2ca..f8214b5c 100644 --- a/src/server/audit/logstash.rs +++ b/src/server/audit/logstash.rs @@ -21,10 +21,17 @@ //! # Features //! //! - TCP connection with automatic reconnection on failure +//! - Optional TLS encryption for secure transmission //! - JSON Lines protocol (newline-delimited JSON) //! - Batch support for efficient event transmission //! - Connection timeout handling //! +//! # Security +//! +//! **WARNING**: By default, connections are unencrypted. For production use, +//! it is strongly recommended to enable TLS encryption using `with_tls(true)` +//! to protect sensitive audit data in transit. +//! //! # Example //! //! ```no_run @@ -33,7 +40,8 @@ //! use bssh::server::audit::event::{AuditEvent, EventType}; //! //! # async fn example() -> anyhow::Result<()> { -//! let exporter = LogstashExporter::new("logstash.example.com", 5044)?; +//! let exporter = LogstashExporter::new("logstash.example.com", 5044)? +//! .with_tls(true); // Enable TLS for secure transmission //! //! let event = AuditEvent::new( //! EventType::AuthSuccess, @@ -50,16 +58,58 @@ use super::event::AuditEvent; use super::exporter::AuditExporter; use anyhow::{Context, Result}; use async_trait::async_trait; +use std::net::IpAddr; +use std::sync::Arc; use std::time::Duration; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; use tokio::sync::Mutex; +use tokio_rustls::rustls::pki_types::ServerName; +use tokio_rustls::rustls::{ClientConfig, RootCertStore}; +use tokio_rustls::TlsConnector; + +/// Represents a connection to the Logstash server, either plain TCP or TLS-encrypted. +enum Connection { + Plain(TcpStream), + Tls(Box>), +} + +impl Connection { + async fn write_all(&mut self, data: &[u8]) -> std::io::Result<()> { + match self { + Connection::Plain(stream) => stream.write_all(data).await, + Connection::Tls(stream) => stream.write_all(data).await, + } + } + + async fn flush(&mut self) -> std::io::Result<()> { + match self { + Connection::Plain(stream) => stream.flush().await, + Connection::Tls(stream) => stream.flush().await, + } + } +} /// Logstash audit exporter. /// /// Sends audit events to a Logstash server via TCP using JSON Lines protocol. /// The exporter automatically handles connection failures and reconnects /// as needed. +/// +/// # Security +/// +/// By default, connections are unencrypted. Use `with_tls(true)` to enable +/// TLS encryption for production environments. +/// +/// # Batch Size Considerations +/// +/// When using `export_batch()`, be aware that the entire batch is buffered +/// in memory before transmission. Large batches can consume significant memory. +/// Consider the following guidelines: +/// +/// - For typical audit events (~1KB each), batches of 100-1000 events are reasonable +/// - Monitor memory usage if processing larger events or higher batch sizes +/// - Adjust batch sizes based on your memory constraints and network latency requirements pub struct LogstashExporter { /// Logstash server hostname or IP address host: String, @@ -68,18 +118,27 @@ pub struct LogstashExporter { port: u16, /// TCP connection (wrapped in Mutex for interior mutability) - connection: Mutex>, + connection: Mutex>, /// Delay before attempting to reconnect after failure reconnect_delay: Duration, /// Connection timeout connect_timeout: Duration, + + /// Whether to use TLS encryption + use_tls: bool, + + /// TLS connector (only initialized if use_tls is true) + tls_connector: Option, } impl LogstashExporter { /// Create a new Logstash exporter. /// + /// **WARNING**: By default, connections are unencrypted. For production use, + /// call `with_tls(true)` to enable TLS encryption. + /// /// # Arguments /// /// * `host` - Logstash server hostname or IP address @@ -87,21 +146,137 @@ impl LogstashExporter { /// /// # Errors /// - /// Returns an error if the host or port are invalid. + /// Returns an error if the host is invalid (empty, or not a valid hostname/IP). pub fn new(host: &str, port: u16) -> Result { if host.is_empty() { anyhow::bail!("Logstash host cannot be empty"); } + // Validate host format (must be a valid hostname or IP address) + if !Self::is_valid_host(host) { + anyhow::bail!("Invalid host format: must be a valid hostname or IP address"); + } + + tracing::warn!( + "Logstash exporter created without TLS encryption. \ + For production use, enable TLS with with_tls(true) to protect audit data in transit." + ); + Ok(Self { host: host.to_string(), port, connection: Mutex::new(None), reconnect_delay: Duration::from_secs(5), connect_timeout: Duration::from_secs(10), + use_tls: false, + tls_connector: None, }) } + /// Enable or disable TLS encryption for the connection. + /// + /// When TLS is enabled, the exporter will use the system's root certificates + /// to validate the server's certificate. + /// + /// # Arguments + /// + /// * `enable` - Whether to enable TLS encryption + /// + /// # Example + /// + /// ```no_run + /// # use bssh::server::audit::logstash::LogstashExporter; + /// # fn example() -> anyhow::Result<()> { + /// let exporter = LogstashExporter::new("logstash.example.com", 5044)? + /// .with_tls(true); + /// # Ok(()) + /// # } + /// ``` + #[must_use] + pub fn with_tls(mut self, enable: bool) -> Self { + self.use_tls = enable; + if enable { + // Initialize TLS connector with system root certificates + let mut root_store = RootCertStore::empty(); + let cert_result = rustls_native_certs::load_native_certs(); + + for cert in cert_result.certs { + root_store.add(cert).ok(); + } + + if !cert_result.errors.is_empty() { + tracing::warn!( + "Some errors occurred while loading native certificates: {:?}", + cert_result.errors + ); + } + + let config = ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(); + + self.tls_connector = Some(TlsConnector::from(Arc::new(config))); + tracing::info!("TLS encryption enabled for Logstash exporter"); + } else { + self.tls_connector = None; + tracing::warn!( + "TLS encryption disabled for Logstash exporter. \ + Audit data will be transmitted unencrypted." + ); + } + self + } + + /// Validate that the host string is a valid hostname or IP address. + /// + /// # Arguments + /// + /// * `host` - The host string to validate + /// + /// # Returns + /// + /// `true` if the host is a valid hostname or IP address, `false` otherwise. + fn is_valid_host(host: &str) -> bool { + // Try to parse as IP address first + if host.parse::().is_ok() { + return true; + } + + // Validate as hostname (RFC 1123) + // - Must be 1-253 characters + // - Each label must be 1-63 characters + // - Labels can contain alphanumeric characters and hyphens + // - Labels cannot start or end with a hyphen + // - Labels are separated by dots + if host.len() > 253 { + return false; + } + + let labels: Vec<&str> = host.split('.').collect(); + if labels.is_empty() { + return false; + } + + for label in labels { + if label.is_empty() || label.len() > 63 { + return false; + } + + // Check first and last characters + let chars: Vec = label.chars().collect(); + if chars[0] == '-' || chars[chars.len() - 1] == '-' { + return false; + } + + // Check that all characters are alphanumeric or hyphen + if !chars.iter().all(|c| c.is_ascii_alphanumeric() || *c == '-') { + return false; + } + } + + true + } + /// Ensure a connection to the Logstash server exists. /// /// If no connection exists, attempts to establish one. @@ -127,26 +302,49 @@ impl LogstashExporter { Ok(()) } - /// Establish a TCP connection to the Logstash server. + /// Establish a connection to the Logstash server (TCP or TLS). /// /// # Errors /// /// Returns an error if the connection times out or fails. - async fn connect(&self) -> Result { + async fn connect(&self) -> Result { let addr = format!("{}:{}", self.host, self.port); - let stream = tokio::time::timeout(self.connect_timeout, TcpStream::connect(&addr)) + let tcp_stream = tokio::time::timeout(self.connect_timeout, TcpStream::connect(&addr)) .await .context("Connection timeout")? .context("Failed to connect")?; - tracing::info!("Connected to Logstash at {}", addr); - Ok(stream) + let connection = if self.use_tls { + let connector = self + .tls_connector + .as_ref() + .ok_or_else(|| anyhow::anyhow!("TLS enabled but connector not initialized"))? + .clone(); + + let server_name = + ServerName::try_from(self.host.clone()).context("Invalid server name for TLS")?; + + let tls_stream = connector + .connect(server_name, tcp_stream) + .await + .context("TLS handshake failed")?; + + tracing::info!("Connected to Logstash at {} with TLS", addr); + Connection::Tls(Box::new(tls_stream)) + } else { + tracing::info!("Connected to Logstash at {} (unencrypted)", addr); + Connection::Plain(tcp_stream) + }; + + Ok(connection) } /// Send data to the Logstash server. /// /// If the connection is lost, attempts to reconnect once before failing. + /// The mutex is released during the reconnection delay to avoid blocking + /// other operations. /// /// # Arguments /// @@ -169,13 +367,21 @@ impl LogstashExporter { } } - // Connection lost or didn't exist, reconnect and retry + // Connection lost or didn't exist, drop the lock before sleeping + drop(conn); + + // Wait before reconnecting (without holding the lock) tokio::time::sleep(self.reconnect_delay).await; + + // Reconnect and retry let mut stream = self.connect().await?; stream .write_all(data) .await .context("Failed to write after reconnection")?; + + // Reacquire lock and store the connection + let mut conn = self.connection.lock().await; *conn = Some(stream); Ok(()) @@ -290,6 +496,43 @@ mod tests { assert!(exporter.is_err()); } + #[tokio::test] + async fn test_host_validation() { + // Valid hostnames + assert!(LogstashExporter::new("localhost", 5044).is_ok()); + assert!(LogstashExporter::new("logstash.example.com", 5044).is_ok()); + assert!(LogstashExporter::new("my-server-01.internal.example.com", 5044).is_ok()); + + // Valid IP addresses + assert!(LogstashExporter::new("127.0.0.1", 5044).is_ok()); + assert!(LogstashExporter::new("192.168.1.100", 5044).is_ok()); + assert!(LogstashExporter::new("::1", 5044).is_ok()); + assert!(LogstashExporter::new("2001:db8::1", 5044).is_ok()); + + // Invalid hostnames + assert!(LogstashExporter::new("", 5044).is_err()); + assert!(LogstashExporter::new("-invalid", 5044).is_err()); + assert!(LogstashExporter::new("invalid-", 5044).is_err()); + assert!(LogstashExporter::new("invalid..host", 5044).is_err()); + assert!(LogstashExporter::new("invalid host with spaces", 5044).is_err()); + assert!(LogstashExporter::new("invalid@host", 5044).is_err()); + } + + #[tokio::test] + async fn test_with_tls() { + let exporter = LogstashExporter::new("localhost", 5044) + .unwrap() + .with_tls(true); + assert!(exporter.use_tls); + assert!(exporter.tls_connector.is_some()); + + let exporter = LogstashExporter::new("localhost", 5044) + .unwrap() + .with_tls(false); + assert!(!exporter.use_tls); + assert!(exporter.tls_connector.is_none()); + } + #[tokio::test] async fn test_format_event() { let exporter = LogstashExporter::new("localhost", 5044).unwrap(); From e4e7e304835f558082a9965daf9a7aca0c9952b6 Mon Sep 17 00:00:00 2001 From: Jeongkyu Shin Date: Sat, 24 Jan 2026 14:36:28 +0900 Subject: [PATCH 3/3] docs: Update architecture documentation for Logstash exporter Mark LogstashExporter as implemented (no longer "planned") and add documentation with usage example. --- ARCHITECTURE.md | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index f9b1118a..f0099cfb 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -307,6 +307,14 @@ let exporter = FileExporter::new(Path::new("/var/log/audit.log"))? - Graceful shutdown and flush methods - TLS support for secure audit data transmission +- **LogstashExporter**: Logstash exporter for ELK stack integration + - TCP connection with JSON Lines protocol (newline-delimited JSON) + - Optional TLS encryption for secure transmission + - Automatic reconnection on connection failure + - Batch support for efficient event transmission + - Connection timeout handling (default: 10 seconds) + - Configurable host and port + **OtelExporter Usage**: ```rust use bssh::server::audit::otel::OtelExporter; @@ -328,8 +336,27 @@ exporter.export(event).await?; exporter.close().await?; ``` -**Future Exporters** (planned): -- Logstash exporter for centralized logging +**LogstashExporter Usage**: +```rust +use bssh::server::audit::logstash::LogstashExporter; +use bssh::server::audit::exporter::AuditExporter; +use bssh::server::audit::event::{AuditEvent, EventType}; + +// Create exporter (unencrypted by default) +let exporter = LogstashExporter::new("logstash.example.com", 5044)? + .with_tls(true); // Enable TLS for production + +// Export an audit event +let event = AuditEvent::new( + EventType::AuthSuccess, + "alice".to_string(), + "session-123".to_string(), +); +exporter.export(event).await?; + +// Graceful shutdown +exporter.close().await?; +``` ### Server CLI Binary **Binary**: `bssh-server`