diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 8cf64bae5..8f1c783c4 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -43,7 +43,10 @@ use bottlecap::{ listener::Listener as LifecycleListener, }, logger, - logs::{agent::LogsAgent, flusher::LogsFlusher}, + logs::{ + agent::LogsAgent, aggregator_service::AggregatorService as LogsAggregatorService, + flusher::LogsFlusher, + }, otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent}, proxy::{interceptor, should_start_proxy}, secrets::decrypt, @@ -904,13 +907,17 @@ fn start_logs_agent( tags_provider: &Arc, event_bus: Sender, ) -> (Sender, LogsFlusher) { - let mut logs_agent = LogsAgent::new(Arc::clone(tags_provider), Arc::clone(config), event_bus); - let logs_agent_channel = logs_agent.get_sender_copy(); - let logs_flusher = LogsFlusher::new( - api_key_factory, - Arc::clone(&logs_agent.aggregator), - config.clone(), + let (logs_aggr_service, logs_aggr_handle) = LogsAggregatorService::default(); + start_logs_aggregator(logs_aggr_service); + + let mut logs_agent = LogsAgent::new( + Arc::clone(tags_provider), + Arc::clone(config), + event_bus, + logs_aggr_handle.clone(), ); + let logs_agent_channel = logs_agent.get_sender_copy(); + let logs_flusher = LogsFlusher::new(api_key_factory, logs_aggr_handle, config.clone()); tokio::spawn(async move { logs_agent.spin().await; }); @@ -1084,6 +1091,12 @@ fn start_dogstatsd_aggregator(aggr_service: MetricsAggregatorService) { }); } +fn start_logs_aggregator(aggr_service: LogsAggregatorService) { + tokio::spawn(async move { + aggr_service.run().await; + }); +} + async fn start_dogstatsd(metrics_aggr_handle: MetricsAggregatorHandle) -> CancellationToken { let dogstatsd_config = DogStatsDConfig { host: EXTENSION_HOST.to_string(), diff --git a/bottlecap/src/logs/agent.rs b/bottlecap/src/logs/agent.rs index 1404e3532..acd613644 100644 --- a/bottlecap/src/logs/agent.rs +++ b/bottlecap/src/logs/agent.rs @@ -1,21 +1,18 @@ use std::sync::Arc; -use tokio::sync::{ - Mutex, - mpsc::{self, Sender}, -}; +use tokio::sync::mpsc::{self, Sender}; use crate::event_bus::Event; use crate::extension::telemetry::events::TelemetryEvent; -use crate::logs::{aggregator::Aggregator, processor::LogsProcessor}; +use crate::logs::{aggregator_service::AggregatorHandle, processor::LogsProcessor}; use crate::tags; use crate::{LAMBDA_RUNTIME_SLUG, config}; #[allow(clippy::module_name_repetitions)] pub struct LogsAgent { - pub aggregator: Arc>, tx: Sender, rx: mpsc::Receiver, processor: LogsProcessor, + aggregator_handle: AggregatorHandle, } impl LogsAgent { @@ -24,8 +21,8 @@ impl LogsAgent { tags_provider: Arc, datadog_config: Arc, event_bus: Sender, - ) -> LogsAgent { - let aggregator: Arc> = Arc::new(Mutex::new(Aggregator::default())); + aggregator_handle: AggregatorHandle, + ) -> Self { let processor = LogsProcessor::new( Arc::clone(&datadog_config), tags_provider, @@ -35,23 +32,25 @@ impl LogsAgent { let (tx, rx) = mpsc::channel::(1000); - LogsAgent { - aggregator, + Self { tx, rx, processor, + aggregator_handle, } } pub async fn spin(&mut self) { while let Some(event) = self.rx.recv().await { - self.processor.process(event, &self.aggregator).await; + self.processor.process(event, &self.aggregator_handle).await; } } pub async fn sync_consume(&mut self) { if let Some(events) = self.rx.recv().await { - self.processor.process(events, &self.aggregator).await; + self.processor + .process(events, &self.aggregator_handle) + .await; } } diff --git a/bottlecap/src/logs/aggregator_service.rs b/bottlecap/src/logs/aggregator_service.rs new file mode 100644 index 000000000..d281bfb19 --- /dev/null +++ b/bottlecap/src/logs/aggregator_service.rs @@ -0,0 +1,150 @@ +use tokio::sync::{mpsc, oneshot}; +use tracing::{debug, error}; + +use crate::logs::{aggregator::Aggregator, constants}; + +#[derive(Debug)] +pub enum AggregatorCommand { + InsertBatch(Vec), + GetBatches(oneshot::Sender>>), + Shutdown, +} + +#[derive(Clone, Debug)] +pub struct AggregatorHandle { + tx: mpsc::UnboundedSender, +} + +impl AggregatorHandle { + pub fn insert_batch( + &self, + logs: Vec, + ) -> Result<(), mpsc::error::SendError> { + self.tx.send(AggregatorCommand::InsertBatch(logs)) + } + + pub async fn get_batches(&self) -> Result>, String> { + let (response_tx, response_rx) = oneshot::channel(); + self.tx + .send(AggregatorCommand::GetBatches(response_tx)) + .map_err(|e| format!("Failed to send flush command: {e}"))?; + + response_rx + .await + .map_err(|e| format!("Failed to receive flush response: {e}")) + } + + pub fn shutdown(&self) -> Result<(), mpsc::error::SendError> { + self.tx.send(AggregatorCommand::Shutdown) + } +} + +pub struct AggregatorService { + aggregator: Aggregator, + rx: mpsc::UnboundedReceiver, +} + +impl AggregatorService { + #[must_use] + #[allow(clippy::should_implement_trait)] + pub fn default() -> (Self, AggregatorHandle) { + Self::new( + constants::MAX_BATCH_ENTRIES_SIZE, + constants::MAX_CONTENT_SIZE_BYTES, + constants::MAX_LOG_SIZE_BYTES, + ) + } + + #[must_use] + pub fn new( + max_batch_entries_size: usize, + max_content_size_bytes: usize, + max_log_size_bytes: usize, + ) -> (Self, AggregatorHandle) { + let (tx, rx) = mpsc::unbounded_channel(); + let aggregator = Aggregator::new( + max_batch_entries_size, + max_content_size_bytes, + max_log_size_bytes, + ); + + let service = Self { aggregator, rx }; + let handle = AggregatorHandle { tx }; + + (service, handle) + } + + pub async fn run(mut self) { + debug!("Logs aggregator service started"); + + while let Some(command) = self.rx.recv().await { + match command { + AggregatorCommand::InsertBatch(logs) => { + self.aggregator.add_batch(logs); + } + AggregatorCommand::GetBatches(response_tx) => { + let mut batches = Vec::new(); + let mut current_batch = self.aggregator.get_batch(); + while !current_batch.is_empty() { + batches.push(current_batch); + current_batch = self.aggregator.get_batch(); + } + if response_tx.send(batches).is_err() { + error!("Failed to send logs flush response - receiver dropped"); + } + } + AggregatorCommand::Shutdown => { + debug!("Logs aggregator service shutting down"); + break; + } + } + } + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use crate::logs::lambda::{IntakeLog, Lambda, Message}; + + #[tokio::test] + async fn test_aggregator_service_insert_and_flush() { + let (service, handle) = AggregatorService::default(); + + let service_handle = tokio::spawn(async move { + service.run().await; + }); + + let log = IntakeLog { + message: Message { + message: "test".to_string(), + lambda: Lambda { + arn: "arn".to_string(), + request_id: Some("request_id".to_string()), + }, + timestamp: 0, + status: "status".to_string(), + }, + hostname: "hostname".to_string(), + service: "service".to_string(), + tags: "tags".to_string(), + source: "source".to_string(), + }; + let serialized_log = serde_json::to_string(&log).unwrap(); + + handle.insert_batch(vec![serialized_log.clone()]).unwrap(); + + let batches = handle.get_batches().await.unwrap(); + assert_eq!(batches.len(), 1); + let serialized_batch = format!("[{serialized_log}]"); + assert_eq!(batches[0], serialized_batch.as_bytes()); + + handle + .shutdown() + .expect("Failed to shutdown aggregator service"); + service_handle + .await + .expect("Aggregator service task failed"); + } +} diff --git a/bottlecap/src/logs/flusher.rs b/bottlecap/src/logs/flusher.rs index b776068b9..4a81722c7 100644 --- a/bottlecap/src/logs/flusher.rs +++ b/bottlecap/src/logs/flusher.rs @@ -1,7 +1,7 @@ use crate::FLUSH_RETRY_COUNT; use crate::config; use crate::http::get_client; -use crate::logs::aggregator::Aggregator; +use crate::logs::aggregator_service::AggregatorHandle; use dogstatsd::api_key::ApiKeyFactory; use futures::future::join_all; use hyper::StatusCode; @@ -10,10 +10,7 @@ use std::error::Error; use std::time::Instant; use std::{io::Write, sync::Arc}; use thiserror::Error as ThisError; -use tokio::{ - sync::{Mutex, OnceCell}, - task::JoinSet, -}; +use tokio::{sync::OnceCell, task::JoinSet}; use tracing::{debug, error}; use zstd::stream::write::Encoder; @@ -28,24 +25,22 @@ pub struct FailedRequestError { pub struct Flusher { client: reqwest::Client, endpoint: String, - aggregator: Arc>, config: Arc, api_key_factory: Arc, headers: OnceCell, } impl Flusher { + #[must_use] pub fn new( api_key_factory: Arc, endpoint: String, - aggregator: Arc>, config: Arc, ) -> Self { let client = get_client(&config); Flusher { client, endpoint, - aggregator, config, api_key_factory, headers: OnceCell::new(), @@ -199,12 +194,13 @@ impl Flusher { pub struct LogsFlusher { config: Arc, pub flushers: Vec, + aggregator_handle: AggregatorHandle, } impl LogsFlusher { pub fn new( api_key_factory: Arc, - aggregator: Arc>, + aggregator_handle: AggregatorHandle, config: Arc, ) -> Self { let mut flushers = Vec::new(); @@ -222,22 +218,26 @@ impl LogsFlusher { flushers.push(Flusher::new( Arc::clone(&api_key_factory), endpoint, - aggregator.clone(), config.clone(), )); // Create flushers for additional endpoints for endpoint in &config.logs_config_additional_endpoints { let endpoint_url = format!("https://{}:{}", endpoint.host, endpoint.port); + let additional_api_key_factory = + Arc::new(ApiKeyFactory::new(endpoint.api_key.clone().as_str())); flushers.push(Flusher::new( - Arc::clone(&api_key_factory), + additional_api_key_factory, endpoint_url, - aggregator.clone(), config.clone(), )); } - LogsFlusher { config, flushers } + LogsFlusher { + config, + flushers, + aggregator_handle, + } } pub async fn flush( @@ -261,16 +261,17 @@ impl LogsFlusher { } } } else { - // Get batches from primary flusher's aggregator let logs_batches = Arc::new({ - let mut guard = self.flushers[0].aggregator.lock().await; - let mut batches = Vec::new(); - let mut current_batch = guard.get_batch(); - while !current_batch.is_empty() { - batches.push(self.compress(current_batch)); - current_batch = guard.get_batch(); + match self.aggregator_handle.get_batches().await { + Ok(batches) => batches + .into_iter() + .map(|batch| self.compress(batch)) + .collect(), + Err(e) => { + debug!("Failed to flush from aggregator: {}", e); + Vec::new() + } } - batches }); // Send batches to each flusher diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index d96a902da..279e06ef0 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -1,6 +1,6 @@ use std::error::Error; use std::sync::Arc; -use tokio::sync::{Mutex, mpsc::Sender}; +use tokio::sync::mpsc::Sender; use tracing::{debug, error}; @@ -9,7 +9,7 @@ use crate::config; use crate::event_bus::Event; use crate::extension::telemetry::events::{Status, TelemetryEvent, TelemetryRecord}; use crate::lifecycle::invocation::context::Context as InvocationContext; -use crate::logs::aggregator::Aggregator; +use crate::logs::aggregator_service::AggregatorHandle; use crate::logs::processor::{Processor, Rule}; use crate::tags::provider; @@ -324,7 +324,7 @@ impl LambdaProcessor { } } - pub async fn process(&mut self, event: TelemetryEvent, aggregator: &Arc>) { + pub async fn process(&mut self, event: TelemetryEvent, aggregator_handle: &AggregatorHandle) { if let Ok(mut log) = self.make_log(event).await { let should_send_log = self.logs_enabled && LambdaProcessor::apply_rules(&self.rules, &mut log.message.message); @@ -350,11 +350,11 @@ impl LambdaProcessor { } } - if self.ready_logs.is_empty() { - return; + if !self.ready_logs.is_empty() { + if let Err(e) = aggregator_handle.insert_batch(std::mem::take(&mut self.ready_logs)) { + debug!("Failed to send logs to aggregator: {}", e); + } } - let mut aggregator = aggregator.lock().await; - aggregator.add_batch(std::mem::take(&mut self.ready_logs)); } } @@ -366,10 +366,12 @@ mod tests { use chrono::{TimeZone, Utc}; use serde_json::{Number, Value}; use std::collections::hash_map::HashMap; + use std::sync::Arc; use crate::extension::telemetry::events::{ InitPhase, InitType, ReportMetrics, RuntimeDoneMetrics, Status, }; + use crate::logs::aggregator_service::AggregatorService; use crate::logs::lambda::Lambda; macro_rules! get_message_tests { @@ -724,7 +726,6 @@ mod tests { // process #[tokio::test] async fn test_process() { - let aggregator = Arc::new(Mutex::new(Aggregator::default())); let config = Arc::new(config::Config { service: Some("test-service".to_string()), tags: HashMap::from([("test".to_string(), "tags".to_string())]), @@ -738,6 +739,11 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); + let (aggregator_service, aggregator_handle) = AggregatorService::default(); + + let service_handle = tokio::spawn(async move { + aggregator_service.run().await; + }); let mut processor = LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); @@ -750,10 +756,11 @@ mod tests { }, }; - processor.process(event.clone(), &aggregator).await; + processor.process(event.clone(), &aggregator_handle).await; + + let batches = aggregator_handle.get_batches().await.unwrap(); + assert_eq!(batches.len(), 1); - let mut aggregator_lock = aggregator.lock().await; - let batch = aggregator_lock.get_batch(); let log = IntakeLog { message: Message { message: "START RequestId: test-request-id Version: test".to_string(), @@ -770,12 +777,18 @@ mod tests { tags: tags_provider.get_tags_string(), }; let serialized_log = format!("[{}]", serde_json::to_string(&log).unwrap()); - assert_eq!(batch, serialized_log.as_bytes()); + assert_eq!(batches[0], serialized_log.as_bytes()); + + aggregator_handle + .shutdown() + .expect("Failed to shutdown aggregator service"); + service_handle + .await + .expect("Aggregator service task failed"); } #[tokio::test] async fn test_process_logs_disabled() { - let aggregator = Arc::new(Mutex::new(Aggregator::default())); let config = Arc::new(config::Config { service: Some("test-service".to_string()), tags: HashMap::from([("test".to_string(), "tags".to_string())]), @@ -791,6 +804,11 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); + let (aggregator_service, aggregator_handle) = AggregatorService::default(); + let service_handle = tokio::spawn(async move { + aggregator_service.run().await; + }); + let mut processor = LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); @@ -802,16 +820,21 @@ mod tests { }, }; - processor.process(event.clone(), &aggregator).await; + processor.process(event.clone(), &aggregator_handle).await; - let mut aggregator_lock = aggregator.lock().await; - let batch = aggregator_lock.get_batch(); - assert_eq!(batch.len(), 0); + let batches = aggregator_handle.get_batches().await.unwrap(); + assert!(batches.is_empty()); + + aggregator_handle + .shutdown() + .expect("Failed to shutdown aggregator service"); + service_handle + .await + .expect("Aggregator service task failed"); } #[tokio::test] async fn test_process_log_with_no_request_id() { - let aggregator = Arc::new(Mutex::new(Aggregator::default())); let config = Arc::new(config::Config { service: Some("test-service".to_string()), tags: HashMap::from([("test".to_string(), "tags".to_string())]), @@ -826,6 +849,11 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); + let (aggregator_service, aggregator_handle) = AggregatorService::default(); + let service_handle = tokio::spawn(async move { + aggregator_service.run().await; + }); + let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); let event = TelemetryEvent { @@ -833,17 +861,22 @@ mod tests { record: TelemetryRecord::Function(Value::String("test-function".to_string())), }; - processor.process(event.clone(), &aggregator).await; + processor.process(event.clone(), &aggregator_handle).await; assert_eq!(processor.orphan_logs.len(), 1); - let mut aggregator_lock = aggregator.lock().await; - let batch = aggregator_lock.get_batch(); - assert!(batch.is_empty()); + let batches = aggregator_handle.get_batches().await.unwrap(); + assert!(batches.is_empty()); + + aggregator_handle + .shutdown() + .expect("Failed to shutdown aggregator service"); + service_handle + .await + .expect("Aggregator service task failed"); } #[tokio::test] async fn test_process_logs_after_seeing_request_id() { - let aggregator = Arc::new(Mutex::new(Aggregator::default())); let config = Arc::new(config::Config { service: Some("test-service".to_string()), tags: HashMap::from([("test".to_string(), "tags".to_string())]), @@ -858,6 +891,11 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); + let (aggregator_service, aggregator_handle) = AggregatorService::default(); + let service_handle = tokio::spawn(async move { + aggregator_service.run().await; + }); + let mut processor = LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); @@ -869,7 +907,9 @@ mod tests { }, }; - processor.process(start_event.clone(), &aggregator).await; + processor + .process(start_event.clone(), &aggregator_handle) + .await; assert_eq!( processor.invocation_context.request_id, "test-request-id".to_string() @@ -881,11 +921,11 @@ mod tests { record: TelemetryRecord::Function(Value::String("test-function".to_string())), }; - processor.process(event.clone(), &aggregator).await; + processor.process(event.clone(), &aggregator_handle).await; + + let batches = aggregator_handle.get_batches().await.unwrap(); + assert_eq!(batches.len(), 1); - // Verify aggregator logs - let mut aggregator_lock = aggregator.lock().await; - let batch = aggregator_lock.get_batch(); let start_log = IntakeLog { message: Message { message: "START RequestId: test-request-id Version: test".to_string(), @@ -921,7 +961,14 @@ mod tests { serde_json::to_string(&start_log).unwrap(), serde_json::to_string(&function_log).unwrap() ); - assert_eq!(batch, serialized_log.as_bytes()); + assert_eq!(batches[0], serialized_log.as_bytes()); + + aggregator_handle + .shutdown() + .expect("Failed to shutdown aggregator service"); + service_handle + .await + .expect("Aggregator service task failed"); } #[tokio::test] @@ -939,6 +986,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); + let mut processor = LambdaProcessor::new(tags_provider.clone(), Arc::clone(&config), tx.clone()); let start_event = TelemetryEvent { @@ -996,6 +1044,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); + let mut processor = LambdaProcessor::new(tags_provider.clone(), Arc::clone(&config), tx.clone()); let start_event = TelemetryEvent { diff --git a/bottlecap/src/logs/mod.rs b/bottlecap/src/logs/mod.rs index c46dfffc6..3c40f2983 100644 --- a/bottlecap/src/logs/mod.rs +++ b/bottlecap/src/logs/mod.rs @@ -1,5 +1,6 @@ pub mod agent; pub mod aggregator; +pub mod aggregator_service; pub mod constants; pub mod flusher; pub mod lambda; diff --git a/bottlecap/src/logs/processor.rs b/bottlecap/src/logs/processor.rs index 771d71b47..cc3d220af 100644 --- a/bottlecap/src/logs/processor.rs +++ b/bottlecap/src/logs/processor.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use tokio::sync::{Mutex, mpsc::Sender}; +use tokio::sync::mpsc::Sender; use tracing::debug; @@ -7,10 +7,9 @@ use crate::LAMBDA_RUNTIME_SLUG; use crate::config::{self, processing_rule}; use crate::event_bus::Event; use crate::extension::telemetry::events::TelemetryEvent; -use crate::tags; - -use crate::logs::aggregator::Aggregator; +use crate::logs::aggregator_service::AggregatorHandle; use crate::logs::lambda::processor::LambdaProcessor; +use crate::tags; impl LogsProcessor { #[must_use] @@ -29,10 +28,10 @@ impl LogsProcessor { } } - pub async fn process(&mut self, event: TelemetryEvent, aggregator: &Arc>) { + pub async fn process(&mut self, event: TelemetryEvent, aggregator_handle: &AggregatorHandle) { match self { LogsProcessor::Lambda(lambda_processor) => { - lambda_processor.process(event, aggregator).await; + lambda_processor.process(event, aggregator_handle).await; } } } diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index f1bb1a4d6..3100322b2 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -55,14 +55,21 @@ async fn test_logs() { )); let bus = EventBus::run(); - let mut logs_agent = - LogsAgent::new(tags_provider, Arc::clone(&arc_conf), bus.get_sender_copy()); - let api_key_factory = Arc::new(ApiKeyFactory::new(dd_api_key)); - let logs_flusher = LogsFlusher::new( - api_key_factory, - Arc::clone(&logs_agent.aggregator), - arc_conf.clone(), + + let (logs_aggr_service, logs_aggr_handle) = + bottlecap::logs::aggregator_service::AggregatorService::default(); + tokio::spawn(async move { + logs_aggr_service.run().await; + }); + + let mut logs_agent = LogsAgent::new( + tags_provider, + Arc::clone(&arc_conf), + bus.get_sender_copy(), + logs_aggr_handle.clone(), ); + let api_key_factory = Arc::new(ApiKeyFactory::new(dd_api_key)); + let logs_flusher = LogsFlusher::new(api_key_factory, logs_aggr_handle, arc_conf.clone()); let telemetry_events: Vec = serde_json::from_str( r#"[{"time":"2022-10-21T14:05:03.165Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}]"#)