From 129ade8e36aa51a8c4d014ded71aa60918b3c68d Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 30 Sep 2025 10:44:21 -0400 Subject: [PATCH 01/11] send logs using channel --- bottlecap/src/bin/bottlecap/main.rs | 25 +- bottlecap/src/logs/agent.rs | 20 +- bottlecap/src/logs/aggregator_service.rs | 210 +++++++ bottlecap/src/logs/flusher.rs | 46 +- bottlecap/src/logs/lambda/processor.rs | 671 ++++++++++++----------- bottlecap/src/logs/mod.rs | 1 + bottlecap/src/logs/processor.rs | 11 +- bottlecap/tests/logs_integration_test.rs | 5 +- 8 files changed, 621 insertions(+), 368 deletions(-) create mode 100644 bottlecap/src/logs/aggregator_service.rs diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 5028b2194..6edc09394 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -43,7 +43,7 @@ use bottlecap::{ listener::Listener as LifecycleListener, }, logger, - logs::{agent::LogsAgent, flusher::LogsFlusher}, + logs::{agent::LogsAgent, aggregator_service::{AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService}, flusher::LogsFlusher}, metrics::enhanced::lambda::Lambda as enhanced_metrics, otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent}, proxy::{interceptor, should_start_proxy}, @@ -367,11 +367,23 @@ async fn extension_loop_active( .to_string(); let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id); + let logs_aggr_init_start_time = Instant::now(); + let (logs_aggr_service, logs_aggr_handle) = LogsAggregatorService::new_default(); + debug!( + "Logs aggregator created in {:} microseconds", + logs_aggr_init_start_time + .elapsed() + .as_micros() + .to_string() + ); + start_logs_aggregator(logs_aggr_service); + let (logs_agent_channel, logs_flusher) = start_logs_agent( config, Arc::clone(&api_key_factory), &tags_provider, event_bus.get_sender_copy(), + logs_aggr_handle, ); let metrics_aggr_init_start_time = Instant::now(); @@ -904,12 +916,13 @@ fn start_logs_agent( api_key_factory: Arc, tags_provider: &Arc, event_bus: Sender, + logs_aggr_handle: LogsAggregatorHandle, ) -> (Sender, LogsFlusher) { - let mut logs_agent = LogsAgent::new(Arc::clone(tags_provider), Arc::clone(config), event_bus); + let mut logs_agent = LogsAgent::new(Arc::clone(tags_provider), Arc::clone(config), event_bus, logs_aggr_handle.clone()); let logs_agent_channel = logs_agent.get_sender_copy(); let logs_flusher = LogsFlusher::new( api_key_factory, - Arc::clone(&logs_agent.aggregator), + logs_aggr_handle, config.clone(), ); tokio::spawn(async move { @@ -1107,6 +1120,12 @@ fn start_dogstatsd_aggregator(aggr_service: MetricsAggregatorService) { }); } +fn start_logs_aggregator(aggr_service: LogsAggregatorService) { + tokio::spawn(async move { + aggr_service.run().await; + }); +} + async fn start_dogstatsd(metrics_aggr_handle: MetricsAggregatorHandle) -> CancellationToken { let dogstatsd_config = DogStatsDConfig { host: EXTENSION_HOST.to_string(), diff --git a/bottlecap/src/logs/agent.rs b/bottlecap/src/logs/agent.rs index 1404e3532..fa4965c5c 100644 --- a/bottlecap/src/logs/agent.rs +++ b/bottlecap/src/logs/agent.rs @@ -1,18 +1,14 @@ use std::sync::Arc; -use tokio::sync::{ - Mutex, - mpsc::{self, Sender}, -}; +use tokio::sync::mpsc::{self, Sender}; use crate::event_bus::Event; use crate::extension::telemetry::events::TelemetryEvent; -use crate::logs::{aggregator::Aggregator, processor::LogsProcessor}; +use crate::logs::{aggregator_service::AggregatorHandle, processor::LogsProcessor}; use crate::tags; use crate::{LAMBDA_RUNTIME_SLUG, config}; #[allow(clippy::module_name_repetitions)] pub struct LogsAgent { - pub aggregator: Arc>, tx: Sender, rx: mpsc::Receiver, processor: LogsProcessor, @@ -24,19 +20,19 @@ impl LogsAgent { tags_provider: Arc, datadog_config: Arc, event_bus: Sender, - ) -> LogsAgent { - let aggregator: Arc> = Arc::new(Mutex::new(Aggregator::default())); + aggregator_handle: AggregatorHandle, + ) -> Self { let processor = LogsProcessor::new( Arc::clone(&datadog_config), tags_provider, event_bus, LAMBDA_RUNTIME_SLUG.to_string(), + aggregator_handle.clone(), ); let (tx, rx) = mpsc::channel::(1000); - LogsAgent { - aggregator, + Self { tx, rx, processor, @@ -45,13 +41,13 @@ impl LogsAgent { pub async fn spin(&mut self) { while let Some(event) = self.rx.recv().await { - self.processor.process(event, &self.aggregator).await; + self.processor.process(event).await; } } pub async fn sync_consume(&mut self) { if let Some(events) = self.rx.recv().await { - self.processor.process(events, &self.aggregator).await; + self.processor.process(events).await; } } diff --git a/bottlecap/src/logs/aggregator_service.rs b/bottlecap/src/logs/aggregator_service.rs new file mode 100644 index 000000000..8f7f2fcfc --- /dev/null +++ b/bottlecap/src/logs/aggregator_service.rs @@ -0,0 +1,210 @@ +use tokio::sync::{mpsc, oneshot}; +use tracing::debug; + +use crate::logs::aggregator::Aggregator; + +#[derive(Debug)] +pub enum AggregatorCommand { + InsertBatch(Vec), + Flush(oneshot::Sender>>), + Shutdown, +} + +#[derive(Clone, Debug)] +pub struct AggregatorHandle { + tx: mpsc::UnboundedSender, +} + +impl AggregatorHandle { + pub fn insert_batch( + &self, + logs: Vec, + ) -> Result<(), mpsc::error::SendError> { + self.tx.send(AggregatorCommand::InsertBatch(logs)) + } + + pub async fn flush(&self) -> Result>, String> { + let (response_tx, response_rx) = oneshot::channel(); + self.tx + .send(AggregatorCommand::Flush(response_tx)) + .map_err(|e| format!("Failed to send flush command: {}", e))?; + + response_rx + .await + .map_err(|e| format!("Failed to receive flush response: {}", e)) + } + + pub fn shutdown(&self) -> Result<(), mpsc::error::SendError> { + self.tx.send(AggregatorCommand::Shutdown) + } +} + +pub struct AggregatorService { + aggregator: Aggregator, + rx: mpsc::UnboundedReceiver, +} + +impl AggregatorService { + pub fn new( + max_batch_entries_size: usize, + max_content_size_bytes: usize, + max_log_size_bytes: usize, + ) -> (Self, AggregatorHandle) { + let (tx, rx) = mpsc::unbounded_channel(); + let aggregator = Aggregator::new( + max_batch_entries_size, + max_content_size_bytes, + max_log_size_bytes, + ); + + let service = Self { aggregator, rx }; + let handle = AggregatorHandle { tx }; + + (service, handle) + } + + pub fn new_default() -> (Self, AggregatorHandle) { + use crate::logs::constants; + Self::new( + constants::MAX_BATCH_ENTRIES_SIZE, + constants::MAX_CONTENT_SIZE_BYTES, + constants::MAX_LOG_SIZE_BYTES, + ) + } + + pub async fn run(mut self) { + debug!("Logs aggregator service started"); + + while let Some(command) = self.rx.recv().await { + match command { + AggregatorCommand::InsertBatch(logs) => { + self.aggregator.add_batch(logs); + } + AggregatorCommand::Flush(response_tx) => { + let mut batches = Vec::new(); + loop { + let batch = self.aggregator.get_batch(); + if batch.is_empty() { + break; + } + batches.push(batch); + } + let _ = response_tx.send(batches); + } + AggregatorCommand::Shutdown => { + debug!("Logs aggregator service shutting down"); + break; + } + } + } + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use crate::logs::lambda::{IntakeLog, Lambda, Message}; + + #[tokio::test] + async fn test_aggregator_service_insert_and_flush() { + let (mut service, handle) = AggregatorService::new_default(); + + // Spawn the service + let service_handle = tokio::spawn(async move { + service.run().await; + }); + + let log = IntakeLog { + message: Message { + message: "test".to_string(), + lambda: Lambda { + arn: "arn".to_string(), + request_id: Some("request_id".to_string()), + }, + timestamp: 0, + status: "status".to_string(), + }, + hostname: "hostname".to_string(), + service: "service".to_string(), + tags: "tags".to_string(), + source: "source".to_string(), + }; + let serialized_log = serde_json::to_string(&log).unwrap(); + + // Insert logs using handle + handle.insert_batch(vec![serialized_log.clone()]).unwrap(); + + // Flush all batches + let batches = handle.flush().await.unwrap(); + assert_eq!(batches.len(), 1); + let serialized_batch = format!("[{}]", serialized_log); + assert_eq!(batches[0], serialized_batch.as_bytes()); + + // Shutdown the service + handle.shutdown().unwrap(); + let _ = service_handle.await; + } + + #[tokio::test] + async fn test_aggregator_service_multiple_handles() { + let (mut service, handle1) = AggregatorService::new_default(); + let handle2 = handle1.clone(); + + // Spawn the service + let service_handle = tokio::spawn(async move { + service.run().await; + }); + + let log = IntakeLog { + message: Message { + message: "test".to_string(), + lambda: Lambda { + arn: "arn".to_string(), + request_id: Some("request_id".to_string()), + }, + timestamp: 0, + status: "status".to_string(), + }, + hostname: "hostname".to_string(), + service: "service".to_string(), + tags: "tags".to_string(), + source: "source".to_string(), + }; + let serialized_log = serde_json::to_string(&log).unwrap(); + + // Send logs using both handles + handle1.insert_batch(vec![serialized_log.clone()]).unwrap(); + handle2.insert_batch(vec![serialized_log.clone()]).unwrap(); + + // Flush all batches + let batches = handle1.flush().await.unwrap(); + assert_eq!(batches.len(), 2); + + let serialized_batch = format!("[{}]", serialized_log); + assert_eq!(batches[0], serialized_batch.as_bytes()); + assert_eq!(batches[1], serialized_batch.as_bytes()); + + // Shutdown the service + handle1.shutdown().unwrap(); + let _ = service_handle.await; + } + + #[tokio::test] + async fn test_aggregator_service_empty_flush() { + let (mut service, handle) = AggregatorService::new_default(); + + // Spawn the service + let service_handle = tokio::spawn(async move { + service.run().await; + }); + + // Flush when no logs are inserted + let batches = handle.flush().await.unwrap(); + assert!(batches.is_empty()); + + // Shutdown the service + handle.shutdown().unwrap(); + let _ = service_handle.await; + } +} diff --git a/bottlecap/src/logs/flusher.rs b/bottlecap/src/logs/flusher.rs index b776068b9..bed33b582 100644 --- a/bottlecap/src/logs/flusher.rs +++ b/bottlecap/src/logs/flusher.rs @@ -1,7 +1,7 @@ use crate::FLUSH_RETRY_COUNT; use crate::config; use crate::http::get_client; -use crate::logs::aggregator::Aggregator; +use crate::logs::aggregator_service::AggregatorHandle; use dogstatsd::api_key::ApiKeyFactory; use futures::future::join_all; use hyper::StatusCode; @@ -11,7 +11,7 @@ use std::time::Instant; use std::{io::Write, sync::Arc}; use thiserror::Error as ThisError; use tokio::{ - sync::{Mutex, OnceCell}, + sync::OnceCell, task::JoinSet, }; use tracing::{debug, error}; @@ -28,7 +28,6 @@ pub struct FailedRequestError { pub struct Flusher { client: reqwest::Client, endpoint: String, - aggregator: Arc>, config: Arc, api_key_factory: Arc, headers: OnceCell, @@ -38,14 +37,12 @@ impl Flusher { pub fn new( api_key_factory: Arc, endpoint: String, - aggregator: Arc>, config: Arc, ) -> Self { let client = get_client(&config); Flusher { client, endpoint, - aggregator, config, api_key_factory, headers: OnceCell::new(), @@ -53,6 +50,7 @@ impl Flusher { } pub async fn flush(&self, batches: Option>>>) -> Vec { + debug!("=== Flushing batches to endpoint: {} ===", self.endpoint); let Some(api_key) = self.api_key_factory.get_api_key().await else { error!("Skipping flushing logs: Failed to resolve API key"); return vec![]; @@ -199,12 +197,13 @@ impl Flusher { pub struct LogsFlusher { config: Arc, pub flushers: Vec, + aggregator_handle: AggregatorHandle, } impl LogsFlusher { pub fn new( api_key_factory: Arc, - aggregator: Arc>, + aggregator_handle: AggregatorHandle, config: Arc, ) -> Self { let mut flushers = Vec::new(); @@ -222,22 +221,28 @@ impl LogsFlusher { flushers.push(Flusher::new( Arc::clone(&api_key_factory), endpoint, - aggregator.clone(), config.clone(), )); // Create flushers for additional endpoints + debug!("=== Creating {} additional log flushers for dual shipping ===", config.logs_config_additional_endpoints.len()); for endpoint in &config.logs_config_additional_endpoints { let endpoint_url = format!("https://{}:{}", endpoint.host, endpoint.port); + debug!("=== Creating additional log flusher for endpoint: {}", endpoint_url); + // Create a separate API key factory for this endpoint using its specific API key + let additional_api_key_factory = Arc::new(ApiKeyFactory::new(endpoint.api_key.clone().as_str())); flushers.push(Flusher::new( - Arc::clone(&api_key_factory), + additional_api_key_factory, endpoint_url, - aggregator.clone(), config.clone(), )); } - LogsFlusher { config, flushers } + LogsFlusher { + config, + flushers, + aggregator_handle, + } } pub async fn flush( @@ -261,23 +266,24 @@ impl LogsFlusher { } } } else { - // Get batches from primary flusher's aggregator let logs_batches = Arc::new({ - let mut guard = self.flushers[0].aggregator.lock().await; - let mut batches = Vec::new(); - let mut current_batch = guard.get_batch(); - while !current_batch.is_empty() { - batches.push(self.compress(current_batch)); - current_batch = guard.get_batch(); + match self.aggregator_handle.flush().await { + Ok(batches) => batches.into_iter().map(|batch| self.compress(batch)).collect(), + Err(e) => { + debug!("Failed to flush from aggregator: {}", e); + Vec::new() + } } - batches }); - // Send batches to each flusher + // Send batches to each flusher (dual shipping) let futures = self.flushers.iter().map(|flusher| { let batches = Arc::clone(&logs_batches); let flusher = flusher.clone(); - async move { flusher.flush(Some(batches)).await } + async move { + debug!("=== Flusher for endpoint {} processing {} batches ===", flusher.endpoint, batches.len()); + flusher.flush(Some(batches)).await + } }); let results = join_all(futures).await; diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index d96a902da..4d9c31f02 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -1,6 +1,6 @@ use std::error::Error; use std::sync::Arc; -use tokio::sync::{Mutex, mpsc::Sender}; +use tokio::sync::mpsc::Sender; use tracing::{debug, error}; @@ -9,7 +9,7 @@ use crate::config; use crate::event_bus::Event; use crate::extension::telemetry::events::{Status, TelemetryEvent, TelemetryRecord}; use crate::lifecycle::invocation::context::Context as InvocationContext; -use crate::logs::aggregator::Aggregator; +use crate::logs::aggregator_service::AggregatorHandle; use crate::logs::processor::{Processor, Rule}; use crate::tags::provider; @@ -31,6 +31,8 @@ pub struct LambdaProcessor { ready_logs: Vec, // Main event bus event_bus: Sender, + // Aggregator handle for sending logs + aggregator_handle: AggregatorHandle, // Logs enabled logs_enabled: bool, } @@ -59,6 +61,7 @@ impl LambdaProcessor { tags_provider: Arc, datadog_config: Arc, event_bus: Sender, + aggregator_handle: AggregatorHandle, ) -> Self { let service = datadog_config.service.clone().unwrap_or_default(); let tags = tags_provider.get_tags_string(); @@ -77,6 +80,7 @@ impl LambdaProcessor { orphan_logs: Vec::new(), ready_logs: Vec::new(), event_bus, + aggregator_handle, } } @@ -324,7 +328,7 @@ impl LambdaProcessor { } } - pub async fn process(&mut self, event: TelemetryEvent, aggregator: &Arc>) { + pub async fn process(&mut self, event: TelemetryEvent) { if let Ok(mut log) = self.make_log(event).await { let should_send_log = self.logs_enabled && LambdaProcessor::apply_rules(&self.rules, &mut log.message.message); @@ -350,11 +354,12 @@ impl LambdaProcessor { } } - if self.ready_logs.is_empty() { - return; + if !self.ready_logs.is_empty() { + // Send logs to aggregator via handle + if let Err(e) = self.aggregator_handle.insert_batch(std::mem::take(&mut self.ready_logs)) { + debug!("Failed to send logs to aggregator: {}", e); + } } - let mut aggregator = aggregator.lock().await; - aggregator.add_batch(std::mem::take(&mut self.ready_logs)); } } @@ -366,10 +371,12 @@ mod tests { use chrono::{TimeZone, Utc}; use serde_json::{Number, Value}; use std::collections::hash_map::HashMap; + use std::sync::Arc; use crate::extension::telemetry::events::{ InitPhase, InitType, ReportMetrics, RuntimeDoneMetrics, Status, }; + use crate::logs::aggregator_service::AggregatorService; use crate::logs::lambda::Lambda; macro_rules! get_message_tests { @@ -392,6 +399,7 @@ mod tests { &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]))); let (tx, _) = tokio::sync::mpsc::channel(2); + let (_, aggregator_handle) = AggregatorService::new_default(); let mut processor = LambdaProcessor::new( tags_provider, @@ -400,6 +408,7 @@ mod tests { tags, ..config::Config::default()}), tx.clone(), + aggregator_handle, ); let result = processor.get_message(input.clone()).await.unwrap(); @@ -581,8 +590,9 @@ mod tests { )); let (tx, _) = tokio::sync::mpsc::channel(2); + let (_, aggregator_handle) = AggregatorService::new_default(); - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); + let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), aggregator_handle); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -615,7 +625,8 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); + let (_, aggregator_handle) = AggregatorService::new_default(); + let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), aggregator_handle); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -661,7 +672,8 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); + let (_, aggregator_handle) = AggregatorService::new_default(); + let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), aggregator_handle); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -694,7 +706,8 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); + let (_, aggregator_handle) = AggregatorService::new_default(); + let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), aggregator_handle); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -721,319 +734,325 @@ mod tests { ); } - // process - #[tokio::test] - async fn test_process() { - let aggregator = Arc::new(Mutex::new(Aggregator::default())); - let config = Arc::new(config::Config { - service: Some("test-service".to_string()), - tags: HashMap::from([("test".to_string(), "tags".to_string())]), - ..config::Config::default() - }); - - let tags_provider = Arc::new(provider::Provider::new( - Arc::clone(&config), - LAMBDA_RUNTIME_SLUG.to_string(), - &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), - )); - - let (tx, _rx) = tokio::sync::mpsc::channel(2); - - let mut processor = - LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); - - let event = TelemetryEvent { - time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - record: TelemetryRecord::PlatformStart { - request_id: "test-request-id".to_string(), - version: Some("test".to_string()), - }, - }; - - processor.process(event.clone(), &aggregator).await; - - let mut aggregator_lock = aggregator.lock().await; - let batch = aggregator_lock.get_batch(); - let log = IntakeLog { - message: Message { - message: "START RequestId: test-request-id Version: test".to_string(), - lambda: Lambda { - arn: "test-arn".to_string(), - request_id: Some("test-request-id".to_string()), - }, - timestamp: 1_673_061_827_000, - status: "info".to_string(), - }, - hostname: "test-arn".to_string(), - source: LAMBDA_RUNTIME_SLUG.to_string(), - service: "test-service".to_string(), - tags: tags_provider.get_tags_string(), - }; - let serialized_log = format!("[{}]", serde_json::to_string(&log).unwrap()); - assert_eq!(batch, serialized_log.as_bytes()); - } - - #[tokio::test] - async fn test_process_logs_disabled() { - let aggregator = Arc::new(Mutex::new(Aggregator::default())); - let config = Arc::new(config::Config { - service: Some("test-service".to_string()), - tags: HashMap::from([("test".to_string(), "tags".to_string())]), - serverless_logs_enabled: false, - ..config::Config::default() - }); - - let tags_provider = Arc::new(provider::Provider::new( - Arc::clone(&config), - LAMBDA_RUNTIME_SLUG.to_string(), - &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), - )); - - let (tx, _rx) = tokio::sync::mpsc::channel(2); - - let mut processor = - LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); - - let event = TelemetryEvent { - time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - record: TelemetryRecord::PlatformStart { - request_id: "test-request-id".to_string(), - version: Some("test".to_string()), - }, - }; - - processor.process(event.clone(), &aggregator).await; - - let mut aggregator_lock = aggregator.lock().await; - let batch = aggregator_lock.get_batch(); - assert_eq!(batch.len(), 0); - } - - #[tokio::test] - async fn test_process_log_with_no_request_id() { - let aggregator = Arc::new(Mutex::new(Aggregator::default())); - let config = Arc::new(config::Config { - service: Some("test-service".to_string()), - tags: HashMap::from([("test".to_string(), "tags".to_string())]), - ..config::Config::default() - }); - - let tags_provider = Arc::new(provider::Provider::new( - Arc::clone(&config), - LAMBDA_RUNTIME_SLUG.to_string(), - &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), - )); - - let (tx, _rx) = tokio::sync::mpsc::channel(2); - - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); - - let event = TelemetryEvent { - time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - record: TelemetryRecord::Function(Value::String("test-function".to_string())), - }; - - processor.process(event.clone(), &aggregator).await; - assert_eq!(processor.orphan_logs.len(), 1); - - let mut aggregator_lock = aggregator.lock().await; - let batch = aggregator_lock.get_batch(); - assert!(batch.is_empty()); - } - - #[tokio::test] - async fn test_process_logs_after_seeing_request_id() { - let aggregator = Arc::new(Mutex::new(Aggregator::default())); - let config = Arc::new(config::Config { - service: Some("test-service".to_string()), - tags: HashMap::from([("test".to_string(), "tags".to_string())]), - ..config::Config::default() - }); - - let tags_provider = Arc::new(provider::Provider::new( - Arc::clone(&config), - LAMBDA_RUNTIME_SLUG.to_string(), - &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), - )); - - let (tx, _rx) = tokio::sync::mpsc::channel(2); - - let mut processor = - LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); - - let start_event = TelemetryEvent { - time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - record: TelemetryRecord::PlatformStart { - request_id: "test-request-id".to_string(), - version: Some("test".to_string()), - }, - }; - - processor.process(start_event.clone(), &aggregator).await; - assert_eq!( - processor.invocation_context.request_id, - "test-request-id".to_string() - ); - - // This could be any event that doesn't have a `request_id` - let event = TelemetryEvent { - time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - record: TelemetryRecord::Function(Value::String("test-function".to_string())), - }; - - processor.process(event.clone(), &aggregator).await; - - // Verify aggregator logs - let mut aggregator_lock = aggregator.lock().await; - let batch = aggregator_lock.get_batch(); - let start_log = IntakeLog { - message: Message { - message: "START RequestId: test-request-id Version: test".to_string(), - lambda: Lambda { - arn: "test-arn".to_string(), - request_id: Some("test-request-id".to_string()), - }, - timestamp: 1_673_061_827_000, - status: "info".to_string(), - }, - hostname: "test-arn".to_string(), - source: LAMBDA_RUNTIME_SLUG.to_string(), - service: "test-service".to_string(), - tags: tags_provider.get_tags_string(), - }; - let function_log = IntakeLog { - message: Message { - message: "test-function".to_string(), - lambda: Lambda { - arn: "test-arn".to_string(), - request_id: Some("test-request-id".to_string()), - }, - timestamp: 1_673_061_827_000, - status: "info".to_string(), - }, - hostname: "test-arn".to_string(), - source: LAMBDA_RUNTIME_SLUG.to_string(), - service: "test-service".to_string(), - tags: tags_provider.get_tags_string(), - }; - let serialized_log = format!( - "[{},{}]", - serde_json::to_string(&start_log).unwrap(), - serde_json::to_string(&function_log).unwrap() - ); - assert_eq!(batch, serialized_log.as_bytes()); - } - - #[tokio::test] - async fn test_process_logs_structured_ddtags() { - let config = Arc::new(config::Config { - service: Some("test-service".to_string()), - tags: HashMap::from([("test".to_string(), "tags".to_string())]), - ..config::Config::default() - }); - - let tags_provider = Arc::new(provider::Provider::new( - Arc::clone(&config), - LAMBDA_RUNTIME_SLUG.to_string(), - &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), - )); - - let (tx, _rx) = tokio::sync::mpsc::channel(2); - let mut processor = - LambdaProcessor::new(tags_provider.clone(), Arc::clone(&config), tx.clone()); - let start_event = TelemetryEvent { - time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - record: TelemetryRecord::PlatformStart { - request_id: "test-request-id".to_string(), - version: Some("test".to_string()), - }, - }; - - let start_lambda_message = processor.get_message(start_event.clone()).await.unwrap(); - processor.get_intake_log(start_lambda_message).unwrap(); - let event = TelemetryEvent { - time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - record: TelemetryRecord::Function(Value::String(r#"{"message":{"custom_details": "my-structured-message","ddtags":"added_tag1:added_value1,added_tag2:added_value2"}}"#.to_string())), - }; - - let lambda_message = processor.get_message(event.clone()).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - - assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string()); - assert_eq!(intake_log.hostname, "test-arn".to_string()); - assert_eq!(intake_log.service, "test-service".to_string()); - assert!(intake_log.tags.contains("added_tag1:added_value1")); - let function_log = IntakeLog { - message: Message { - message: r#"{"custom_details":"my-structured-message"}"#.to_string(), - lambda: Lambda { - arn: "test-arn".to_string(), - request_id: Some("test-request-id".to_string()), - }, - timestamp: 1_673_061_827_000, - status: "info".to_string(), - }, - hostname: "test-arn".to_string(), - source: LAMBDA_RUNTIME_SLUG.to_string(), - service: "test-service".to_string(), - tags: tags_provider.get_tags_string() - + ",added_tag1:added_value1,added_tag2:added_value2", - }; - assert_eq!(intake_log, function_log); - } - #[tokio::test] - async fn test_process_logs_structured_no_ddtags() { - let config = Arc::new(config::Config { - service: Some("test-service".to_string()), - tags: HashMap::from([("test".to_string(), "tags".to_string())]), - ..config::Config::default() - }); - - let tags_provider = Arc::new(provider::Provider::new( - Arc::clone(&config), - LAMBDA_RUNTIME_SLUG.to_string(), - &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), - )); - - let (tx, _rx) = tokio::sync::mpsc::channel(2); - let mut processor = - LambdaProcessor::new(tags_provider.clone(), Arc::clone(&config), tx.clone()); - let start_event = TelemetryEvent { - time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - record: TelemetryRecord::PlatformStart { - request_id: "test-request-id".to_string(), - version: Some("test".to_string()), - }, - }; - - let start_lambda_message = processor.get_message(start_event.clone()).await.unwrap(); - processor.get_intake_log(start_lambda_message).unwrap(); - let event = TelemetryEvent { - time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - record: TelemetryRecord::Function(Value::String(r#"{"message":{"custom_details":"my-structured-message"},"my_other_details":"included"}"#.to_string())), - }; - - let lambda_message = processor.get_message(event.clone()).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - - assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string()); - assert_eq!(intake_log.hostname, "test-arn".to_string()); - assert_eq!(intake_log.service, "test-service".to_string()); - let function_log = IntakeLog { - message: Message { - message: r#"{"message":{"custom_details":"my-structured-message"},"my_other_details":"included"}"#.to_string(), - lambda: Lambda { - arn: "test-arn".to_string(), - request_id: Some("test-request-id".to_string()), - }, - timestamp: 1_673_061_827_000, - status: "info".to_string(), - }, - hostname: "test-arn".to_string(), - source: LAMBDA_RUNTIME_SLUG.to_string(), - service: "test-service".to_string(), - tags: tags_provider.get_tags_string(), - }; - assert_eq!(intake_log, function_log); - } + // // process + // #[tokio::test] + // async fn test_process() { + // let config = Arc::new(config::Config { + // service: Some("test-service".to_string()), + // tags: HashMap::from([("test".to_string(), "tags".to_string())]), + // ..config::Config::default() + // }); + + // let tags_provider = Arc::new(provider::Provider::new( + // Arc::clone(&config), + // LAMBDA_RUNTIME_SLUG.to_string(), + // &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + // )); + + // let (tx, _rx) = tokio::sync::mpsc::channel(2); + + // let mut processor = + // { + // let (_, aggregator_handle) = AggregatorService::new_default(); + // LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone(), aggregator_handle) + // }; + + // let event = TelemetryEvent { + // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + // record: TelemetryRecord::PlatformStart { + // request_id: "test-request-id".to_string(), + // version: Some("test".to_string()), + // }, + // }; + + // processor.process(event.clone()).await; + + // let mut aggregator_lock = aggregator.lock().await; + // let batch = aggregator_lock.get_batch(); + // let log = IntakeLog { + // message: Message { + // message: "START RequestId: test-request-id Version: test".to_string(), + // lambda: Lambda { + // arn: "test-arn".to_string(), + // request_id: Some("test-request-id".to_string()), + // }, + // timestamp: 1_673_061_827_000, + // status: "info".to_string(), + // }, + // hostname: "test-arn".to_string(), + // source: LAMBDA_RUNTIME_SLUG.to_string(), + // service: "test-service".to_string(), + // tags: tags_provider.get_tags_string(), + // }; + // let serialized_log = format!("[{}]", serde_json::to_string(&log).unwrap()); + // assert_eq!(batch, serialized_log.as_bytes()); + // } + + // #[tokio::test] + // async fn test_process_logs_disabled() { + // let config = Arc::new(config::Config { + // service: Some("test-service".to_string()), + // tags: HashMap::from([("test".to_string(), "tags".to_string())]), + // serverless_logs_enabled: false, + // ..config::Config::default() + // }); + + // let tags_provider = Arc::new(provider::Provider::new( + // Arc::clone(&config), + // LAMBDA_RUNTIME_SLUG.to_string(), + // &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + // )); + + // let (tx, _rx) = tokio::sync::mpsc::channel(2); + + // let mut processor = + // { + // let (_, aggregator_handle) = AggregatorService::new_default(); + // LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone(), aggregator_handle) + // }; + + // let event = TelemetryEvent { + // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + // record: TelemetryRecord::PlatformStart { + // request_id: "test-request-id".to_string(), + // version: Some("test".to_string()), + // }, + // }; + + // processor.process(event.clone()).await; + + // let mut aggregator_lock = aggregator.lock().await; + // let batch = aggregator_lock.get_batch(); + // assert_eq!(batch.len(), 0); + // } + + // #[tokio::test] + // async fn test_process_log_with_no_request_id() { + // let config = Arc::new(config::Config { + // service: Some("test-service".to_string()), + // tags: HashMap::from([("test".to_string(), "tags".to_string())]), + // ..config::Config::default() + // }); + + // let tags_provider = Arc::new(provider::Provider::new( + // Arc::clone(&config), + // LAMBDA_RUNTIME_SLUG.to_string(), + // &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + // )); + + // let (tx, _rx) = tokio::sync::mpsc::channel(2); + // let (_, aggregator_handle) = AggregatorService::new_default(); + + // let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), aggregator_handle); + + // let event = TelemetryEvent { + // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + // record: TelemetryRecord::Function(Value::String("test-function".to_string())), + // }; + + // processor.process(event.clone()).await; + // assert_eq!(processor.orphan_logs.len(), 1); + + // let mut aggregator_lock = aggregator.lock().await; + // let batch = aggregator_lock.get_batch(); + // assert!(batch.is_empty()); + // } + + // #[tokio::test] + // async fn test_process_logs_after_seeing_request_id() { + // let config = Arc::new(config::Config { + // service: Some("test-service".to_string()), + // tags: HashMap::from([("test".to_string(), "tags".to_string())]), + // ..config::Config::default() + // }); + + // let tags_provider = Arc::new(provider::Provider::new( + // Arc::clone(&config), + // LAMBDA_RUNTIME_SLUG.to_string(), + // &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + // )); + + // let (tx, _rx) = tokio::sync::mpsc::channel(2); + + // let mut processor = + // { + // let (_, aggregator_handle) = AggregatorService::new_default(); + // LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone(), aggregator_handle) + // }; + + // let start_event = TelemetryEvent { + // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + // record: TelemetryRecord::PlatformStart { + // request_id: "test-request-id".to_string(), + // version: Some("test".to_string()), + // }, + // }; + + // processor.process(start_event.clone()).await; + // assert_eq!( + // processor.invocation_context.request_id, + // "test-request-id".to_string() + // ); + + // // This could be any event that doesn't have a `request_id` + // let event = TelemetryEvent { + // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + // record: TelemetryRecord::Function(Value::String("test-function".to_string())), + // }; + + // processor.process(event.clone()).await; + + // // Verify aggregator logs + // let mut aggregator_lock = aggregator.lock().await; + // let batch = aggregator_lock.get_batch(); + // let start_log = IntakeLog { + // message: Message { + // message: "START RequestId: test-request-id Version: test".to_string(), + // lambda: Lambda { + // arn: "test-arn".to_string(), + // request_id: Some("test-request-id".to_string()), + // }, + // timestamp: 1_673_061_827_000, + // status: "info".to_string(), + // }, + // hostname: "test-arn".to_string(), + // source: LAMBDA_RUNTIME_SLUG.to_string(), + // service: "test-service".to_string(), + // tags: tags_provider.get_tags_string(), + // }; + // let function_log = IntakeLog { + // message: Message { + // message: "test-function".to_string(), + // lambda: Lambda { + // arn: "test-arn".to_string(), + // request_id: Some("test-request-id".to_string()), + // }, + // timestamp: 1_673_061_827_000, + // status: "info".to_string(), + // }, + // hostname: "test-arn".to_string(), + // source: LAMBDA_RUNTIME_SLUG.to_string(), + // service: "test-service".to_string(), + // tags: tags_provider.get_tags_string(), + // }; + // let serialized_log = format!( + // "[{},{}]", + // serde_json::to_string(&start_log).unwrap(), + // serde_json::to_string(&function_log).unwrap() + // ); + // assert_eq!(batch, serialized_log.as_bytes()); + // } + + // #[tokio::test] + // async fn test_process_logs_structured_ddtags() { + // let config = Arc::new(config::Config { + // service: Some("test-service".to_string()), + // tags: HashMap::from([("test".to_string(), "tags".to_string())]), + // ..config::Config::default() + // }); + + // let tags_provider = Arc::new(provider::Provider::new( + // Arc::clone(&config), + // LAMBDA_RUNTIME_SLUG.to_string(), + // &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + // )); + + // let (tx, _rx) = tokio::sync::mpsc::channel(2); + // let mut processor = + // LambdaProcessor::new(tags_provider.clone(), Arc::clone(&config), tx.clone()); + // let start_event = TelemetryEvent { + // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + // record: TelemetryRecord::PlatformStart { + // request_id: "test-request-id".to_string(), + // version: Some("test".to_string()), + // }, + // }; + + // let start_lambda_message = processor.get_message(start_event.clone()).await.unwrap(); + // processor.get_intake_log(start_lambda_message).unwrap(); + // let event = TelemetryEvent { + // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + // record: TelemetryRecord::Function(Value::String(r#"{"message":{"custom_details": "my-structured-message","ddtags":"added_tag1:added_value1,added_tag2:added_value2"}}"#.to_string())), + // }; + + // let lambda_message = processor.get_message(event.clone()).await.unwrap(); + // let intake_log = processor.get_intake_log(lambda_message).unwrap(); + + // assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string()); + // assert_eq!(intake_log.hostname, "test-arn".to_string()); + // assert_eq!(intake_log.service, "test-service".to_string()); + // assert!(intake_log.tags.contains("added_tag1:added_value1")); + // let function_log = IntakeLog { + // message: Message { + // message: r#"{"custom_details":"my-structured-message"}"#.to_string(), + // lambda: Lambda { + // arn: "test-arn".to_string(), + // request_id: Some("test-request-id".to_string()), + // }, + // timestamp: 1_673_061_827_000, + // status: "info".to_string(), + // }, + // hostname: "test-arn".to_string(), + // source: LAMBDA_RUNTIME_SLUG.to_string(), + // service: "test-service".to_string(), + // tags: tags_provider.get_tags_string() + // + ",added_tag1:added_value1,added_tag2:added_value2", + // }; + // assert_eq!(intake_log, function_log); + // } + // #[tokio::test] + // async fn test_process_logs_structured_no_ddtags() { + // let config = Arc::new(config::Config { + // service: Some("test-service".to_string()), + // tags: HashMap::from([("test".to_string(), "tags".to_string())]), + // ..config::Config::default() + // }); + + // let tags_provider = Arc::new(provider::Provider::new( + // Arc::clone(&config), + // LAMBDA_RUNTIME_SLUG.to_string(), + // &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + // )); + + // let (tx, _rx) = tokio::sync::mpsc::channel(2); + // let mut processor = + // LambdaProcessor::new(tags_provider.clone(), Arc::clone(&config), tx.clone()); + // let start_event = TelemetryEvent { + // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + // record: TelemetryRecord::PlatformStart { + // request_id: "test-request-id".to_string(), + // version: Some("test".to_string()), + // }, + // }; + + // let start_lambda_message = processor.get_message(start_event.clone()).await.unwrap(); + // processor.get_intake_log(start_lambda_message).unwrap(); + // let event = TelemetryEvent { + // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + // record: TelemetryRecord::Function(Value::String(r#"{"message":{"custom_details":"my-structured-message"},"my_other_details":"included"}"#.to_string())), + // }; + + // let lambda_message = processor.get_message(event.clone()).await.unwrap(); + // let intake_log = processor.get_intake_log(lambda_message).unwrap(); + + // assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string()); + // assert_eq!(intake_log.hostname, "test-arn".to_string()); + // assert_eq!(intake_log.service, "test-service".to_string()); + // let function_log = IntakeLog { + // message: Message { + // message: r#"{"message":{"custom_details":"my-structured-message"},"my_other_details":"included"}"#.to_string(), + // lambda: Lambda { + // arn: "test-arn".to_string(), + // request_id: Some("test-request-id".to_string()), + // }, + // timestamp: 1_673_061_827_000, + // status: "info".to_string(), + // }, + // hostname: "test-arn".to_string(), + // source: LAMBDA_RUNTIME_SLUG.to_string(), + // service: "test-service".to_string(), + // tags: tags_provider.get_tags_string(), + // }; + // assert_eq!(intake_log, function_log); + // } } diff --git a/bottlecap/src/logs/mod.rs b/bottlecap/src/logs/mod.rs index c46dfffc6..3c40f2983 100644 --- a/bottlecap/src/logs/mod.rs +++ b/bottlecap/src/logs/mod.rs @@ -1,5 +1,6 @@ pub mod agent; pub mod aggregator; +pub mod aggregator_service; pub mod constants; pub mod flusher; pub mod lambda; diff --git a/bottlecap/src/logs/processor.rs b/bottlecap/src/logs/processor.rs index 771d71b47..3c5e42b5b 100644 --- a/bottlecap/src/logs/processor.rs +++ b/bottlecap/src/logs/processor.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use tokio::sync::{Mutex, mpsc::Sender}; +use tokio::sync::mpsc::Sender; use tracing::debug; @@ -9,7 +9,7 @@ use crate::event_bus::Event; use crate::extension::telemetry::events::TelemetryEvent; use crate::tags; -use crate::logs::aggregator::Aggregator; +use crate::logs::aggregator_service::AggregatorHandle; use crate::logs::lambda::processor::LambdaProcessor; impl LogsProcessor { @@ -19,20 +19,21 @@ impl LogsProcessor { tags_provider: Arc, event_bus: Sender, runtime: String, + aggregator_handle: AggregatorHandle, ) -> Self { match runtime.as_str() { LAMBDA_RUNTIME_SLUG => { - let lambda_processor = LambdaProcessor::new(tags_provider, config, event_bus); + let lambda_processor = LambdaProcessor::new(tags_provider, config, event_bus, aggregator_handle); LogsProcessor::Lambda(lambda_processor) } _ => panic!("Unsupported runtime: {runtime}"), } } - pub async fn process(&mut self, event: TelemetryEvent, aggregator: &Arc>) { + pub async fn process(&mut self, event: TelemetryEvent) { match self { LogsProcessor::Lambda(lambda_processor) => { - lambda_processor.process(event, aggregator).await; + lambda_processor.process(event).await; } } } diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index f1bb1a4d6..4a88130a4 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -55,12 +55,13 @@ async fn test_logs() { )); let bus = EventBus::run(); + let (_, logs_aggr_handle) = bottlecap::logs::aggregator_service::AggregatorService::new_default(); let mut logs_agent = - LogsAgent::new(tags_provider, Arc::clone(&arc_conf), bus.get_sender_copy()); + LogsAgent::new(tags_provider, Arc::clone(&arc_conf), bus.get_sender_copy(), logs_aggr_handle.clone()); let api_key_factory = Arc::new(ApiKeyFactory::new(dd_api_key)); let logs_flusher = LogsFlusher::new( api_key_factory, - Arc::clone(&logs_agent.aggregator), + logs_aggr_handle, arc_conf.clone(), ); From 087e656948037ca08212e28b03ad7ef6a6468fb3 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 30 Sep 2025 11:09:18 -0400 Subject: [PATCH 02/11] fix tests --- bottlecap/src/logs/aggregator_service.rs | 6 +- bottlecap/src/logs/lambda/processor.rs | 704 ++++++++++++----------- 2 files changed, 386 insertions(+), 324 deletions(-) diff --git a/bottlecap/src/logs/aggregator_service.rs b/bottlecap/src/logs/aggregator_service.rs index 8f7f2fcfc..5c91e59bc 100644 --- a/bottlecap/src/logs/aggregator_service.rs +++ b/bottlecap/src/logs/aggregator_service.rs @@ -108,7 +108,7 @@ mod tests { #[tokio::test] async fn test_aggregator_service_insert_and_flush() { - let (mut service, handle) = AggregatorService::new_default(); + let (service, handle) = AggregatorService::new_default(); // Spawn the service let service_handle = tokio::spawn(async move { @@ -148,7 +148,7 @@ mod tests { #[tokio::test] async fn test_aggregator_service_multiple_handles() { - let (mut service, handle1) = AggregatorService::new_default(); + let (service, handle1) = AggregatorService::new_default(); let handle2 = handle1.clone(); // Spawn the service @@ -192,7 +192,7 @@ mod tests { #[tokio::test] async fn test_aggregator_service_empty_flush() { - let (mut service, handle) = AggregatorService::new_default(); + let (service, handle) = AggregatorService::new_default(); // Spawn the service let service_handle = tokio::spawn(async move { diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 4d9c31f02..685becfd6 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -734,325 +734,387 @@ mod tests { ); } - // // process - // #[tokio::test] - // async fn test_process() { - // let config = Arc::new(config::Config { - // service: Some("test-service".to_string()), - // tags: HashMap::from([("test".to_string(), "tags".to_string())]), - // ..config::Config::default() - // }); - - // let tags_provider = Arc::new(provider::Provider::new( - // Arc::clone(&config), - // LAMBDA_RUNTIME_SLUG.to_string(), - // &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), - // )); - - // let (tx, _rx) = tokio::sync::mpsc::channel(2); - - // let mut processor = - // { - // let (_, aggregator_handle) = AggregatorService::new_default(); - // LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone(), aggregator_handle) - // }; - - // let event = TelemetryEvent { - // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - // record: TelemetryRecord::PlatformStart { - // request_id: "test-request-id".to_string(), - // version: Some("test".to_string()), - // }, - // }; - - // processor.process(event.clone()).await; - - // let mut aggregator_lock = aggregator.lock().await; - // let batch = aggregator_lock.get_batch(); - // let log = IntakeLog { - // message: Message { - // message: "START RequestId: test-request-id Version: test".to_string(), - // lambda: Lambda { - // arn: "test-arn".to_string(), - // request_id: Some("test-request-id".to_string()), - // }, - // timestamp: 1_673_061_827_000, - // status: "info".to_string(), - // }, - // hostname: "test-arn".to_string(), - // source: LAMBDA_RUNTIME_SLUG.to_string(), - // service: "test-service".to_string(), - // tags: tags_provider.get_tags_string(), - // }; - // let serialized_log = format!("[{}]", serde_json::to_string(&log).unwrap()); - // assert_eq!(batch, serialized_log.as_bytes()); - // } - - // #[tokio::test] - // async fn test_process_logs_disabled() { - // let config = Arc::new(config::Config { - // service: Some("test-service".to_string()), - // tags: HashMap::from([("test".to_string(), "tags".to_string())]), - // serverless_logs_enabled: false, - // ..config::Config::default() - // }); - - // let tags_provider = Arc::new(provider::Provider::new( - // Arc::clone(&config), - // LAMBDA_RUNTIME_SLUG.to_string(), - // &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), - // )); - - // let (tx, _rx) = tokio::sync::mpsc::channel(2); - - // let mut processor = - // { - // let (_, aggregator_handle) = AggregatorService::new_default(); - // LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone(), aggregator_handle) - // }; - - // let event = TelemetryEvent { - // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - // record: TelemetryRecord::PlatformStart { - // request_id: "test-request-id".to_string(), - // version: Some("test".to_string()), - // }, - // }; - - // processor.process(event.clone()).await; - - // let mut aggregator_lock = aggregator.lock().await; - // let batch = aggregator_lock.get_batch(); - // assert_eq!(batch.len(), 0); - // } - - // #[tokio::test] - // async fn test_process_log_with_no_request_id() { - // let config = Arc::new(config::Config { - // service: Some("test-service".to_string()), - // tags: HashMap::from([("test".to_string(), "tags".to_string())]), - // ..config::Config::default() - // }); - - // let tags_provider = Arc::new(provider::Provider::new( - // Arc::clone(&config), - // LAMBDA_RUNTIME_SLUG.to_string(), - // &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), - // )); - - // let (tx, _rx) = tokio::sync::mpsc::channel(2); - // let (_, aggregator_handle) = AggregatorService::new_default(); - - // let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), aggregator_handle); - - // let event = TelemetryEvent { - // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - // record: TelemetryRecord::Function(Value::String("test-function".to_string())), - // }; - - // processor.process(event.clone()).await; - // assert_eq!(processor.orphan_logs.len(), 1); - - // let mut aggregator_lock = aggregator.lock().await; - // let batch = aggregator_lock.get_batch(); - // assert!(batch.is_empty()); - // } - - // #[tokio::test] - // async fn test_process_logs_after_seeing_request_id() { - // let config = Arc::new(config::Config { - // service: Some("test-service".to_string()), - // tags: HashMap::from([("test".to_string(), "tags".to_string())]), - // ..config::Config::default() - // }); - - // let tags_provider = Arc::new(provider::Provider::new( - // Arc::clone(&config), - // LAMBDA_RUNTIME_SLUG.to_string(), - // &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), - // )); - - // let (tx, _rx) = tokio::sync::mpsc::channel(2); - - // let mut processor = - // { - // let (_, aggregator_handle) = AggregatorService::new_default(); - // LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone(), aggregator_handle) - // }; - - // let start_event = TelemetryEvent { - // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - // record: TelemetryRecord::PlatformStart { - // request_id: "test-request-id".to_string(), - // version: Some("test".to_string()), - // }, - // }; - - // processor.process(start_event.clone()).await; - // assert_eq!( - // processor.invocation_context.request_id, - // "test-request-id".to_string() - // ); - - // // This could be any event that doesn't have a `request_id` - // let event = TelemetryEvent { - // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - // record: TelemetryRecord::Function(Value::String("test-function".to_string())), - // }; - - // processor.process(event.clone()).await; - - // // Verify aggregator logs - // let mut aggregator_lock = aggregator.lock().await; - // let batch = aggregator_lock.get_batch(); - // let start_log = IntakeLog { - // message: Message { - // message: "START RequestId: test-request-id Version: test".to_string(), - // lambda: Lambda { - // arn: "test-arn".to_string(), - // request_id: Some("test-request-id".to_string()), - // }, - // timestamp: 1_673_061_827_000, - // status: "info".to_string(), - // }, - // hostname: "test-arn".to_string(), - // source: LAMBDA_RUNTIME_SLUG.to_string(), - // service: "test-service".to_string(), - // tags: tags_provider.get_tags_string(), - // }; - // let function_log = IntakeLog { - // message: Message { - // message: "test-function".to_string(), - // lambda: Lambda { - // arn: "test-arn".to_string(), - // request_id: Some("test-request-id".to_string()), - // }, - // timestamp: 1_673_061_827_000, - // status: "info".to_string(), - // }, - // hostname: "test-arn".to_string(), - // source: LAMBDA_RUNTIME_SLUG.to_string(), - // service: "test-service".to_string(), - // tags: tags_provider.get_tags_string(), - // }; - // let serialized_log = format!( - // "[{},{}]", - // serde_json::to_string(&start_log).unwrap(), - // serde_json::to_string(&function_log).unwrap() - // ); - // assert_eq!(batch, serialized_log.as_bytes()); - // } - - // #[tokio::test] - // async fn test_process_logs_structured_ddtags() { - // let config = Arc::new(config::Config { - // service: Some("test-service".to_string()), - // tags: HashMap::from([("test".to_string(), "tags".to_string())]), - // ..config::Config::default() - // }); - - // let tags_provider = Arc::new(provider::Provider::new( - // Arc::clone(&config), - // LAMBDA_RUNTIME_SLUG.to_string(), - // &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), - // )); - - // let (tx, _rx) = tokio::sync::mpsc::channel(2); - // let mut processor = - // LambdaProcessor::new(tags_provider.clone(), Arc::clone(&config), tx.clone()); - // let start_event = TelemetryEvent { - // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - // record: TelemetryRecord::PlatformStart { - // request_id: "test-request-id".to_string(), - // version: Some("test".to_string()), - // }, - // }; - - // let start_lambda_message = processor.get_message(start_event.clone()).await.unwrap(); - // processor.get_intake_log(start_lambda_message).unwrap(); - // let event = TelemetryEvent { - // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - // record: TelemetryRecord::Function(Value::String(r#"{"message":{"custom_details": "my-structured-message","ddtags":"added_tag1:added_value1,added_tag2:added_value2"}}"#.to_string())), - // }; - - // let lambda_message = processor.get_message(event.clone()).await.unwrap(); - // let intake_log = processor.get_intake_log(lambda_message).unwrap(); - - // assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string()); - // assert_eq!(intake_log.hostname, "test-arn".to_string()); - // assert_eq!(intake_log.service, "test-service".to_string()); - // assert!(intake_log.tags.contains("added_tag1:added_value1")); - // let function_log = IntakeLog { - // message: Message { - // message: r#"{"custom_details":"my-structured-message"}"#.to_string(), - // lambda: Lambda { - // arn: "test-arn".to_string(), - // request_id: Some("test-request-id".to_string()), - // }, - // timestamp: 1_673_061_827_000, - // status: "info".to_string(), - // }, - // hostname: "test-arn".to_string(), - // source: LAMBDA_RUNTIME_SLUG.to_string(), - // service: "test-service".to_string(), - // tags: tags_provider.get_tags_string() - // + ",added_tag1:added_value1,added_tag2:added_value2", - // }; - // assert_eq!(intake_log, function_log); - // } - // #[tokio::test] - // async fn test_process_logs_structured_no_ddtags() { - // let config = Arc::new(config::Config { - // service: Some("test-service".to_string()), - // tags: HashMap::from([("test".to_string(), "tags".to_string())]), - // ..config::Config::default() - // }); - - // let tags_provider = Arc::new(provider::Provider::new( - // Arc::clone(&config), - // LAMBDA_RUNTIME_SLUG.to_string(), - // &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), - // )); - - // let (tx, _rx) = tokio::sync::mpsc::channel(2); - // let mut processor = - // LambdaProcessor::new(tags_provider.clone(), Arc::clone(&config), tx.clone()); - // let start_event = TelemetryEvent { - // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - // record: TelemetryRecord::PlatformStart { - // request_id: "test-request-id".to_string(), - // version: Some("test".to_string()), - // }, - // }; - - // let start_lambda_message = processor.get_message(start_event.clone()).await.unwrap(); - // processor.get_intake_log(start_lambda_message).unwrap(); - // let event = TelemetryEvent { - // time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), - // record: TelemetryRecord::Function(Value::String(r#"{"message":{"custom_details":"my-structured-message"},"my_other_details":"included"}"#.to_string())), - // }; - - // let lambda_message = processor.get_message(event.clone()).await.unwrap(); - // let intake_log = processor.get_intake_log(lambda_message).unwrap(); - - // assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string()); - // assert_eq!(intake_log.hostname, "test-arn".to_string()); - // assert_eq!(intake_log.service, "test-service".to_string()); - // let function_log = IntakeLog { - // message: Message { - // message: r#"{"message":{"custom_details":"my-structured-message"},"my_other_details":"included"}"#.to_string(), - // lambda: Lambda { - // arn: "test-arn".to_string(), - // request_id: Some("test-request-id".to_string()), - // }, - // timestamp: 1_673_061_827_000, - // status: "info".to_string(), - // }, - // hostname: "test-arn".to_string(), - // source: LAMBDA_RUNTIME_SLUG.to_string(), - // service: "test-service".to_string(), - // tags: tags_provider.get_tags_string(), - // }; - // assert_eq!(intake_log, function_log); - // } + // process + #[tokio::test] + async fn test_process() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let (aggregator_service, aggregator_handle) = AggregatorService::new_default(); + + // Spawn the aggregator service + let service_handle = tokio::spawn(async move { + aggregator_service.run().await; + }); + + let mut processor = LambdaProcessor::new( + Arc::clone(&tags_provider), + Arc::clone(&config), + tx.clone(), + aggregator_handle.clone() + ); + + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::PlatformStart { + request_id: "test-request-id".to_string(), + version: Some("test".to_string()), + }, + }; + + processor.process(event.clone()).await; + + // Flush the aggregator to get the batches + let batches = aggregator_handle.flush().await.unwrap(); + assert_eq!(batches.len(), 1); + + let log = IntakeLog { + message: Message { + message: "START RequestId: test-request-id Version: test".to_string(), + lambda: Lambda { + arn: "test-arn".to_string(), + request_id: Some("test-request-id".to_string()), + }, + timestamp: 1_673_061_827_000, + status: "info".to_string(), + }, + hostname: "test-arn".to_string(), + source: LAMBDA_RUNTIME_SLUG.to_string(), + service: "test-service".to_string(), + tags: tags_provider.get_tags_string(), + }; + let serialized_log = format!("[{}]", serde_json::to_string(&log).unwrap()); + assert_eq!(batches[0], serialized_log.as_bytes()); + + // Shutdown the service + aggregator_handle.shutdown().unwrap(); + let _ = service_handle.await; + } + + #[tokio::test] + async fn test_process_logs_disabled() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + serverless_logs_enabled: false, + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let (aggregator_service, aggregator_handle) = AggregatorService::new_default(); + + // Spawn the aggregator service + let service_handle = tokio::spawn(async move { + aggregator_service.run().await; + }); + + let mut processor = LambdaProcessor::new( + Arc::clone(&tags_provider), + Arc::clone(&config), + tx.clone(), + aggregator_handle.clone() + ); + + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::PlatformStart { + request_id: "test-request-id".to_string(), + version: Some("test".to_string()), + }, + }; + + processor.process(event.clone()).await; + + // Flush the aggregator to get the batches + let batches = aggregator_handle.flush().await.unwrap(); + assert!(batches.is_empty()); + + // Shutdown the service + aggregator_handle.shutdown().unwrap(); + let _ = service_handle.await; + } + + #[tokio::test] + async fn test_process_log_with_no_request_id() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let (aggregator_service, aggregator_handle) = AggregatorService::new_default(); + + // Spawn the aggregator service + let service_handle = tokio::spawn(async move { + aggregator_service.run().await; + }); + + let mut processor = LambdaProcessor::new( + tags_provider, + Arc::clone(&config), + tx.clone(), + aggregator_handle.clone() + ); + + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::String("test-function".to_string())), + }; + + processor.process(event.clone()).await; + assert_eq!(processor.orphan_logs.len(), 1); + + // Flush the aggregator to get the batches + let batches = aggregator_handle.flush().await.unwrap(); + assert!(batches.is_empty()); + + // Shutdown the service + aggregator_handle.shutdown().unwrap(); + let _ = service_handle.await; + } + + #[tokio::test] + async fn test_process_logs_after_seeing_request_id() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let (aggregator_service, aggregator_handle) = AggregatorService::new_default(); + + // Spawn the aggregator service + let service_handle = tokio::spawn(async move { + aggregator_service.run().await; + }); + + let mut processor = LambdaProcessor::new( + Arc::clone(&tags_provider), + Arc::clone(&config), + tx.clone(), + aggregator_handle.clone() + ); + + let start_event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::PlatformStart { + request_id: "test-request-id".to_string(), + version: Some("test".to_string()), + }, + }; + + processor.process(start_event.clone()).await; + assert_eq!( + processor.invocation_context.request_id, + "test-request-id".to_string() + ); + + // This could be any event that doesn't have a `request_id` + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::String("test-function".to_string())), + }; + + processor.process(event.clone()).await; + + // Flush the aggregator to get the batches + let batches = aggregator_handle.flush().await.unwrap(); + assert_eq!(batches.len(), 1); + + let start_log = IntakeLog { + message: Message { + message: "START RequestId: test-request-id Version: test".to_string(), + lambda: Lambda { + arn: "test-arn".to_string(), + request_id: Some("test-request-id".to_string()), + }, + timestamp: 1_673_061_827_000, + status: "info".to_string(), + }, + hostname: "test-arn".to_string(), + source: LAMBDA_RUNTIME_SLUG.to_string(), + service: "test-service".to_string(), + tags: tags_provider.get_tags_string(), + }; + let function_log = IntakeLog { + message: Message { + message: "test-function".to_string(), + lambda: Lambda { + arn: "test-arn".to_string(), + request_id: Some("test-request-id".to_string()), + }, + timestamp: 1_673_061_827_000, + status: "info".to_string(), + }, + hostname: "test-arn".to_string(), + source: LAMBDA_RUNTIME_SLUG.to_string(), + service: "test-service".to_string(), + tags: tags_provider.get_tags_string(), + }; + let serialized_log = format!( + "[{},{}]", + serde_json::to_string(&start_log).unwrap(), + serde_json::to_string(&function_log).unwrap() + ); + assert_eq!(batches[0], serialized_log.as_bytes()); + + // Shutdown the service + aggregator_handle.shutdown().unwrap(); + let _ = service_handle.await; + } + + #[tokio::test] + async fn test_process_logs_structured_ddtags() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let (_, aggregator_handle) = AggregatorService::new_default(); + + let mut processor = LambdaProcessor::new( + tags_provider.clone(), + Arc::clone(&config), + tx.clone(), + aggregator_handle + ); + let start_event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::PlatformStart { + request_id: "test-request-id".to_string(), + version: Some("test".to_string()), + }, + }; + + let start_lambda_message = processor.get_message(start_event.clone()).await.unwrap(); + processor.get_intake_log(start_lambda_message).unwrap(); + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::String(r#"{"message":{"custom_details": "my-structured-message","ddtags":"added_tag1:added_value1,added_tag2:added_value2"}}"#.to_string())), + }; + + let lambda_message = processor.get_message(event.clone()).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + + assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string()); + assert_eq!(intake_log.hostname, "test-arn".to_string()); + assert_eq!(intake_log.service, "test-service".to_string()); + assert!(intake_log.tags.contains("added_tag1:added_value1")); + let function_log = IntakeLog { + message: Message { + message: r#"{"custom_details":"my-structured-message"}"#.to_string(), + lambda: Lambda { + arn: "test-arn".to_string(), + request_id: Some("test-request-id".to_string()), + }, + timestamp: 1_673_061_827_000, + status: "info".to_string(), + }, + hostname: "test-arn".to_string(), + source: LAMBDA_RUNTIME_SLUG.to_string(), + service: "test-service".to_string(), + tags: tags_provider.get_tags_string() + + ",added_tag1:added_value1,added_tag2:added_value2", + }; + assert_eq!(intake_log, function_log); + } + #[tokio::test] + async fn test_process_logs_structured_no_ddtags() { + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: HashMap::from([("test".to_string(), "tags".to_string())]), + ..config::Config::default() + }); + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + + let (tx, _rx) = tokio::sync::mpsc::channel(2); + let (_, aggregator_handle) = AggregatorService::new_default(); + + let mut processor = LambdaProcessor::new( + tags_provider.clone(), + Arc::clone(&config), + tx.clone(), + aggregator_handle + ); + let start_event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::PlatformStart { + request_id: "test-request-id".to_string(), + version: Some("test".to_string()), + }, + }; + + let start_lambda_message = processor.get_message(start_event.clone()).await.unwrap(); + processor.get_intake_log(start_lambda_message).unwrap(); + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::String(r#"{"message":{"custom_details":"my-structured-message"},"my_other_details":"included"}"#.to_string())), + }; + + let lambda_message = processor.get_message(event.clone()).await.unwrap(); + let intake_log = processor.get_intake_log(lambda_message).unwrap(); + + assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string()); + assert_eq!(intake_log.hostname, "test-arn".to_string()); + assert_eq!(intake_log.service, "test-service".to_string()); + let function_log = IntakeLog { + message: Message { + message: r#"{"message":{"custom_details":"my-structured-message"},"my_other_details":"included"}"#.to_string(), + lambda: Lambda { + arn: "test-arn".to_string(), + request_id: Some("test-request-id".to_string()), + }, + timestamp: 1_673_061_827_000, + status: "info".to_string(), + }, + hostname: "test-arn".to_string(), + source: LAMBDA_RUNTIME_SLUG.to_string(), + service: "test-service".to_string(), + tags: tags_provider.get_tags_string(), + }; + assert_eq!(intake_log, function_log); + } } From 3a191244487286758db874ae1114d7085be66a35 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 30 Sep 2025 14:11:33 -0400 Subject: [PATCH 03/11] fmt --- bottlecap/src/bin/bottlecap/main.rs | 26 +++++--- bottlecap/src/logs/agent.rs | 6 +- bottlecap/src/logs/aggregator_service.rs | 72 ++------------------ bottlecap/src/logs/flusher.rs | 35 ++++++---- bottlecap/src/logs/lambda/processor.rs | 85 +++++++++++++++--------- bottlecap/src/logs/processor.rs | 3 +- bottlecap/tests/logs_integration_test.rs | 17 ++--- 7 files changed, 109 insertions(+), 135 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 77f4f2ad9..772c70c83 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -43,7 +43,13 @@ use bottlecap::{ listener::Listener as LifecycleListener, }, logger, - logs::{agent::LogsAgent, aggregator_service::{AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService}, flusher::LogsFlusher}, + logs::{ + agent::LogsAgent, + aggregator_service::{ + AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService, + }, + flusher::LogsFlusher, + }, otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent}, proxy::{interceptor, should_start_proxy}, secrets::decrypt, @@ -380,10 +386,7 @@ async fn extension_loop_active( let (logs_aggr_service, logs_aggr_handle) = LogsAggregatorService::new_default(); debug!( "Logs aggregator created in {:} microseconds", - logs_aggr_init_start_time - .elapsed() - .as_micros() - .to_string() + logs_aggr_init_start_time.elapsed().as_micros().to_string() ); start_logs_aggregator(logs_aggr_service); @@ -917,13 +920,14 @@ fn start_logs_agent( event_bus: Sender, logs_aggr_handle: LogsAggregatorHandle, ) -> (Sender, LogsFlusher) { - let mut logs_agent = LogsAgent::new(Arc::clone(tags_provider), Arc::clone(config), event_bus, logs_aggr_handle.clone()); - let logs_agent_channel = logs_agent.get_sender_copy(); - let logs_flusher = LogsFlusher::new( - api_key_factory, - logs_aggr_handle, - config.clone(), + let mut logs_agent = LogsAgent::new( + Arc::clone(tags_provider), + Arc::clone(config), + event_bus, + logs_aggr_handle.clone(), ); + let logs_agent_channel = logs_agent.get_sender_copy(); + let logs_flusher = LogsFlusher::new(api_key_factory, logs_aggr_handle, config.clone()); tokio::spawn(async move { logs_agent.spin().await; }); diff --git a/bottlecap/src/logs/agent.rs b/bottlecap/src/logs/agent.rs index fa4965c5c..4d2ddf1f6 100644 --- a/bottlecap/src/logs/agent.rs +++ b/bottlecap/src/logs/agent.rs @@ -32,11 +32,7 @@ impl LogsAgent { let (tx, rx) = mpsc::channel::(1000); - Self { - tx, - rx, - processor, - } + Self { tx, rx, processor } } pub async fn spin(&mut self) { diff --git a/bottlecap/src/logs/aggregator_service.rs b/bottlecap/src/logs/aggregator_service.rs index 5c91e59bc..0b6cbe2e3 100644 --- a/bottlecap/src/logs/aggregator_service.rs +++ b/bottlecap/src/logs/aggregator_service.rs @@ -109,12 +109,12 @@ mod tests { #[tokio::test] async fn test_aggregator_service_insert_and_flush() { let (service, handle) = AggregatorService::new_default(); - + // Spawn the service let service_handle = tokio::spawn(async move { service.run().await; }); - + let log = IntakeLog { message: Message { message: "test".to_string(), @@ -131,80 +131,18 @@ mod tests { source: "source".to_string(), }; let serialized_log = serde_json::to_string(&log).unwrap(); - + // Insert logs using handle handle.insert_batch(vec![serialized_log.clone()]).unwrap(); - + // Flush all batches let batches = handle.flush().await.unwrap(); assert_eq!(batches.len(), 1); let serialized_batch = format!("[{}]", serialized_log); assert_eq!(batches[0], serialized_batch.as_bytes()); - - // Shutdown the service - handle.shutdown().unwrap(); - let _ = service_handle.await; - } - - #[tokio::test] - async fn test_aggregator_service_multiple_handles() { - let (service, handle1) = AggregatorService::new_default(); - let handle2 = handle1.clone(); - - // Spawn the service - let service_handle = tokio::spawn(async move { - service.run().await; - }); - - let log = IntakeLog { - message: Message { - message: "test".to_string(), - lambda: Lambda { - arn: "arn".to_string(), - request_id: Some("request_id".to_string()), - }, - timestamp: 0, - status: "status".to_string(), - }, - hostname: "hostname".to_string(), - service: "service".to_string(), - tags: "tags".to_string(), - source: "source".to_string(), - }; - let serialized_log = serde_json::to_string(&log).unwrap(); - - // Send logs using both handles - handle1.insert_batch(vec![serialized_log.clone()]).unwrap(); - handle2.insert_batch(vec![serialized_log.clone()]).unwrap(); - - // Flush all batches - let batches = handle1.flush().await.unwrap(); - assert_eq!(batches.len(), 2); - - let serialized_batch = format!("[{}]", serialized_log); - assert_eq!(batches[0], serialized_batch.as_bytes()); - assert_eq!(batches[1], serialized_batch.as_bytes()); - - // Shutdown the service - handle1.shutdown().unwrap(); - let _ = service_handle.await; - } - #[tokio::test] - async fn test_aggregator_service_empty_flush() { - let (service, handle) = AggregatorService::new_default(); - - // Spawn the service - let service_handle = tokio::spawn(async move { - service.run().await; - }); - - // Flush when no logs are inserted - let batches = handle.flush().await.unwrap(); - assert!(batches.is_empty()); - // Shutdown the service handle.shutdown().unwrap(); let _ = service_handle.await; - } + } } diff --git a/bottlecap/src/logs/flusher.rs b/bottlecap/src/logs/flusher.rs index bed33b582..62cf98469 100644 --- a/bottlecap/src/logs/flusher.rs +++ b/bottlecap/src/logs/flusher.rs @@ -10,10 +10,7 @@ use std::error::Error; use std::time::Instant; use std::{io::Write, sync::Arc}; use thiserror::Error as ThisError; -use tokio::{ - sync::OnceCell, - task::JoinSet, -}; +use tokio::{sync::OnceCell, task::JoinSet}; use tracing::{debug, error}; use zstd::stream::write::Encoder; @@ -225,12 +222,19 @@ impl LogsFlusher { )); // Create flushers for additional endpoints - debug!("=== Creating {} additional log flushers for dual shipping ===", config.logs_config_additional_endpoints.len()); + debug!( + "=== Creating {} additional log flushers for dual shipping ===", + config.logs_config_additional_endpoints.len() + ); for endpoint in &config.logs_config_additional_endpoints { let endpoint_url = format!("https://{}:{}", endpoint.host, endpoint.port); - debug!("=== Creating additional log flusher for endpoint: {}", endpoint_url); + debug!( + "=== Creating additional log flusher for endpoint: {}", + endpoint_url + ); // Create a separate API key factory for this endpoint using its specific API key - let additional_api_key_factory = Arc::new(ApiKeyFactory::new(endpoint.api_key.clone().as_str())); + let additional_api_key_factory = + Arc::new(ApiKeyFactory::new(endpoint.api_key.clone().as_str())); flushers.push(Flusher::new( additional_api_key_factory, endpoint_url, @@ -238,8 +242,8 @@ impl LogsFlusher { )); } - LogsFlusher { - config, + LogsFlusher { + config, flushers, aggregator_handle, } @@ -268,7 +272,10 @@ impl LogsFlusher { } else { let logs_batches = Arc::new({ match self.aggregator_handle.flush().await { - Ok(batches) => batches.into_iter().map(|batch| self.compress(batch)).collect(), + Ok(batches) => batches + .into_iter() + .map(|batch| self.compress(batch)) + .collect(), Err(e) => { debug!("Failed to flush from aggregator: {}", e); Vec::new() @@ -281,8 +288,12 @@ impl LogsFlusher { let batches = Arc::clone(&logs_batches); let flusher = flusher.clone(); async move { - debug!("=== Flusher for endpoint {} processing {} batches ===", flusher.endpoint, batches.len()); - flusher.flush(Some(batches)).await + debug!( + "=== Flusher for endpoint {} processing {} batches ===", + flusher.endpoint, + batches.len() + ); + flusher.flush(Some(batches)).await } }); diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 685becfd6..1d014dc4b 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -356,7 +356,10 @@ impl LambdaProcessor { if !self.ready_logs.is_empty() { // Send logs to aggregator via handle - if let Err(e) = self.aggregator_handle.insert_batch(std::mem::take(&mut self.ready_logs)) { + if let Err(e) = self + .aggregator_handle + .insert_batch(std::mem::take(&mut self.ready_logs)) + { debug!("Failed to send logs to aggregator: {}", e); } } @@ -592,7 +595,12 @@ mod tests { let (tx, _) = tokio::sync::mpsc::channel(2); let (_, aggregator_handle) = AggregatorService::new_default(); - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), aggregator_handle); + let mut processor = LambdaProcessor::new( + tags_provider, + Arc::clone(&config), + tx.clone(), + aggregator_handle, + ); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -626,7 +634,12 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); let (_, aggregator_handle) = AggregatorService::new_default(); - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), aggregator_handle); + let mut processor = LambdaProcessor::new( + tags_provider, + Arc::clone(&config), + tx.clone(), + aggregator_handle, + ); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -673,7 +686,12 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); let (_, aggregator_handle) = AggregatorService::new_default(); - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), aggregator_handle); + let mut processor = LambdaProcessor::new( + tags_provider, + Arc::clone(&config), + tx.clone(), + aggregator_handle, + ); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -707,7 +725,12 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); let (_, aggregator_handle) = AggregatorService::new_default(); - let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone(), aggregator_handle); + let mut processor = LambdaProcessor::new( + tags_provider, + Arc::clone(&config), + tx.clone(), + aggregator_handle, + ); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -758,10 +781,10 @@ mod tests { }); let mut processor = LambdaProcessor::new( - Arc::clone(&tags_provider), - Arc::clone(&config), - tx.clone(), - aggregator_handle.clone() + Arc::clone(&tags_provider), + Arc::clone(&config), + tx.clone(), + aggregator_handle.clone(), ); let event = TelemetryEvent { @@ -777,7 +800,7 @@ mod tests { // Flush the aggregator to get the batches let batches = aggregator_handle.flush().await.unwrap(); assert_eq!(batches.len(), 1); - + let log = IntakeLog { message: Message { message: "START RequestId: test-request-id Version: test".to_string(), @@ -825,10 +848,10 @@ mod tests { }); let mut processor = LambdaProcessor::new( - Arc::clone(&tags_provider), - Arc::clone(&config), - tx.clone(), - aggregator_handle.clone() + Arc::clone(&tags_provider), + Arc::clone(&config), + tx.clone(), + aggregator_handle.clone(), ); let event = TelemetryEvent { @@ -873,10 +896,10 @@ mod tests { }); let mut processor = LambdaProcessor::new( - tags_provider, - Arc::clone(&config), - tx.clone(), - aggregator_handle.clone() + tags_provider, + Arc::clone(&config), + tx.clone(), + aggregator_handle.clone(), ); let event = TelemetryEvent { @@ -919,10 +942,10 @@ mod tests { }); let mut processor = LambdaProcessor::new( - Arc::clone(&tags_provider), - Arc::clone(&config), - tx.clone(), - aggregator_handle.clone() + Arc::clone(&tags_provider), + Arc::clone(&config), + tx.clone(), + aggregator_handle.clone(), ); let start_event = TelemetryEvent { @@ -950,7 +973,7 @@ mod tests { // Flush the aggregator to get the batches let batches = aggregator_handle.flush().await.unwrap(); assert_eq!(batches.len(), 1); - + let start_log = IntakeLog { message: Message { message: "START RequestId: test-request-id Version: test".to_string(), @@ -1009,12 +1032,12 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); let (_, aggregator_handle) = AggregatorService::new_default(); - + let mut processor = LambdaProcessor::new( - tags_provider.clone(), - Arc::clone(&config), + tags_provider.clone(), + Arc::clone(&config), tx.clone(), - aggregator_handle + aggregator_handle, ); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -1072,12 +1095,12 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); let (_, aggregator_handle) = AggregatorService::new_default(); - + let mut processor = LambdaProcessor::new( - tags_provider.clone(), - Arc::clone(&config), + tags_provider.clone(), + Arc::clone(&config), tx.clone(), - aggregator_handle + aggregator_handle, ); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), diff --git a/bottlecap/src/logs/processor.rs b/bottlecap/src/logs/processor.rs index 3c5e42b5b..74ba299b4 100644 --- a/bottlecap/src/logs/processor.rs +++ b/bottlecap/src/logs/processor.rs @@ -23,7 +23,8 @@ impl LogsProcessor { ) -> Self { match runtime.as_str() { LAMBDA_RUNTIME_SLUG => { - let lambda_processor = LambdaProcessor::new(tags_provider, config, event_bus, aggregator_handle); + let lambda_processor = + LambdaProcessor::new(tags_provider, config, event_bus, aggregator_handle); LogsProcessor::Lambda(lambda_processor) } _ => panic!("Unsupported runtime: {runtime}"), diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index 4a88130a4..54b6a39c4 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -55,15 +55,16 @@ async fn test_logs() { )); let bus = EventBus::run(); - let (_, logs_aggr_handle) = bottlecap::logs::aggregator_service::AggregatorService::new_default(); - let mut logs_agent = - LogsAgent::new(tags_provider, Arc::clone(&arc_conf), bus.get_sender_copy(), logs_aggr_handle.clone()); - let api_key_factory = Arc::new(ApiKeyFactory::new(dd_api_key)); - let logs_flusher = LogsFlusher::new( - api_key_factory, - logs_aggr_handle, - arc_conf.clone(), + let (_, logs_aggr_handle) = + bottlecap::logs::aggregator_service::AggregatorService::new_default(); + let mut logs_agent = LogsAgent::new( + tags_provider, + Arc::clone(&arc_conf), + bus.get_sender_copy(), + logs_aggr_handle.clone(), ); + let api_key_factory = Arc::new(ApiKeyFactory::new(dd_api_key)); + let logs_flusher = LogsFlusher::new(api_key_factory, logs_aggr_handle, arc_conf.clone()); let telemetry_events: Vec = serde_json::from_str( r#"[{"time":"2022-10-21T14:05:03.165Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}]"#) From ddddf15ec54bc324a52f7b1e3899ba21dc85c53d Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 30 Sep 2025 14:23:22 -0400 Subject: [PATCH 04/11] clippy --- bottlecap/src/logs/aggregator_service.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/bottlecap/src/logs/aggregator_service.rs b/bottlecap/src/logs/aggregator_service.rs index 0b6cbe2e3..81f17ed2f 100644 --- a/bottlecap/src/logs/aggregator_service.rs +++ b/bottlecap/src/logs/aggregator_service.rs @@ -27,11 +27,11 @@ impl AggregatorHandle { let (response_tx, response_rx) = oneshot::channel(); self.tx .send(AggregatorCommand::Flush(response_tx)) - .map_err(|e| format!("Failed to send flush command: {}", e))?; + .map_err(|e| format!("Failed to send flush command: {e}"))?; response_rx .await - .map_err(|e| format!("Failed to receive flush response: {}", e)) + .map_err(|e| format!("Failed to receive flush response: {e}")) } pub fn shutdown(&self) -> Result<(), mpsc::error::SendError> { @@ -45,6 +45,7 @@ pub struct AggregatorService { } impl AggregatorService { + #[must_use] pub fn new( max_batch_entries_size: usize, max_content_size_bytes: usize, @@ -63,6 +64,7 @@ impl AggregatorService { (service, handle) } + #[must_use] pub fn new_default() -> (Self, AggregatorHandle) { use crate::logs::constants; Self::new( @@ -144,5 +146,5 @@ mod tests { // Shutdown the service handle.shutdown().unwrap(); let _ = service_handle.await; - } + } } From 3383e6be7e485247b9b1c5eab1e37d2d44bd9e44 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 30 Sep 2025 16:51:23 -0400 Subject: [PATCH 05/11] fix test --- bottlecap/src/logs/aggregator_service.rs | 2 +- bottlecap/src/logs/flusher.rs | 1 + bottlecap/tests/logs_integration_test.rs | 8 +++++++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/bottlecap/src/logs/aggregator_service.rs b/bottlecap/src/logs/aggregator_service.rs index 81f17ed2f..95b03ebd8 100644 --- a/bottlecap/src/logs/aggregator_service.rs +++ b/bottlecap/src/logs/aggregator_service.rs @@ -140,7 +140,7 @@ mod tests { // Flush all batches let batches = handle.flush().await.unwrap(); assert_eq!(batches.len(), 1); - let serialized_batch = format!("[{}]", serialized_log); + let serialized_batch = format!("[{serialized_log}]"); assert_eq!(batches[0], serialized_batch.as_bytes()); // Shutdown the service diff --git a/bottlecap/src/logs/flusher.rs b/bottlecap/src/logs/flusher.rs index 62cf98469..868a3c196 100644 --- a/bottlecap/src/logs/flusher.rs +++ b/bottlecap/src/logs/flusher.rs @@ -31,6 +31,7 @@ pub struct Flusher { } impl Flusher { + #[must_use] pub fn new( api_key_factory: Arc, endpoint: String, diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index 54b6a39c4..1e036ca1d 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -55,8 +55,14 @@ async fn test_logs() { )); let bus = EventBus::run(); - let (_, logs_aggr_handle) = + let (logs_aggr_service, logs_aggr_handle) = bottlecap::logs::aggregator_service::AggregatorService::new_default(); + + // Spawn the aggregator service + tokio::spawn(async move { + logs_aggr_service.run().await; + }); + let mut logs_agent = LogsAgent::new( tags_provider, Arc::clone(&arc_conf), From 0272a3f91abc28f72c3df92a579e446834462c74 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 30 Sep 2025 17:11:33 -0400 Subject: [PATCH 06/11] fmt --- bottlecap/tests/logs_integration_test.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index 1e036ca1d..c1e276064 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -57,12 +57,12 @@ async fn test_logs() { let bus = EventBus::run(); let (logs_aggr_service, logs_aggr_handle) = bottlecap::logs::aggregator_service::AggregatorService::new_default(); - + // Spawn the aggregator service tokio::spawn(async move { logs_aggr_service.run().await; }); - + let mut logs_agent = LogsAgent::new( tags_provider, Arc::clone(&arc_conf), From 3c3c5daabaf828844d21e72405b0f4248e95ff11 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Wed, 1 Oct 2025 10:57:12 -0400 Subject: [PATCH 07/11] remove extra debug logs --- bottlecap/src/bin/bottlecap/main.rs | 7 +---- bottlecap/src/logs/aggregator_service.rs | 39 ++++++++++++------------ bottlecap/src/logs/flusher.rs | 21 ++----------- bottlecap/src/logs/lambda/processor.rs | 22 ++++++------- bottlecap/tests/logs_integration_test.rs | 2 +- 5 files changed, 35 insertions(+), 56 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 772c70c83..c7853935c 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -382,12 +382,7 @@ async fn extension_loop_active( .to_string(); let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id); - let logs_aggr_init_start_time = Instant::now(); - let (logs_aggr_service, logs_aggr_handle) = LogsAggregatorService::new_default(); - debug!( - "Logs aggregator created in {:} microseconds", - logs_aggr_init_start_time.elapsed().as_micros().to_string() - ); + let (logs_aggr_service, logs_aggr_handle) = LogsAggregatorService::default(); start_logs_aggregator(logs_aggr_service); let (logs_agent_channel, logs_flusher) = start_logs_agent( diff --git a/bottlecap/src/logs/aggregator_service.rs b/bottlecap/src/logs/aggregator_service.rs index 95b03ebd8..9b72ba72b 100644 --- a/bottlecap/src/logs/aggregator_service.rs +++ b/bottlecap/src/logs/aggregator_service.rs @@ -1,7 +1,8 @@ +use log::error; use tokio::sync::{mpsc, oneshot}; use tracing::debug; -use crate::logs::aggregator::Aggregator; +use crate::logs::{aggregator::Aggregator, constants}; #[derive(Debug)] pub enum AggregatorCommand { @@ -45,6 +46,16 @@ pub struct AggregatorService { } impl AggregatorService { + #[must_use] + #[allow(clippy::should_implement_trait)] + pub fn default() -> (Self, AggregatorHandle) { + Self::new( + constants::MAX_BATCH_ENTRIES_SIZE, + constants::MAX_CONTENT_SIZE_BYTES, + constants::MAX_LOG_SIZE_BYTES, + ) + } + #[must_use] pub fn new( max_batch_entries_size: usize, @@ -64,16 +75,6 @@ impl AggregatorService { (service, handle) } - #[must_use] - pub fn new_default() -> (Self, AggregatorHandle) { - use crate::logs::constants; - Self::new( - constants::MAX_BATCH_ENTRIES_SIZE, - constants::MAX_CONTENT_SIZE_BYTES, - constants::MAX_LOG_SIZE_BYTES, - ) - } - pub async fn run(mut self) { debug!("Logs aggregator service started"); @@ -84,14 +85,14 @@ impl AggregatorService { } AggregatorCommand::Flush(response_tx) => { let mut batches = Vec::new(); - loop { - let batch = self.aggregator.get_batch(); - if batch.is_empty() { - break; - } - batches.push(batch); + let mut current_batch = self.aggregator.get_batch(); + while !current_batch.is_empty() { + batches.push(current_batch); + current_batch = self.aggregator.get_batch(); + } + if response_tx.send(batches).is_err() { + error!("Failed to send logs flush response - receiver dropped"); } - let _ = response_tx.send(batches); } AggregatorCommand::Shutdown => { debug!("Logs aggregator service shutting down"); @@ -110,7 +111,7 @@ mod tests { #[tokio::test] async fn test_aggregator_service_insert_and_flush() { - let (service, handle) = AggregatorService::new_default(); + let (service, handle) = AggregatorService::default(); // Spawn the service let service_handle = tokio::spawn(async move { diff --git a/bottlecap/src/logs/flusher.rs b/bottlecap/src/logs/flusher.rs index 868a3c196..db6794aba 100644 --- a/bottlecap/src/logs/flusher.rs +++ b/bottlecap/src/logs/flusher.rs @@ -48,7 +48,6 @@ impl Flusher { } pub async fn flush(&self, batches: Option>>>) -> Vec { - debug!("=== Flushing batches to endpoint: {} ===", self.endpoint); let Some(api_key) = self.api_key_factory.get_api_key().await else { error!("Skipping flushing logs: Failed to resolve API key"); return vec![]; @@ -223,17 +222,8 @@ impl LogsFlusher { )); // Create flushers for additional endpoints - debug!( - "=== Creating {} additional log flushers for dual shipping ===", - config.logs_config_additional_endpoints.len() - ); for endpoint in &config.logs_config_additional_endpoints { let endpoint_url = format!("https://{}:{}", endpoint.host, endpoint.port); - debug!( - "=== Creating additional log flusher for endpoint: {}", - endpoint_url - ); - // Create a separate API key factory for this endpoint using its specific API key let additional_api_key_factory = Arc::new(ApiKeyFactory::new(endpoint.api_key.clone().as_str())); flushers.push(Flusher::new( @@ -284,18 +274,11 @@ impl LogsFlusher { } }); - // Send batches to each flusher (dual shipping) + // Send batches to each flusher let futures = self.flushers.iter().map(|flusher| { let batches = Arc::clone(&logs_batches); let flusher = flusher.clone(); - async move { - debug!( - "=== Flusher for endpoint {} processing {} batches ===", - flusher.endpoint, - batches.len() - ); - flusher.flush(Some(batches)).await - } + async move { flusher.flush(Some(batches)).await } }); let results = join_all(futures).await; diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 1d014dc4b..cc1da63fc 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -402,7 +402,7 @@ mod tests { &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]))); let (tx, _) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::new_default(); + let (_, aggregator_handle) = AggregatorService::default(); let mut processor = LambdaProcessor::new( tags_provider, @@ -593,7 +593,7 @@ mod tests { )); let (tx, _) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::new_default(); + let (_, aggregator_handle) = AggregatorService::default(); let mut processor = LambdaProcessor::new( tags_provider, @@ -633,7 +633,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::new_default(); + let (_, aggregator_handle) = AggregatorService::default(); let mut processor = LambdaProcessor::new( tags_provider, Arc::clone(&config), @@ -685,7 +685,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::new_default(); + let (_, aggregator_handle) = AggregatorService::default(); let mut processor = LambdaProcessor::new( tags_provider, Arc::clone(&config), @@ -724,7 +724,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::new_default(); + let (_, aggregator_handle) = AggregatorService::default(); let mut processor = LambdaProcessor::new( tags_provider, Arc::clone(&config), @@ -773,7 +773,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (aggregator_service, aggregator_handle) = AggregatorService::new_default(); + let (aggregator_service, aggregator_handle) = AggregatorService::default(); // Spawn the aggregator service let service_handle = tokio::spawn(async move { @@ -840,7 +840,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (aggregator_service, aggregator_handle) = AggregatorService::new_default(); + let (aggregator_service, aggregator_handle) = AggregatorService::default(); // Spawn the aggregator service let service_handle = tokio::spawn(async move { @@ -888,7 +888,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (aggregator_service, aggregator_handle) = AggregatorService::new_default(); + let (aggregator_service, aggregator_handle) = AggregatorService::default(); // Spawn the aggregator service let service_handle = tokio::spawn(async move { @@ -934,7 +934,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (aggregator_service, aggregator_handle) = AggregatorService::new_default(); + let (aggregator_service, aggregator_handle) = AggregatorService::default(); // Spawn the aggregator service let service_handle = tokio::spawn(async move { @@ -1031,7 +1031,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::new_default(); + let (_, aggregator_handle) = AggregatorService::default(); let mut processor = LambdaProcessor::new( tags_provider.clone(), @@ -1094,7 +1094,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::new_default(); + let (_, aggregator_handle) = AggregatorService::default(); let mut processor = LambdaProcessor::new( tags_provider.clone(), diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index c1e276064..cfae30a72 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -56,7 +56,7 @@ async fn test_logs() { let bus = EventBus::run(); let (logs_aggr_service, logs_aggr_handle) = - bottlecap::logs::aggregator_service::AggregatorService::new_default(); + bottlecap::logs::aggregator_service::AggregatorService::default(); // Spawn the aggregator service tokio::spawn(async move { From f2d5b0f126ccd2777bfa2c4a005c7020b921d80c Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Wed, 1 Oct 2025 14:42:56 -0400 Subject: [PATCH 08/11] fixes --- bottlecap/src/bin/bottlecap/main.rs | 12 +-- bottlecap/src/logs/agent.rs | 15 ++- bottlecap/src/logs/aggregator_service.rs | 13 ++- bottlecap/src/logs/flusher.rs | 2 +- bottlecap/src/logs/lambda/processor.rs | 130 ++++++----------------- bottlecap/src/logs/processor.rs | 11 +- 6 files changed, 57 insertions(+), 126 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index c7853935c..766fd3b4d 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -45,9 +45,7 @@ use bottlecap::{ logger, logs::{ agent::LogsAgent, - aggregator_service::{ - AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService, - }, + aggregator_service::AggregatorService as LogsAggregatorService, flusher::LogsFlusher, }, otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent}, @@ -382,15 +380,11 @@ async fn extension_loop_active( .to_string(); let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id); - let (logs_aggr_service, logs_aggr_handle) = LogsAggregatorService::default(); - start_logs_aggregator(logs_aggr_service); - let (logs_agent_channel, logs_flusher) = start_logs_agent( config, Arc::clone(&api_key_factory), &tags_provider, event_bus.get_sender_copy(), - logs_aggr_handle, ); let metrics_aggr_init_start_time = Instant::now(); @@ -913,8 +907,10 @@ fn start_logs_agent( api_key_factory: Arc, tags_provider: &Arc, event_bus: Sender, - logs_aggr_handle: LogsAggregatorHandle, ) -> (Sender, LogsFlusher) { + let (logs_aggr_service, logs_aggr_handle) = LogsAggregatorService::default(); + start_logs_aggregator(logs_aggr_service); + let mut logs_agent = LogsAgent::new( Arc::clone(tags_provider), Arc::clone(config), diff --git a/bottlecap/src/logs/agent.rs b/bottlecap/src/logs/agent.rs index 4d2ddf1f6..acd613644 100644 --- a/bottlecap/src/logs/agent.rs +++ b/bottlecap/src/logs/agent.rs @@ -12,6 +12,7 @@ pub struct LogsAgent { tx: Sender, rx: mpsc::Receiver, processor: LogsProcessor, + aggregator_handle: AggregatorHandle, } impl LogsAgent { @@ -27,23 +28,29 @@ impl LogsAgent { tags_provider, event_bus, LAMBDA_RUNTIME_SLUG.to_string(), - aggregator_handle.clone(), ); let (tx, rx) = mpsc::channel::(1000); - Self { tx, rx, processor } + Self { + tx, + rx, + processor, + aggregator_handle, + } } pub async fn spin(&mut self) { while let Some(event) = self.rx.recv().await { - self.processor.process(event).await; + self.processor.process(event, &self.aggregator_handle).await; } } pub async fn sync_consume(&mut self) { if let Some(events) = self.rx.recv().await { - self.processor.process(events).await; + self.processor + .process(events, &self.aggregator_handle) + .await; } } diff --git a/bottlecap/src/logs/aggregator_service.rs b/bottlecap/src/logs/aggregator_service.rs index 9b72ba72b..154e48e44 100644 --- a/bottlecap/src/logs/aggregator_service.rs +++ b/bottlecap/src/logs/aggregator_service.rs @@ -1,13 +1,12 @@ -use log::error; use tokio::sync::{mpsc, oneshot}; -use tracing::debug; +use tracing::{debug, error}; use crate::logs::{aggregator::Aggregator, constants}; #[derive(Debug)] pub enum AggregatorCommand { InsertBatch(Vec), - Flush(oneshot::Sender>>), + GetBatches(oneshot::Sender>>), Shutdown, } @@ -24,10 +23,10 @@ impl AggregatorHandle { self.tx.send(AggregatorCommand::InsertBatch(logs)) } - pub async fn flush(&self) -> Result>, String> { + pub async fn get_batches(&self) -> Result>, String> { let (response_tx, response_rx) = oneshot::channel(); self.tx - .send(AggregatorCommand::Flush(response_tx)) + .send(AggregatorCommand::GetBatches(response_tx)) .map_err(|e| format!("Failed to send flush command: {e}"))?; response_rx @@ -83,7 +82,7 @@ impl AggregatorService { AggregatorCommand::InsertBatch(logs) => { self.aggregator.add_batch(logs); } - AggregatorCommand::Flush(response_tx) => { + AggregatorCommand::GetBatches(response_tx) => { let mut batches = Vec::new(); let mut current_batch = self.aggregator.get_batch(); while !current_batch.is_empty() { @@ -139,7 +138,7 @@ mod tests { handle.insert_batch(vec![serialized_log.clone()]).unwrap(); // Flush all batches - let batches = handle.flush().await.unwrap(); + let batches = handle.get_batches().await.unwrap(); assert_eq!(batches.len(), 1); let serialized_batch = format!("[{serialized_log}]"); assert_eq!(batches[0], serialized_batch.as_bytes()); diff --git a/bottlecap/src/logs/flusher.rs b/bottlecap/src/logs/flusher.rs index db6794aba..4a81722c7 100644 --- a/bottlecap/src/logs/flusher.rs +++ b/bottlecap/src/logs/flusher.rs @@ -262,7 +262,7 @@ impl LogsFlusher { } } else { let logs_batches = Arc::new({ - match self.aggregator_handle.flush().await { + match self.aggregator_handle.get_batches().await { Ok(batches) => batches .into_iter() .map(|batch| self.compress(batch)) diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index cc1da63fc..e5e01defe 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -31,8 +31,6 @@ pub struct LambdaProcessor { ready_logs: Vec, // Main event bus event_bus: Sender, - // Aggregator handle for sending logs - aggregator_handle: AggregatorHandle, // Logs enabled logs_enabled: bool, } @@ -61,7 +59,6 @@ impl LambdaProcessor { tags_provider: Arc, datadog_config: Arc, event_bus: Sender, - aggregator_handle: AggregatorHandle, ) -> Self { let service = datadog_config.service.clone().unwrap_or_default(); let tags = tags_provider.get_tags_string(); @@ -80,7 +77,6 @@ impl LambdaProcessor { orphan_logs: Vec::new(), ready_logs: Vec::new(), event_bus, - aggregator_handle, } } @@ -328,7 +324,7 @@ impl LambdaProcessor { } } - pub async fn process(&mut self, event: TelemetryEvent) { + pub async fn process(&mut self, event: TelemetryEvent, aggregator_handle: &AggregatorHandle) { if let Ok(mut log) = self.make_log(event).await { let should_send_log = self.logs_enabled && LambdaProcessor::apply_rules(&self.rules, &mut log.message.message); @@ -356,10 +352,7 @@ impl LambdaProcessor { if !self.ready_logs.is_empty() { // Send logs to aggregator via handle - if let Err(e) = self - .aggregator_handle - .insert_batch(std::mem::take(&mut self.ready_logs)) - { + if let Err(e) = aggregator_handle.insert_batch(std::mem::take(&mut self.ready_logs)) { debug!("Failed to send logs to aggregator: {}", e); } } @@ -402,7 +395,6 @@ mod tests { &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]))); let (tx, _) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::default(); let mut processor = LambdaProcessor::new( tags_provider, @@ -411,7 +403,6 @@ mod tests { tags, ..config::Config::default()}), tx.clone(), - aggregator_handle, ); let result = processor.get_message(input.clone()).await.unwrap(); @@ -593,14 +584,8 @@ mod tests { )); let (tx, _) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::default(); - let mut processor = LambdaProcessor::new( - tags_provider, - Arc::clone(&config), - tx.clone(), - aggregator_handle, - ); + let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -633,13 +618,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::default(); - let mut processor = LambdaProcessor::new( - tags_provider, - Arc::clone(&config), - tx.clone(), - aggregator_handle, - ); + let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -685,13 +664,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::default(); - let mut processor = LambdaProcessor::new( - tags_provider, - Arc::clone(&config), - tx.clone(), - aggregator_handle, - ); + let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -724,13 +697,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::default(); - let mut processor = LambdaProcessor::new( - tags_provider, - Arc::clone(&config), - tx.clone(), - aggregator_handle, - ); + let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -780,12 +747,8 @@ mod tests { aggregator_service.run().await; }); - let mut processor = LambdaProcessor::new( - Arc::clone(&tags_provider), - Arc::clone(&config), - tx.clone(), - aggregator_handle.clone(), - ); + let mut processor = + LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -795,10 +758,9 @@ mod tests { }, }; - processor.process(event.clone()).await; + processor.process(event.clone(), &aggregator_handle).await; - // Flush the aggregator to get the batches - let batches = aggregator_handle.flush().await.unwrap(); + let batches = aggregator_handle.get_batches().await.unwrap(); assert_eq!(batches.len(), 1); let log = IntakeLog { @@ -819,7 +781,6 @@ mod tests { let serialized_log = format!("[{}]", serde_json::to_string(&log).unwrap()); assert_eq!(batches[0], serialized_log.as_bytes()); - // Shutdown the service aggregator_handle.shutdown().unwrap(); let _ = service_handle.await; } @@ -840,19 +801,14 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); + let (aggregator_service, aggregator_handle) = AggregatorService::default(); - - // Spawn the aggregator service let service_handle = tokio::spawn(async move { aggregator_service.run().await; }); - let mut processor = LambdaProcessor::new( - Arc::clone(&tags_provider), - Arc::clone(&config), - tx.clone(), - aggregator_handle.clone(), - ); + let mut processor = + LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -862,13 +818,11 @@ mod tests { }, }; - processor.process(event.clone()).await; + processor.process(event.clone(), &aggregator_handle).await; - // Flush the aggregator to get the batches - let batches = aggregator_handle.flush().await.unwrap(); + let batches = aggregator_handle.get_batches().await.unwrap(); assert!(batches.is_empty()); - // Shutdown the service aggregator_handle.shutdown().unwrap(); let _ = service_handle.await; } @@ -888,33 +842,25 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); + let (aggregator_service, aggregator_handle) = AggregatorService::default(); - - // Spawn the aggregator service let service_handle = tokio::spawn(async move { aggregator_service.run().await; }); - let mut processor = LambdaProcessor::new( - tags_provider, - Arc::clone(&config), - tx.clone(), - aggregator_handle.clone(), - ); + let mut processor = LambdaProcessor::new(tags_provider, Arc::clone(&config), tx.clone()); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), record: TelemetryRecord::Function(Value::String("test-function".to_string())), }; - processor.process(event.clone()).await; + processor.process(event.clone(), &aggregator_handle).await; assert_eq!(processor.orphan_logs.len(), 1); - // Flush the aggregator to get the batches - let batches = aggregator_handle.flush().await.unwrap(); + let batches = aggregator_handle.get_batches().await.unwrap(); assert!(batches.is_empty()); - // Shutdown the service aggregator_handle.shutdown().unwrap(); let _ = service_handle.await; } @@ -934,19 +880,14 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); + let (aggregator_service, aggregator_handle) = AggregatorService::default(); - - // Spawn the aggregator service let service_handle = tokio::spawn(async move { aggregator_service.run().await; }); - let mut processor = LambdaProcessor::new( - Arc::clone(&tags_provider), - Arc::clone(&config), - tx.clone(), - aggregator_handle.clone(), - ); + let mut processor = + LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone()); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -956,7 +897,9 @@ mod tests { }, }; - processor.process(start_event.clone()).await; + processor + .process(start_event.clone(), &aggregator_handle) + .await; assert_eq!( processor.invocation_context.request_id, "test-request-id".to_string() @@ -968,10 +911,9 @@ mod tests { record: TelemetryRecord::Function(Value::String("test-function".to_string())), }; - processor.process(event.clone()).await; + processor.process(event.clone(), &aggregator_handle).await; - // Flush the aggregator to get the batches - let batches = aggregator_handle.flush().await.unwrap(); + let batches = aggregator_handle.get_batches().await.unwrap(); assert_eq!(batches.len(), 1); let start_log = IntakeLog { @@ -1031,14 +973,9 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::default(); - let mut processor = LambdaProcessor::new( - tags_provider.clone(), - Arc::clone(&config), - tx.clone(), - aggregator_handle, - ); + let mut processor = + LambdaProcessor::new(tags_provider.clone(), Arc::clone(&config), tx.clone()); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), record: TelemetryRecord::PlatformStart { @@ -1094,14 +1031,9 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (_, aggregator_handle) = AggregatorService::default(); - let mut processor = LambdaProcessor::new( - tags_provider.clone(), - Arc::clone(&config), - tx.clone(), - aggregator_handle, - ); + let mut processor = + LambdaProcessor::new(tags_provider.clone(), Arc::clone(&config), tx.clone()); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), record: TelemetryRecord::PlatformStart { diff --git a/bottlecap/src/logs/processor.rs b/bottlecap/src/logs/processor.rs index 74ba299b4..cc3d220af 100644 --- a/bottlecap/src/logs/processor.rs +++ b/bottlecap/src/logs/processor.rs @@ -7,10 +7,9 @@ use crate::LAMBDA_RUNTIME_SLUG; use crate::config::{self, processing_rule}; use crate::event_bus::Event; use crate::extension::telemetry::events::TelemetryEvent; -use crate::tags; - use crate::logs::aggregator_service::AggregatorHandle; use crate::logs::lambda::processor::LambdaProcessor; +use crate::tags; impl LogsProcessor { #[must_use] @@ -19,22 +18,20 @@ impl LogsProcessor { tags_provider: Arc, event_bus: Sender, runtime: String, - aggregator_handle: AggregatorHandle, ) -> Self { match runtime.as_str() { LAMBDA_RUNTIME_SLUG => { - let lambda_processor = - LambdaProcessor::new(tags_provider, config, event_bus, aggregator_handle); + let lambda_processor = LambdaProcessor::new(tags_provider, config, event_bus); LogsProcessor::Lambda(lambda_processor) } _ => panic!("Unsupported runtime: {runtime}"), } } - pub async fn process(&mut self, event: TelemetryEvent) { + pub async fn process(&mut self, event: TelemetryEvent, aggregator_handle: &AggregatorHandle) { match self { LogsProcessor::Lambda(lambda_processor) => { - lambda_processor.process(event).await; + lambda_processor.process(event, aggregator_handle).await; } } } From 5470dc001b36b98f96e56299f7d9f376c5c3a581 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Wed, 1 Oct 2025 14:44:33 -0400 Subject: [PATCH 09/11] fmt --- bottlecap/src/bin/bottlecap/main.rs | 3 +-- bottlecap/src/logs/lambda/processor.rs | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 766fd3b4d..8f1c783c4 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -44,8 +44,7 @@ use bottlecap::{ }, logger, logs::{ - agent::LogsAgent, - aggregator_service::AggregatorService as LogsAggregatorService, + agent::LogsAgent, aggregator_service::AggregatorService as LogsAggregatorService, flusher::LogsFlusher, }, otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent}, diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index e5e01defe..5d53c8749 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -801,7 +801,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - + let (aggregator_service, aggregator_handle) = AggregatorService::default(); let service_handle = tokio::spawn(async move { aggregator_service.run().await; @@ -842,7 +842,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - + let (aggregator_service, aggregator_handle) = AggregatorService::default(); let service_handle = tokio::spawn(async move { aggregator_service.run().await; @@ -880,7 +880,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - + let (aggregator_service, aggregator_handle) = AggregatorService::default(); let service_handle = tokio::spawn(async move { aggregator_service.run().await; From 7f38479e12c97320a844a7ef22e4a908d8c3edf8 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Wed, 1 Oct 2025 15:03:01 -0400 Subject: [PATCH 10/11] clean up tests --- bottlecap/src/logs/aggregator_service.rs | 12 ++++----- bottlecap/src/logs/lambda/processor.rs | 34 +++++++++++++++++------- bottlecap/tests/logs_integration_test.rs | 3 +-- 3 files changed, 31 insertions(+), 18 deletions(-) diff --git a/bottlecap/src/logs/aggregator_service.rs b/bottlecap/src/logs/aggregator_service.rs index 154e48e44..d281bfb19 100644 --- a/bottlecap/src/logs/aggregator_service.rs +++ b/bottlecap/src/logs/aggregator_service.rs @@ -112,7 +112,6 @@ mod tests { async fn test_aggregator_service_insert_and_flush() { let (service, handle) = AggregatorService::default(); - // Spawn the service let service_handle = tokio::spawn(async move { service.run().await; }); @@ -134,17 +133,18 @@ mod tests { }; let serialized_log = serde_json::to_string(&log).unwrap(); - // Insert logs using handle handle.insert_batch(vec![serialized_log.clone()]).unwrap(); - // Flush all batches let batches = handle.get_batches().await.unwrap(); assert_eq!(batches.len(), 1); let serialized_batch = format!("[{serialized_log}]"); assert_eq!(batches[0], serialized_batch.as_bytes()); - // Shutdown the service - handle.shutdown().unwrap(); - let _ = service_handle.await; + handle + .shutdown() + .expect("Failed to shutdown aggregator service"); + service_handle + .await + .expect("Aggregator service task failed"); } } diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 5d53c8749..d3fc12588 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -351,7 +351,6 @@ impl LambdaProcessor { } if !self.ready_logs.is_empty() { - // Send logs to aggregator via handle if let Err(e) = aggregator_handle.insert_batch(std::mem::take(&mut self.ready_logs)) { debug!("Failed to send logs to aggregator: {}", e); } @@ -781,8 +780,12 @@ mod tests { let serialized_log = format!("[{}]", serde_json::to_string(&log).unwrap()); assert_eq!(batches[0], serialized_log.as_bytes()); - aggregator_handle.shutdown().unwrap(); - let _ = service_handle.await; + aggregator_handle + .shutdown() + .expect("Failed to shutdown aggregator service"); + service_handle + .await + .expect("Aggregator service task failed"); } #[tokio::test] @@ -823,8 +826,12 @@ mod tests { let batches = aggregator_handle.get_batches().await.unwrap(); assert!(batches.is_empty()); - aggregator_handle.shutdown().unwrap(); - let _ = service_handle.await; + aggregator_handle + .shutdown() + .expect("Failed to shutdown aggregator service"); + service_handle + .await + .expect("Aggregator service task failed"); } #[tokio::test] @@ -861,8 +868,12 @@ mod tests { let batches = aggregator_handle.get_batches().await.unwrap(); assert!(batches.is_empty()); - aggregator_handle.shutdown().unwrap(); - let _ = service_handle.await; + aggregator_handle + .shutdown() + .expect("Failed to shutdown aggregator service"); + service_handle + .await + .expect("Aggregator service task failed"); } #[tokio::test] @@ -953,9 +964,12 @@ mod tests { ); assert_eq!(batches[0], serialized_log.as_bytes()); - // Shutdown the service - aggregator_handle.shutdown().unwrap(); - let _ = service_handle.await; + aggregator_handle + .shutdown() + .expect("Failed to shutdown aggregator service"); + service_handle + .await + .expect("Aggregator service task failed"); } #[tokio::test] diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index cfae30a72..3100322b2 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -55,10 +55,9 @@ async fn test_logs() { )); let bus = EventBus::run(); + let (logs_aggr_service, logs_aggr_handle) = bottlecap::logs::aggregator_service::AggregatorService::default(); - - // Spawn the aggregator service tokio::spawn(async move { logs_aggr_service.run().await; }); From 88688788c2e3b43d871d40cb1de2e7fc66e6bc9c Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Wed, 1 Oct 2025 15:09:21 -0400 Subject: [PATCH 11/11] comment --- bottlecap/src/logs/lambda/processor.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index d3fc12588..279e06ef0 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -741,7 +741,6 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); let (aggregator_service, aggregator_handle) = AggregatorService::default(); - // Spawn the aggregator service let service_handle = tokio::spawn(async move { aggregator_service.run().await; });