From b2e9670f16ea21cbf2dab25b87a6402478107508 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Tue, 12 Aug 2025 12:39:55 -0400 Subject: [PATCH 01/12] wip --- bottlecap/src/bin/bottlecap/main.rs | 82 ++++++----- .../src/lifecycle/invocation/processor.rs | 5 +- bottlecap/src/metrics/enhanced/lambda.rs | 134 +++++++----------- 3 files changed, 100 insertions(+), 121 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 09b331f89..fb36ecb2a 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -66,7 +66,8 @@ use datadog_trace_obfuscation::obfuscation_config; use datadog_trace_utils::send_data::SendData; use decrypt::resolve_secrets; use dogstatsd::{ - aggregator::Aggregator as MetricsAggregator, + aggregator_service::AggregatorHandle as MetricsAggregatorHandle, + aggregator_service::AggregatorService as MetricsAggregatorService, api_key::ApiKeyFactory, constants::CONTEXTS, datadog::{ @@ -87,7 +88,7 @@ use std::{ os::unix::process::CommandExt, path::Path, process::Command, - sync::{Arc, Mutex}, + sync::Arc, time::{Duration, Instant}, }; use tokio::{sync::Mutex as TokioMutex, sync::RwLock, sync::mpsc::Sender, task::JoinHandle}; @@ -494,13 +495,11 @@ async fn extension_loop_active( ); let metrics_aggr_init_start_time = Instant::now(); - let metrics_aggr = Arc::new(Mutex::new( - MetricsAggregator::new( - SortedTags::parse(&tags_provider.get_tags_string()).unwrap_or(EMPTY_TAGS), - CONTEXTS, - ) - .expect("failed to create aggregator"), - )); + let (metrics_aggr_service, metrics_aggr_handle) = MetricsAggregatorService::new( + SortedTags::parse(&tags_provider.get_tags_string()).unwrap_or(EMPTY_TAGS), + CONTEXTS, + ) + .expect("can't create metrics service"); debug!( "Metrics aggregator created in {:} microseconds", metrics_aggr_init_start_time @@ -508,10 +507,11 @@ async fn extension_loop_active( .as_micros() .to_string() ); + start_dogstatsd_aggregator(metrics_aggr_service); let metrics_flushers = Arc::new(TokioMutex::new(start_metrics_flushers( Arc::clone(&api_key_factory), - &metrics_aggr, + metrics_aggr_handle.clone(), config, ))); // Lifecycle Invocation Processor @@ -519,7 +519,7 @@ async fn extension_loop_active( Arc::clone(&tags_provider), Arc::clone(config), Arc::clone(&aws_config), - Arc::clone(&metrics_aggr), + metrics_aggr_handle.clone(), ))); // AppSec processor (if enabled) let appsec_processor = match AppSecProcessor::new(config) { @@ -568,7 +568,7 @@ async fn extension_loop_active( } }); - let dogstatsd_cancel_token = start_dogstatsd(&metrics_aggr).await; + let dogstatsd_cancel_token = start_dogstatsd(metrics_aggr_handle.clone()).await; let telemetry_listener_cancel_token = setup_telemetry_client(&r.extension_id, logs_agent_channel).await?; @@ -623,7 +623,7 @@ async fn extension_loop_active( &*stats_flusher, &proxy_flusher, &mut race_flush_interval, - &metrics_aggr, + &metrics_aggr_handle.clone(), ) .await; } @@ -638,7 +638,7 @@ async fn extension_loop_active( &*stats_flusher, &proxy_flusher, &mut race_flush_interval, - &metrics_aggr, + &metrics_aggr_handle.clone(), ) .await; let next_response = next_event(client, &r.extension_id).await; @@ -670,11 +670,15 @@ async fn extension_loop_active( })); let (metrics_flushers_copy, series, sketches) = { let locked_metrics = metrics_flushers.lock().await; - let mut aggregator = metrics_aggr.lock().expect("lock poisoned"); + let flush_response = metrics_aggr_handle + .clone() + .flush() + .await + .expect("can't flush metrics handle"); ( 
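                        // (flushers snapshot, drained series, drained distribution sketches)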
locked_metrics.clone(), - aggregator.consume_metrics(), - aggregator.consume_distributions(), + flush_response.series, + flush_response.distributions, ) }; for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() { @@ -711,7 +715,7 @@ async fn extension_loop_active( &*stats_flusher, &proxy_flusher, &mut race_flush_interval, - &metrics_aggr, + &metrics_aggr_handle, ) .await; last_continuous_flush_error = false; @@ -751,7 +755,7 @@ async fn extension_loop_active( &*stats_flusher, &proxy_flusher, &mut race_flush_interval, - &metrics_aggr, + &metrics_aggr_handle, ) .await; } @@ -807,7 +811,7 @@ async fn extension_loop_active( &*stats_flusher, &proxy_flusher, &mut race_flush_interval, - &metrics_aggr, + &metrics_aggr_handle, ) .await; return Ok(()); @@ -822,18 +826,20 @@ async fn blocking_flush_all( stats_flusher: &impl StatsFlusher, proxy_flusher: &ProxyFlusher, race_flush_interval: &mut tokio::time::Interval, - metrics_aggr: &Arc>, + metrics_aggr_handle: &MetricsAggregatorHandle, ) { - let (series, sketches) = { - let mut aggregator = metrics_aggr.lock().expect("lock poisoned"); - ( - aggregator.consume_metrics(), - aggregator.consume_distributions(), - ) - }; + let flush_response = metrics_aggr_handle + .flush() + .await + .expect("can't flush metrics aggr handle"); let metrics_futures: Vec<_> = metrics_flushers .iter_mut() - .map(|f| f.flush_metrics(series.clone(), sketches.clone())) + .map(|f| { + f.flush_metrics( + flush_response.series.clone(), + flush_response.distributions.clone(), + ) + }) .collect(); tokio::join!( @@ -1008,7 +1014,7 @@ fn start_logs_agent( fn start_metrics_flushers( api_key_factory: Arc, - metrics_aggr: &Arc>, + metrics_aggr_handle: MetricsAggregatorHandle, config: &Arc, ) -> Vec { let mut flushers = Vec::new(); @@ -1031,7 +1037,7 @@ fn start_metrics_flushers( let flusher_config = MetricsFlusherConfig { api_key_factory, - aggregator: Arc::clone(metrics_aggr), + aggregator_handle: metrics_aggr_handle.clone(), metrics_intake_url_prefix: metrics_intake_url.expect("can't parse site or override"), https_proxy: config.proxy_https.clone(), timeout: Duration::from_secs(config.flush_timeout), @@ -1059,7 +1065,7 @@ fn start_metrics_flushers( let additional_api_key_factory = Arc::new(ApiKeyFactory::new(api_key)); let additional_flusher_config = MetricsFlusherConfig { api_key_factory: additional_api_key_factory, - aggregator: metrics_aggr.clone(), + aggregator_handle: metrics_aggr_handle.clone(), metrics_intake_url_prefix: metrics_intake_url.clone(), https_proxy: config.proxy_https.clone(), timeout: Duration::from_secs(config.flush_timeout), @@ -1075,7 +1081,7 @@ fn start_metrics_flushers( fn start_trace_agent( config: &Arc, api_key_factory: &Arc, - tags_provider: &Arc, + tags_provider: &Arc, invocation_processor: Arc>, appsec_processor: Option>>, trace_aggregator: Arc>, @@ -1157,7 +1163,13 @@ fn start_trace_agent( ) } -async fn start_dogstatsd(metrics_aggr: &Arc>) -> CancellationToken { +fn start_dogstatsd_aggregator(aggr_service: MetricsAggregatorService) { + tokio::spawn(async move { + aggr_service.run().await; + }); +} + +async fn start_dogstatsd(metrics_aggr_handle: MetricsAggregatorHandle) -> CancellationToken { let dogstatsd_config = DogStatsDConfig { host: EXTENSION_HOST.to_string(), port: DOGSTATSD_PORT, @@ -1165,7 +1177,7 @@ async fn start_dogstatsd(metrics_aggr: &Arc>) -> Cancel let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); let dogstatsd_client = DogStatsD::new( &dogstatsd_config, - Arc::clone(metrics_aggr), + 
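        // The server now writes metrics through the aggregator service handle
        // rather than a shared Arc<Mutex<Aggregator>>.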
metrics_aggr_handle, dogstatsd_cancel_token.clone(), ) .await; diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index ca87759bb..896538dc1 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -1,13 +1,12 @@ use std::{ collections::{HashMap, VecDeque}, - sync::{Arc, Mutex}, + sync::Arc, time::{Instant, SystemTime, UNIX_EPOCH}, }; use chrono::{DateTime, Utc}; use datadog_trace_protobuf::pb::Span; use datadog_trace_utils::tracer_header_tags; -use dogstatsd::aggregator::Aggregator as MetricsAggregator; use serde_json::{Value, json}; use tokio::sync::watch; use tracing::{debug, warn}; @@ -88,7 +87,7 @@ impl Processor { tags_provider: Arc, config: Arc, aws_config: Arc, - metrics_aggregator: Arc>, + metrics_aggregator: dogstatsd::aggregator_service::AggregatorHandle, ) -> Self { let resource = tags_provider .get_canonical_resource_name() diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 34879ce87..e32c57958 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -4,12 +4,12 @@ use crate::metrics::enhanced::{ }; use crate::proc::{self, CPUData, NetworkData}; use crate::telemetry::events::{InitType, ReportMetrics, RuntimeDoneMetrics}; -use dogstatsd::metric; use dogstatsd::metric::{Metric, MetricValue}; -use dogstatsd::{aggregator::Aggregator, metric::SortedTags}; +use dogstatsd::metric::SortedTags; +use dogstatsd::{aggregator_service::AggregatorHandle, metric}; use std::collections::HashMap; use std::env::consts::ARCH; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; use tokio::{ sync::watch::{Receiver, Sender}, @@ -19,7 +19,7 @@ use tracing::debug; use tracing::error; pub struct Lambda { - pub aggregator: Arc>, + pub aggr_handle: AggregatorHandle, pub config: Arc, // Dynamic value tags are the ones we cannot obtain statically from the sandbox dynamic_value_tags: HashMap, @@ -28,9 +28,9 @@ pub struct Lambda { impl Lambda { #[must_use] - pub fn new(aggregator: Arc>, config: Arc) -> Lambda { + pub fn new(aggregator: AggregatorHandle, config: Arc) -> Lambda { Lambda { - aggregator, + aggr_handle: aggregator, config, dynamic_value_tags: HashMap::new(), invoked_received: false, @@ -112,12 +112,7 @@ impl Lambda { Some(timestamp), ); - if let Err(e) = self - .aggregator - .lock() - .expect("lock poisoned") - .insert(metric) - { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert metric: {}", e); } } @@ -136,12 +131,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = self - .aggregator - .lock() - .expect("lock poisoned") - .insert(metric) - { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert metric: {}", e); } } @@ -157,12 +147,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = self - .aggregator - .lock() - .expect("lock poisoned") - .insert(metric) - { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert runtime duration metric: {}", e); } @@ -174,12 +159,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = self - .aggregator - .lock() - .expect("lock poisoned") - .insert(metric) - { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert produced bytes metric: {}", e); } } @@ -209,12 +189,7 @@ impl 
Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = self - .aggregator - .lock() - .expect("lock poisoned") - .insert(metric) - { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert post runtime duration metric: {}", e); } } @@ -222,7 +197,7 @@ impl Lambda { pub fn generate_network_enhanced_metrics( network_data_offset: NetworkData, network_data_end: NetworkData, - aggr: &mut std::sync::MutexGuard, + aggr: &AggregatorHandle, tags: Option, ) { let now = std::time::UNIX_EPOCH @@ -241,7 +216,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert rx_bytes metric: {}", e); } @@ -251,7 +226,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert tx_bytes metric: {}", e); } @@ -261,7 +236,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert total_network metric: {}", e); } } @@ -272,15 +247,14 @@ impl Lambda { } if let Some(offset) = network_offset { - let mut aggr: std::sync::MutexGuard = - self.aggregator.lock().expect("lock poisoned"); + let aggr_handle = self.aggr_handle.clone(); match proc::get_network_data() { Ok(data) => { Self::generate_network_enhanced_metrics( offset, data, - &mut aggr, + &aggr_handle, self.get_dynamic_value_tags(), ); } @@ -296,7 +270,7 @@ impl Lambda { pub(crate) fn generate_cpu_time_enhanced_metrics( cpu_data_offset: &CPUData, cpu_data_end: &CPUData, - aggr: &mut std::sync::MutexGuard, + aggr: &AggregatorHandle, tags: Option, ) { let cpu_user_time = cpu_data_end.total_user_time_ms - cpu_data_offset.total_user_time_ms; @@ -315,7 +289,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert cpu_user_time metric: {}", e); } @@ -325,7 +299,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert cpu_system_time metric: {}", e); } @@ -335,7 +309,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert cpu_total_time metric: {}", e); } } @@ -345,8 +319,7 @@ impl Lambda { return; } - let mut aggr: std::sync::MutexGuard = - self.aggregator.lock().expect("lock poisoned"); + let aggr_handle = self.aggr_handle.clone(); let cpu_data = proc::get_cpu_data(); match (cpu_offset, cpu_data) { @@ -354,7 +327,7 @@ impl Lambda { Self::generate_cpu_time_enhanced_metrics( &cpu_offset, &cpu_data, - &mut aggr, + &aggr_handle, self.get_dynamic_value_tags(), ); } @@ -369,7 +342,7 @@ impl Lambda { cpu_data_end: &CPUData, uptime_data_offset: f64, uptime_data_end: f64, - aggr: &mut std::sync::MutexGuard, + aggr: &AggregatorHandle, tags: Option, ) { let num_cores = cpu_data_end.individual_cpu_idle_times.len() as f64; @@ -420,7 +393,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert cpu_total_utilization_pct metric: {}", e); } @@ -430,7 +403,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { 
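        // insert_batch hands the metric off to the aggregator service; on Err
        // the metric is dropped and the failure is only logged.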
error!("Failed to insert cpu_total_utilization metric: {}", e); } @@ -440,7 +413,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert num_cores metric: {}", e); } @@ -450,7 +423,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert cpu_max_utilization metric: {}", e); } @@ -460,7 +433,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert cpu_min_utilization metric: {}", e); } } @@ -474,8 +447,7 @@ impl Lambda { return; } - let mut aggr: std::sync::MutexGuard = - self.aggregator.lock().expect("lock poisoned"); + let aggr_handle = self.aggr_handle.clone(); let cpu_data = proc::get_cpu_data(); let uptime_data = proc::get_uptime(); @@ -486,7 +458,7 @@ impl Lambda { &cpu_data, uptime_offset, uptime_data, - &mut aggr, + &aggr_handle, self.get_dynamic_value_tags(), ); } @@ -499,7 +471,7 @@ impl Lambda { pub fn generate_tmp_enhanced_metrics( tmp_max: f64, tmp_used: f64, - aggr: &mut std::sync::MutexGuard, + aggr: &AggregatorHandle, tags: Option, ) { let now = std::time::UNIX_EPOCH @@ -514,7 +486,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert tmp_max metric: {}", e); } @@ -524,7 +496,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert tmp_used metric: {}", e); } @@ -535,7 +507,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert tmp_free metric: {}", e); } } @@ -545,7 +517,7 @@ impl Lambda { return; } - let aggr = Arc::clone(&self.aggregator); + let aggr = self.aggr_handle.clone(); let tags = self.get_dynamic_value_tags(); tokio::spawn(async move { @@ -566,9 +538,7 @@ impl Lambda { biased; // When the stop signal is received, generate final metrics _ = send_metrics.changed() => { - let mut aggr: std::sync::MutexGuard = - aggr.lock().expect("lock poisoned"); - Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr, tags); + Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &aggr, tags); return; } // Otherwise keep monitoring tmp usage periodically @@ -592,7 +562,7 @@ impl Lambda { fd_use: f64, threads_max: f64, threads_use: f64, - aggr: &mut std::sync::MutexGuard, + aggr: &AggregatorHandle, tags: Option, ) { let now = std::time::UNIX_EPOCH @@ -607,7 +577,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert fd_max metric: {}", e); } @@ -619,7 +589,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert fd_use metric: {}", e); } } else { @@ -632,7 +602,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert threads_max metric: {}", e); } @@ -644,7 +614,7 @@ impl Lambda { tags, Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert 
threads_use metric: {}", e); } } else { @@ -657,7 +627,7 @@ impl Lambda { return; } - let aggr = Arc::clone(&self.aggregator); + let aggr = self.aggr_handle.clone(); let tags = self.get_dynamic_value_tags(); tokio::spawn(async move { @@ -678,9 +648,7 @@ impl Lambda { biased; // When the stop signal is received, generate final metrics _ = send_metrics.changed() => { - let mut aggr: std::sync::MutexGuard = - aggr.lock().expect("lock poisoned"); - Self::generate_process_metrics(fd_max, fd_use, threads_max, threads_use, &mut aggr, tags.clone()); + Self::generate_process_metrics(fd_max, fd_use, threads_max, threads_use, &aggr, tags.clone()); return; } // Otherwise keep monitoring file descriptor and thread usage periodically @@ -717,15 +685,13 @@ impl Lambda { if !self.config.enhanced_metrics { return; } - let mut aggr: std::sync::MutexGuard = - self.aggregator.lock().expect("lock poisoned"); let metric = metric::Metric::new( constants::DURATION_METRIC.into(), MetricValue::distribution(metrics.duration_ms * constants::MS_TO_SEC), self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert duration metric: {}", e); } let metric = metric::Metric::new( @@ -734,7 +700,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert billed duration metric: {}", e); } let metric = metric::Metric::new( @@ -743,7 +709,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert max memory used metric: {}", e); } let metric = metric::Metric::new( @@ -752,7 +718,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert memory size metric: {}", e); } @@ -764,7 +730,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert estimated cost metric: {}", e); } } @@ -791,9 +757,11 @@ impl PartialEq for EnhancedMetricData { #[allow(clippy::unwrap_used)] mod tests { use std::collections::HashMap; + use std::sync::Mutex; use super::*; use crate::config; + use dogstatsd::aggregator::Aggregator; use dogstatsd::metric::EMPTY_TAGS; const PRECISION: f64 = 0.000_000_01; From b7d2e2b763816571def1a58b1900a016e419aaec Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Mon, 18 Aug 2025 21:22:26 -0400 Subject: [PATCH 02/12] wip --- bottlecap/src/bin/bottlecap/main.rs | 218 ++++++++++++++++------------ 1 file changed, 126 insertions(+), 92 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index fb36ecb2a..4dc355ef7 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -560,12 +560,19 @@ async fn extension_loop_active( let lifecycle_listener = LifecycleListener { invocation_processor: Arc::clone(&invocation_processor), }; - // TODO(astuyve): deprioritize this task after the first request - tokio::spawn(async move { - let res = lifecycle_listener.start().await; - if let Err(e) = res { - error!("Error starting hello agent: {e:?}"); - } + tokio::task::spawn_blocking(move || { + 
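        // Threads in tokio's blocking pool do not run an async executor of
        // their own, so the listener below is driven by a dedicated
        // current_thread runtime instead of the main scheduler.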
// Create a new single-threaded runtime for this blocking task + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create runtime for lifecycle listener"); + + rt.block_on(async move { + let res = lifecycle_listener.start().await; + if let Err(e) = res { + error!("Error starting hello agent: {e:?}"); + } + }); }); let dogstatsd_cancel_token = start_dogstatsd(metrics_aggr_handle.clone()).await; @@ -593,7 +600,7 @@ async fn extension_loop_active( let next_lambda_response = next_event(client, &r.extension_id).await; // first invoke we must call next let mut pending_flush_handles = PendingFlushHandles::new(); - let mut last_continuous_flush_error = false; + let continuous_flush_in_progress = Arc::new(std::sync::atomic::AtomicBool::new(false)); handle_next_invocation(next_lambda_response, invocation_processor.clone()).await; loop { let maybe_shutdown_event; @@ -614,90 +621,134 @@ async fn extension_loop_active( } } } - _ = race_flush_interval.tick() => { - let mut locked_metrics = metrics_flushers.lock().await; - blocking_flush_all( - &logs_flusher, - &mut locked_metrics, - &*trace_flusher, - &*stats_flusher, - &proxy_flusher, - &mut race_flush_interval, - &metrics_aggr_handle.clone(), - ) - .await; - } + // Removed race_flush_interval.tick() to prevent blocking } } - // flush - let mut locked_metrics = metrics_flushers.lock().await; - blocking_flush_all( - &logs_flusher, - &mut locked_metrics, - &*trace_flusher, - &*stats_flusher, - &proxy_flusher, - &mut race_flush_interval, - &metrics_aggr_handle.clone(), - ) - .await; + // flush - skip for continuous flush strategy to avoid blocking + if current_flush_decision != FlushDecision::Continuous { + // Only flush for non-continuous strategies + let mut locked_metrics = metrics_flushers.lock().await; + blocking_flush_all( + &logs_flusher, + &mut locked_metrics, + &*trace_flusher, + &*stats_flusher, + &proxy_flusher, + &mut race_flush_interval, + &metrics_aggr_handle.clone(), + ) + .await; + } + // For continuous strategy, we skip the flush here entirely since we're + // already flushing continuously at the beginning of invocations let next_response = next_event(client, &r.extension_id).await; maybe_shutdown_event = handle_next_invocation(next_response, invocation_processor.clone()).await; } else { - //Periodic flush scenario, flush at top of invocation - if current_flush_decision == FlushDecision::Continuous && !last_continuous_flush_error { - let tf = trace_flusher.clone(); - // Await any previous flush handles. 
This - last_continuous_flush_error = pending_flush_handles - .await_flush_handles( - &logs_flusher.clone(), - &tf, - &metrics_flushers, - &proxy_flusher, - ) - .await; + // NO FLUSH SCENARIO OR CONTINUOUS FLUSH + // CRITICAL: Call /next FIRST before doing any flush operations + // to avoid Lambda timeout while setting up flushes + let next_lambda_response = next_event(client, &r.extension_id); + tokio::pin!(next_lambda_response); + + // Now set up continuous flush if needed + if current_flush_decision == FlushDecision::Continuous + && !continuous_flush_in_progress.load(std::sync::atomic::Ordering::Relaxed) + { + // Set flag to prevent concurrent flushes + continuous_flush_in_progress.store(true, std::sync::atomic::Ordering::Relaxed); + // Clone the flag for the spawned task to reset it + let flush_flag = continuous_flush_in_progress.clone(); + + // Spawn logs flush let lf = logs_flusher.clone(); pending_flush_handles .log_flush_handles .push_back(tokio::spawn(async move { lf.flush(None).await })); + + // Spawn trace flush let tf = trace_flusher.clone(); pending_flush_handles .trace_flush_handles .push_back(tokio::spawn(async move { tf.flush(None).await.unwrap_or_default() })); - let (metrics_flushers_copy, series, sketches) = { - let locked_metrics = metrics_flushers.lock().await; - let flush_response = metrics_aggr_handle - .clone() - .flush() - .await - .expect("can't flush metrics handle"); - ( - locked_metrics.clone(), - flush_response.series, - flush_response.distributions, - ) - }; - for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() { - let series_clone = series.clone(); - let sketches_clone = sketches.clone(); - let handle = tokio::spawn(async move { - let (retry_series, retry_sketches) = flusher - .flush_metrics(series_clone.clone(), sketches_clone.clone()) + + // Spawn the metrics aggregator flush and metric flushes as a background task + let metrics_aggr_handle_clone = metrics_aggr_handle.clone(); + let metrics_flushers_clone = metrics_flushers.clone(); + + let metrics_handle = tokio::spawn(async move { + // Get metrics data from aggregator (this is the blocking part) + let (metrics_flushers_copy, series, sketches) = { + let locked_metrics = metrics_flushers_clone.lock().await; + let flush_response = metrics_aggr_handle_clone + .flush() .await - .unwrap_or_default(); - MetricsRetryBatch { - flusher_id: idx, - series: retry_series, - sketches: retry_sketches, + .expect("can't flush metrics handle"); + ( + locked_metrics.clone(), + flush_response.series, + flush_response.distributions, + ) + }; + + // Spawn individual metric flusher tasks + let mut handles = Vec::new(); + for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() { + let series_clone = series.clone(); + let sketches_clone = sketches.clone(); + let handle = tokio::spawn(async move { + let (retry_series, retry_sketches) = flusher + .flush_metrics(series_clone, sketches_clone) + .await + .unwrap_or_default(); + MetricsRetryBatch { + flusher_id: idx, + series: retry_series, + sketches: retry_sketches, + } + }); + handles.push(handle); + } + + // Collect all metric flush results + let mut results = Vec::new(); + for handle in handles { + if let Ok(result) = handle.await { + results.push(result); } - }); - pending_flush_handles.metric_flush_handles.push_back(handle); - } + } + results + }); + + // Store the aggregated handle that will resolve to all metric flush results + pending_flush_handles + .metric_flush_handles + .push_back(tokio::spawn(async move { + // This will flatten the 
results when awaited + let result = if let Ok(results) = metrics_handle.await { + // Just return the first one for now (we'd need to change the type to handle multiple) + results.into_iter().next().unwrap_or(MetricsRetryBatch { + flusher_id: 0, + series: Vec::new(), + sketches: Vec::new(), + }) + } else { + MetricsRetryBatch { + flusher_id: 0, + series: Vec::new(), + sketches: Vec::new(), + } + }; + // Reset the flag when the flush completes + flush_flag.store(false, std::sync::atomic::Ordering::Relaxed); + result + })); + + // Spawn proxy flush let pf = proxy_flusher.clone(); pending_flush_handles .proxy_flush_handles @@ -718,15 +769,9 @@ async fn extension_loop_active( &metrics_aggr_handle, ) .await; - last_continuous_flush_error = false; } - // NO FLUSH SCENARIO - // JUST LOOP OVER PIPELINE AND WAIT FOR NEXT EVENT - // If we get platform.runtimeDone or platform.runtimeReport - // That's fine, we still wait to break until we get the response from next - // and then we break to determine if we'll flush or not - let next_lambda_response = next_event(client, &r.extension_id); - tokio::pin!(next_lambda_response); + + // Wait for the next Lambda invocation 'next_invocation: loop { tokio::select! { biased; @@ -746,24 +791,13 @@ async fn extension_loop_active( Some(event) = event_bus.rx.recv() => { handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await; } - _ = race_flush_interval.tick() => { - let mut locked_metrics = metrics_flushers.lock().await; - blocking_flush_all( - &logs_flusher, - &mut locked_metrics, - &*trace_flusher, - &*stats_flusher, - &proxy_flusher, - &mut race_flush_interval, - &metrics_aggr_handle, - ) - .await; - } + // Completely removed race_flush_interval.tick() arm to prevent any delays } } } if let NextEventResponse::Shutdown { .. 
} = maybe_shutdown_event { + println!("AJ: Shutting down"); // Redrive/block on any failed payloads let tf = trace_flusher.clone(); pending_flush_handles From 3207e50155e8a6ed86d82a53b2be04145ced4ef2 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Tue, 19 Aug 2025 10:24:06 -0400 Subject: [PATCH 03/12] spawn blocking flush tasks so they happen in a different threadpool so compression won't block the main loop --- bottlecap/src/bin/bottlecap/main.rs | 48 +++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 4dc355ef7..feb7bf6e8 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -665,14 +665,26 @@ async fn extension_loop_active( let lf = logs_flusher.clone(); pending_flush_handles .log_flush_handles - .push_back(tokio::spawn(async move { lf.flush(None).await })); + .push_back(tokio::spawn(async move { + tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current().block_on(lf.flush(None)) + }) + .await + .unwrap_or_default() + })); // Spawn trace flush let tf = trace_flusher.clone(); pending_flush_handles .trace_flush_handles .push_back(tokio::spawn(async move { - tf.flush(None).await.unwrap_or_default() + tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current() + .block_on(tf.flush(None)) + .unwrap_or_default() + }) + .await + .unwrap_or_default() })); // Spawn the metrics aggregator flush and metric flushes as a background task @@ -683,10 +695,13 @@ async fn extension_loop_active( // Get metrics data from aggregator (this is the blocking part) let (metrics_flushers_copy, series, sketches) = { let locked_metrics = metrics_flushers_clone.lock().await; - let flush_response = metrics_aggr_handle_clone - .flush() - .await - .expect("can't flush metrics handle"); + let flush_response = tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current() + .block_on(metrics_aggr_handle_clone.flush()) + .expect("can't flush metrics handle") + }) + .await + .expect("spawn_blocking failed"); ( locked_metrics.clone(), flush_response.series, @@ -700,8 +715,14 @@ async fn extension_loop_active( let series_clone = series.clone(); let sketches_clone = sketches.clone(); let handle = tokio::spawn(async move { - let (retry_series, retry_sketches) = flusher - .flush_metrics(series_clone, sketches_clone) + let (retry_series, retry_sketches) = + tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current() + .block_on( + flusher.flush_metrics(series_clone, sketches_clone), + ) + .unwrap_or_default() + }) .await .unwrap_or_default(); MetricsRetryBatch { @@ -753,7 +774,13 @@ async fn extension_loop_active( pending_flush_handles .proxy_flush_handles .push_back(tokio::spawn(async move { - pf.flush(None).await.unwrap_or_default() + tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current() + .block_on(pf.flush(None)) + .unwrap_or_default() + }) + .await + .unwrap_or_default() })); race_flush_interval.reset(); @@ -789,7 +816,8 @@ async fn extension_loop_active( break 'next_invocation; } Some(event) = event_bus.rx.recv() => { - handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await; + handle_event_bus_event(event, invocation_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await; + tokio::task::yield_now().await; } // Completely removed 
race_flush_interval.tick() arm to prevent any delays } From bf84f3211af0397bfd45cf5528d4e67e5a1df824 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Tue, 19 Aug 2025 13:47:57 -0400 Subject: [PATCH 04/12] 5k context --- bottlecap/src/lifecycle/invocation/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 7f4003739..2a2c9f615 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -134,7 +134,7 @@ impl Default for ContextBuffer { /// Creates a new `ContextBuffer` with a default capacity of 5. /// fn default() -> Self { - ContextBuffer::with_capacity(5) + ContextBuffer::with_capacity(5000) } } From a4b4e0997c94d8edc6df01df330ccf366a5f448b Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Mon, 25 Aug 2025 13:35:47 -0400 Subject: [PATCH 05/12] Revert "spawn blocking flush tasks so they happen in a different threadpool so compression won't block the main loop" This reverts commit 6837a4ea15fa9938d5f7031760e16b636f023f14. --- bottlecap/src/bin/bottlecap/main.rs | 46 ++++++----------------------- 1 file changed, 9 insertions(+), 37 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index feb7bf6e8..c2da2f2c1 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -665,26 +665,14 @@ async fn extension_loop_active( let lf = logs_flusher.clone(); pending_flush_handles .log_flush_handles - .push_back(tokio::spawn(async move { - tokio::task::spawn_blocking(move || { - tokio::runtime::Handle::current().block_on(lf.flush(None)) - }) - .await - .unwrap_or_default() - })); + .push_back(tokio::spawn(async move { lf.flush(None).await })); // Spawn trace flush let tf = trace_flusher.clone(); pending_flush_handles .trace_flush_handles .push_back(tokio::spawn(async move { - tokio::task::spawn_blocking(move || { - tokio::runtime::Handle::current() - .block_on(tf.flush(None)) - .unwrap_or_default() - }) - .await - .unwrap_or_default() + tf.flush(None).await.unwrap_or_default() })); // Spawn the metrics aggregator flush and metric flushes as a background task @@ -695,13 +683,10 @@ async fn extension_loop_active( // Get metrics data from aggregator (this is the blocking part) let (metrics_flushers_copy, series, sketches) = { let locked_metrics = metrics_flushers_clone.lock().await; - let flush_response = tokio::task::spawn_blocking(move || { - tokio::runtime::Handle::current() - .block_on(metrics_aggr_handle_clone.flush()) - .expect("can't flush metrics handle") - }) - .await - .expect("spawn_blocking failed"); + let flush_response = metrics_aggr_handle_clone + .flush() + .await + .expect("can't flush metrics handle"); ( locked_metrics.clone(), flush_response.series, @@ -715,14 +700,8 @@ async fn extension_loop_active( let series_clone = series.clone(); let sketches_clone = sketches.clone(); let handle = tokio::spawn(async move { - let (retry_series, retry_sketches) = - tokio::task::spawn_blocking(move || { - tokio::runtime::Handle::current() - .block_on( - flusher.flush_metrics(series_clone, sketches_clone), - ) - .unwrap_or_default() - }) + let (retry_series, retry_sketches) = flusher + .flush_metrics(series_clone, sketches_clone) .await .unwrap_or_default(); MetricsRetryBatch { @@ -774,13 +753,7 @@ async fn extension_loop_active( pending_flush_handles .proxy_flush_handles .push_back(tokio::spawn(async move { - 
tokio::task::spawn_blocking(move || { - tokio::runtime::Handle::current() - .block_on(pf.flush(None)) - .unwrap_or_default() - }) - .await - .unwrap_or_default() + pf.flush(None).await.unwrap_or_default() })); race_flush_interval.reset(); @@ -817,7 +790,6 @@ async fn extension_loop_active( } Some(event) = event_bus.rx.recv() => { handle_event_bus_event(event, invocation_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await; - tokio::task::yield_now().await; } // Completely removed race_flush_interval.tick() arm to prevent any delays } From 3fe1ee021135d8071ecee217e0aa8881d37b5722 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Mon, 25 Aug 2025 13:40:46 -0400 Subject: [PATCH 06/12] Revert "wip" This reverts commit 527fa49373b411fcd58003b087477d3e292c0e4c. --- bottlecap/src/bin/bottlecap/main.rs | 218 ++++++++++++---------------- 1 file changed, 92 insertions(+), 126 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index c2da2f2c1..fb9b61612 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -560,19 +560,12 @@ async fn extension_loop_active( let lifecycle_listener = LifecycleListener { invocation_processor: Arc::clone(&invocation_processor), }; - tokio::task::spawn_blocking(move || { - // Create a new single-threaded runtime for this blocking task - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("Failed to create runtime for lifecycle listener"); - - rt.block_on(async move { - let res = lifecycle_listener.start().await; - if let Err(e) = res { - error!("Error starting hello agent: {e:?}"); - } - }); + // TODO(astuyve): deprioritize this task after the first request + tokio::spawn(async move { + let res = lifecycle_listener.start().await; + if let Err(e) = res { + error!("Error starting hello agent: {e:?}"); + } }); let dogstatsd_cancel_token = start_dogstatsd(metrics_aggr_handle.clone()).await; @@ -600,7 +593,7 @@ async fn extension_loop_active( let next_lambda_response = next_event(client, &r.extension_id).await; // first invoke we must call next let mut pending_flush_handles = PendingFlushHandles::new(); - let continuous_flush_in_progress = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let mut last_continuous_flush_error = false; handle_next_invocation(next_lambda_response, invocation_processor.clone()).await; loop { let maybe_shutdown_event; @@ -621,134 +614,90 @@ async fn extension_loop_active( } } } - // Removed race_flush_interval.tick() to prevent blocking + _ = race_flush_interval.tick() => { + let mut locked_metrics = metrics_flushers.lock().await; + blocking_flush_all( + &logs_flusher, + &mut locked_metrics, + &*trace_flusher, + &*stats_flusher, + &proxy_flusher, + &mut race_flush_interval, + &metrics_aggr_handle.clone(), + ) + .await; + } } } - // flush - skip for continuous flush strategy to avoid blocking - if current_flush_decision != FlushDecision::Continuous { - // Only flush for non-continuous strategies - let mut locked_metrics = metrics_flushers.lock().await; - blocking_flush_all( - &logs_flusher, - &mut locked_metrics, - &*trace_flusher, - &*stats_flusher, - &proxy_flusher, - &mut race_flush_interval, - &metrics_aggr_handle.clone(), - ) - .await; - } - // For continuous strategy, we skip the flush here entirely since we're - // already flushing continuously at the beginning of invocations + // flush + let mut locked_metrics = metrics_flushers.lock().await; + 
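            // Drain the aggregator once through the handle and flush logs,
            // metrics, traces, stats, and proxy together before calling /next again.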
blocking_flush_all( + &logs_flusher, + &mut locked_metrics, + &*trace_flusher, + &*stats_flusher, + &proxy_flusher, + &mut race_flush_interval, + &metrics_aggr_handle.clone(), + ) + .await; let next_response = next_event(client, &r.extension_id).await; maybe_shutdown_event = handle_next_invocation(next_response, invocation_processor.clone()).await; } else { - // NO FLUSH SCENARIO OR CONTINUOUS FLUSH - // CRITICAL: Call /next FIRST before doing any flush operations - // to avoid Lambda timeout while setting up flushes - let next_lambda_response = next_event(client, &r.extension_id); - tokio::pin!(next_lambda_response); - - // Now set up continuous flush if needed - if current_flush_decision == FlushDecision::Continuous - && !continuous_flush_in_progress.load(std::sync::atomic::Ordering::Relaxed) - { - // Set flag to prevent concurrent flushes - continuous_flush_in_progress.store(true, std::sync::atomic::Ordering::Relaxed); - - // Clone the flag for the spawned task to reset it - let flush_flag = continuous_flush_in_progress.clone(); + //Periodic flush scenario, flush at top of invocation + if current_flush_decision == FlushDecision::Continuous && !last_continuous_flush_error { + let tf = trace_flusher.clone(); + // Await any previous flush handles. This + last_continuous_flush_error = pending_flush_handles + .await_flush_handles( + &logs_flusher.clone(), + &tf, + &metrics_flushers, + &proxy_flusher, + ) + .await; - // Spawn logs flush let lf = logs_flusher.clone(); pending_flush_handles .log_flush_handles .push_back(tokio::spawn(async move { lf.flush(None).await })); - - // Spawn trace flush let tf = trace_flusher.clone(); pending_flush_handles .trace_flush_handles .push_back(tokio::spawn(async move { tf.flush(None).await.unwrap_or_default() })); - - // Spawn the metrics aggregator flush and metric flushes as a background task - let metrics_aggr_handle_clone = metrics_aggr_handle.clone(); - let metrics_flushers_clone = metrics_flushers.clone(); - - let metrics_handle = tokio::spawn(async move { - // Get metrics data from aggregator (this is the blocking part) - let (metrics_flushers_copy, series, sketches) = { - let locked_metrics = metrics_flushers_clone.lock().await; - let flush_response = metrics_aggr_handle_clone - .flush() + let (metrics_flushers_copy, series, sketches) = { + let locked_metrics = metrics_flushers.lock().await; + let flush_response = metrics_aggr_handle + .clone() + .flush() + .await + .expect("can't flush metrics handle"); + ( + locked_metrics.clone(), + flush_response.series, + flush_response.distributions, + ) + }; + for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() { + let series_clone = series.clone(); + let sketches_clone = sketches.clone(); + let handle = tokio::spawn(async move { + let (retry_series, retry_sketches) = flusher + .flush_metrics(series_clone.clone(), sketches_clone.clone()) .await - .expect("can't flush metrics handle"); - ( - locked_metrics.clone(), - flush_response.series, - flush_response.distributions, - ) - }; - - // Spawn individual metric flusher tasks - let mut handles = Vec::new(); - for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() { - let series_clone = series.clone(); - let sketches_clone = sketches.clone(); - let handle = tokio::spawn(async move { - let (retry_series, retry_sketches) = flusher - .flush_metrics(series_clone, sketches_clone) - .await - .unwrap_or_default(); - MetricsRetryBatch { - flusher_id: idx, - series: retry_series, - sketches: retry_sketches, - } - }); - 
handles.push(handle); - } - - // Collect all metric flush results - let mut results = Vec::new(); - for handle in handles { - if let Ok(result) = handle.await { - results.push(result); + .unwrap_or_default(); + MetricsRetryBatch { + flusher_id: idx, + series: retry_series, + sketches: retry_sketches, } - } - results - }); - - // Store the aggregated handle that will resolve to all metric flush results - pending_flush_handles - .metric_flush_handles - .push_back(tokio::spawn(async move { - // This will flatten the results when awaited - let result = if let Ok(results) = metrics_handle.await { - // Just return the first one for now (we'd need to change the type to handle multiple) - results.into_iter().next().unwrap_or(MetricsRetryBatch { - flusher_id: 0, - series: Vec::new(), - sketches: Vec::new(), - }) - } else { - MetricsRetryBatch { - flusher_id: 0, - series: Vec::new(), - sketches: Vec::new(), - } - }; - - // Reset the flag when the flush completes - flush_flag.store(false, std::sync::atomic::Ordering::Relaxed); - result - })); + }); + pending_flush_handles.metric_flush_handles.push_back(handle); + } - // Spawn proxy flush let pf = proxy_flusher.clone(); pending_flush_handles .proxy_flush_handles @@ -769,9 +718,15 @@ async fn extension_loop_active( &metrics_aggr_handle, ) .await; + last_continuous_flush_error = false; } - - // Wait for the next Lambda invocation + // NO FLUSH SCENARIO + // JUST LOOP OVER PIPELINE AND WAIT FOR NEXT EVENT + // If we get platform.runtimeDone or platform.runtimeReport + // That's fine, we still wait to break until we get the response from next + // and then we break to determine if we'll flush or not + let next_lambda_response = next_event(client, &r.extension_id); + tokio::pin!(next_lambda_response); 'next_invocation: loop { tokio::select! { biased; @@ -791,13 +746,24 @@ async fn extension_loop_active( Some(event) = event_bus.rx.recv() => { handle_event_bus_event(event, invocation_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await; } - // Completely removed race_flush_interval.tick() arm to prevent any delays + _ = race_flush_interval.tick() => { + let mut locked_metrics = metrics_flushers.lock().await; + blocking_flush_all( + &logs_flusher, + &mut locked_metrics, + &*trace_flusher, + &*stats_flusher, + &proxy_flusher, + &mut race_flush_interval, + &metrics_aggr_handle, + ) + .await; + } } } } if let NextEventResponse::Shutdown { .. 
} = maybe_shutdown_event { - println!("AJ: Shutting down"); // Redrive/block on any failed payloads let tf = trace_flusher.clone(); pending_flush_handles From 6268e9049d23f81a1b6e28bcf5af66af122b19b0 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Tue, 26 Aug 2025 16:14:50 -0700 Subject: [PATCH 07/12] wip: migrate tests and proxy to channel aggr --- .../src/lifecycle/invocation/processor.rs | 57 +-- bottlecap/src/metrics/enhanced/lambda.rs | 345 +++++++++++------- bottlecap/src/proxy/interceptor.rs | 12 +- 3 files changed, 254 insertions(+), 160 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 896538dc1..b69d6d5df 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -973,10 +973,10 @@ mod tests { use super::*; use crate::LAMBDA_RUNTIME_SLUG; use base64::{Engine, engine::general_purpose::STANDARD}; - use dogstatsd::aggregator::Aggregator; + use dogstatsd::aggregator_service::AggregatorService; use dogstatsd::metric::EMPTY_TAGS; - fn setup() -> Processor { + async fn setup() -> Processor { let aws_config = Arc::new(AwsConfig { region: "us-east-1".into(), aws_lwa_proxy_lambda_runtime_api: Some("***".into()), @@ -998,11 +998,12 @@ mod tests { &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), )); - let metrics_aggregator = Arc::new(Mutex::new( - Aggregator::new(EMPTY_TAGS, 1024).expect("failed to create aggregator"), - )); + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service"); + + tokio::spawn(service.run()); - Processor::new(tags_provider, config, aws_config, metrics_aggregator) + Processor::new(tags_provider, config, aws_config, handle) } #[test] @@ -1064,9 +1065,9 @@ mod tests { assert_eq!(error_tags["error.stack"], error_stack); } - #[test] - fn test_process_on_universal_instrumentation_end_headers_with_sampling_priority() { - let mut p = setup(); + #[tokio::test] + async fn test_process_on_universal_instrumentation_end_headers_with_sampling_priority() { + let mut p = setup().await; let mut headers = HashMap::new(); headers.insert(DATADOG_TRACE_ID_KEY.to_string(), "999".to_string()); @@ -1093,9 +1094,9 @@ mod tests { assert_eq!(priority, Some(-1.0)); } - #[test] - fn test_process_on_universal_instrumentation_end_headers_with_invalid_priority() { - let mut p = setup(); + #[tokio::test] + async fn test_process_on_universal_instrumentation_end_headers_with_invalid_priority() { + let mut p = setup().await; let mut headers = HashMap::new(); headers.insert(DATADOG_TRACE_ID_KEY.to_string(), "888".to_string()); @@ -1122,9 +1123,9 @@ mod tests { assert_eq!(context.invocation_span.parent_id, 999); } - #[test] - fn test_process_on_universal_instrumentation_end_headers_no_sampling_priority() { - let mut p = setup(); + #[tokio::test] + async fn test_process_on_universal_instrumentation_end_headers_no_sampling_priority() { + let mut p = setup().await; let mut headers = HashMap::new(); headers.insert(DATADOG_TRACE_ID_KEY.to_string(), "111".to_string()); @@ -1147,9 +1148,9 @@ mod tests { assert_eq!(context.invocation_span.parent_id, 222); } - #[test] - fn test_process_on_invocation_end_tags_response_with_status_code() { - let mut p = setup(); + #[tokio::test] + async fn test_process_on_invocation_end_tags_response_with_status_code() { + let mut p = setup().await; let response = r#" { @@ -1186,9 +1187,9 @@ mod tests { ); } - #[test] - fn 
test_on_shutdown_event_creates_unused_init_metrics() { - let mut processor = setup(); + #[tokio::test] + async fn test_on_shutdown_event_creates_unused_init_metrics() { + let mut processor = setup().await; let now1 = i64::try_from(std::time::UNIX_EPOCH.elapsed().unwrap().as_secs()).unwrap(); let ts1 = (now1 / 10) * 10; @@ -1196,22 +1197,26 @@ mod tests { let now2 = i64::try_from(std::time::UNIX_EPOCH.elapsed().unwrap().as_secs()).unwrap(); let ts2 = (now2 / 10) * 10; - let aggregator = processor.enhanced_metrics.aggregator.lock().unwrap(); + let handle = &processor.enhanced_metrics.aggr_handle; assert!( - aggregator + handle .get_entry_by_id( crate::metrics::enhanced::constants::UNUSED_INIT.into(), - &None, + None, ts1 ) + .await + .unwrap() .is_some() - || aggregator + || handle .get_entry_by_id( crate::metrics::enhanced::constants::UNUSED_INIT.into(), - &None, + None, ts2 ) + .await + .unwrap() .is_some(), "UNUSED_INIT metric should be created when invoked_received=false" ); diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index e32c57958..3eaec069b 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -4,8 +4,8 @@ use crate::metrics::enhanced::{ }; use crate::proc::{self, CPUData, NetworkData}; use crate::telemetry::events::{InitType, ReportMetrics, RuntimeDoneMetrics}; -use dogstatsd::metric::{Metric, MetricValue}; use dogstatsd::metric::SortedTags; +use dogstatsd::metric::{Metric, MetricValue}; use dogstatsd::{aggregator_service::AggregatorHandle, metric}; use std::collections::HashMap; use std::env::consts::ARCH; @@ -757,38 +757,35 @@ impl PartialEq for EnhancedMetricData { #[allow(clippy::unwrap_used)] mod tests { use std::collections::HashMap; - use std::sync::Mutex; use super::*; use crate::config; - use dogstatsd::aggregator::Aggregator; + use dogstatsd::aggregator_service::AggregatorService; use dogstatsd::metric::EMPTY_TAGS; const PRECISION: f64 = 0.000_000_01; - fn setup() -> (Arc>, Arc) { + fn setup() -> (AggregatorHandle, Arc) { let config = Arc::new(config::Config { service: Some("test-service".to_string()), tags: HashMap::from([("test".to_string(), "tags".to_string())]), ..config::Config::default() }); - ( - Arc::new(Mutex::new( - Aggregator::new(EMPTY_TAGS, 1024).expect("failed to create aggregator"), - )), - config, - ) + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service"); + + tokio::spawn(service.run()); + + (handle, config) } - fn assert_sketch( - aggregator_mutex: &Mutex, - metric_id: &str, - value: f64, - timestamp: i64, - ) { + async fn assert_sketch(handle: &AggregatorHandle, metric_id: &str, value: f64, timestamp: i64) { let ts = (timestamp / 10) * 10; - let aggregator = aggregator_mutex.lock().unwrap(); - if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &None, ts) { + if let Some(e) = handle + .get_entry_by_id(metric_id.into(), None, ts) + .await + .unwrap() + { let metric = e.value.get_sketch().unwrap(); assert!((metric.max().unwrap() - value).abs() < PRECISION); assert!((metric.min().unwrap() - value).abs() < PRECISION); @@ -799,9 +796,9 @@ mod tests { } } - #[test] + #[tokio::test] #[allow(clippy::float_cmp)] - fn test_increment_invocation_metric() { + async fn test_increment_invocation_metric() { let (metrics_aggr, my_config) = setup(); let lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH @@ -817,12 +814,12 @@ mod tests { .as_secs() .try_into() 
.unwrap_or_default(); - assert_sketch(&metrics_aggr, constants::INVOCATIONS_METRIC, 1f64, now); + assert_sketch(&metrics_aggr, constants::INVOCATIONS_METRIC, 1f64, now).await; } - #[test] + #[tokio::test] #[allow(clippy::float_cmp)] - fn test_increment_errors_metric() { + async fn test_increment_errors_metric() { let (metrics_aggr, my_config) = setup(); let lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH @@ -838,12 +835,12 @@ mod tests { .as_secs() .try_into() .unwrap_or_default(); - assert_sketch(&metrics_aggr, constants::ERRORS_METRIC, 1f64, now); + assert_sketch(&metrics_aggr, constants::ERRORS_METRIC, 1f64, now).await; } - #[test] + #[tokio::test] #[allow(clippy::too_many_lines)] - fn test_disabled() { + async fn test_disabled() { let (metrics_aggr, no_config) = setup(); let my_config = Arc::new(config::Config { enhanced_metrics: false, @@ -879,135 +876,224 @@ mod tests { }, now, ); - let aggr = metrics_aggr.lock().expect("lock poisoned"); assert!( - aggr.get_entry_by_id(constants::INVOCATIONS_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::INVOCATIONS_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::ERRORS_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::ERRORS_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::TIMEOUTS_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::TIMEOUTS_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::INIT_DURATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::INIT_DURATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::RUNTIME_DURATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::RUNTIME_DURATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::PRODUCED_BYTES_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::PRODUCED_BYTES_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::POST_RUNTIME_DURATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::POST_RUNTIME_DURATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::DURATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::DURATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::BILLED_DURATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::BILLED_DURATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::MAX_MEMORY_USED_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::MAX_MEMORY_USED_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::MEMORY_SIZE_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::MEMORY_SIZE_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::ESTIMATED_COST_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::ESTIMATED_COST_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::RX_BYTES_METRIC.into(), &None, now) + metrics_aggr + 
.get_entry_by_id(constants::RX_BYTES_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::TX_BYTES_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::TX_BYTES_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::TOTAL_NETWORK_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::TOTAL_NETWORK_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::CPU_USER_TIME_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::CPU_USER_TIME_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::CPU_SYSTEM_TIME_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::CPU_SYSTEM_TIME_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::CPU_TOTAL_TIME_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::CPU_TOTAL_TIME_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id( - constants::CPU_TOTAL_UTILIZATION_PCT_METRIC.into(), - &None, - now - ) - .is_none() + metrics_aggr + .get_entry_by_id( + constants::CPU_TOTAL_UTILIZATION_PCT_METRIC.into(), + None, + now + ) + .await + .unwrap() + .is_none() ); assert!( - aggr.get_entry_by_id(constants::CPU_TOTAL_UTILIZATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::CPU_TOTAL_UTILIZATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::NUM_CORES_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::NUM_CORES_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::CPU_MIN_UTILIZATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::CPU_MIN_UTILIZATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::CPU_MAX_UTILIZATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::CPU_MAX_UTILIZATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::TMP_MAX_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::TMP_MAX_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::TMP_USED_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::TMP_USED_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::TMP_FREE_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::TMP_FREE_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::FD_MAX_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::FD_MAX_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::FD_USE_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::FD_USE_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::THREADS_MAX_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::THREADS_MAX_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::THREADS_USE_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::THREADS_USE_METRIC.into(), None, now) + 
.await + .unwrap() .is_none() ); } - #[test] - fn test_set_runtime_done_metrics() { + #[tokio::test] + async fn test_set_runtime_done_metrics() { let (metrics_aggr, my_config) = setup(); let lambda = Lambda::new(metrics_aggr.clone(), my_config); let runtime_done_metrics = RuntimeDoneMetrics { @@ -1027,12 +1113,13 @@ mod tests { constants::RUNTIME_DURATION_METRIC, 100.0, now, - ); - assert_sketch(&metrics_aggr, constants::PRODUCED_BYTES_METRIC, 42.0, now); + ) + .await; + assert_sketch(&metrics_aggr, constants::PRODUCED_BYTES_METRIC, 42.0, now).await; } - #[test] - fn test_set_report_log_metrics() { + #[tokio::test] + async fn test_set_report_log_metrics() { let (metrics_aggr, my_config) = setup(); let lambda = Lambda::new(metrics_aggr.clone(), my_config); let report_metrics = ReportMetrics { @@ -1051,17 +1138,17 @@ mod tests { .unwrap_or_default(); lambda.set_report_log_metrics(&report_metrics, now); - assert_sketch(&metrics_aggr, constants::DURATION_METRIC, 0.1, now); - assert_sketch(&metrics_aggr, constants::BILLED_DURATION_METRIC, 0.1, now); + assert_sketch(&metrics_aggr, constants::DURATION_METRIC, 0.1, now).await; + assert_sketch(&metrics_aggr, constants::BILLED_DURATION_METRIC, 0.1, now).await; - assert_sketch(&metrics_aggr, constants::MAX_MEMORY_USED_METRIC, 128.0, now); - assert_sketch(&metrics_aggr, constants::MEMORY_SIZE_METRIC, 256.0, now); + assert_sketch(&metrics_aggr, constants::MAX_MEMORY_USED_METRIC, 128.0, now).await; + assert_sketch(&metrics_aggr, constants::MEMORY_SIZE_METRIC, 256.0, now).await; } - #[test] - fn test_set_network_enhanced_metrics() { + #[tokio::test] + async fn test_set_network_enhanced_metrics() { let (metrics_aggr, my_config) = setup(); - let lambda = Lambda::new(metrics_aggr.clone(), my_config); + let _lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -1080,19 +1167,19 @@ mod tests { Lambda::generate_network_enhanced_metrics( network_offset, network_data, - &mut lambda.aggregator.lock().expect("lock poisoned"), + &metrics_aggr, None, ); - assert_sketch(&metrics_aggr, constants::RX_BYTES_METRIC, 20000.0, now); - assert_sketch(&metrics_aggr, constants::TX_BYTES_METRIC, 74746.0, now); - assert_sketch(&metrics_aggr, constants::TOTAL_NETWORK_METRIC, 94746.0, now); + assert_sketch(&metrics_aggr, constants::RX_BYTES_METRIC, 20000.0, now).await; + assert_sketch(&metrics_aggr, constants::TX_BYTES_METRIC, 74746.0, now).await; + assert_sketch(&metrics_aggr, constants::TOTAL_NETWORK_METRIC, 94746.0, now).await; } - #[test] - fn test_set_cpu_time_enhanced_metrics() { + #[tokio::test] + async fn test_set_cpu_time_enhanced_metrics() { let (metrics_aggr, my_config) = setup(); - let lambda = Lambda::new(metrics_aggr.clone(), my_config); + let _lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -1119,22 +1206,17 @@ mod tests { individual_cpu_idle_times: individual_cpu_idle_times_end, }; - Lambda::generate_cpu_time_enhanced_metrics( - &cpu_offset, - &cpu_data, - &mut lambda.aggregator.lock().expect("lock poisoned"), - None, - ); + Lambda::generate_cpu_time_enhanced_metrics(&cpu_offset, &cpu_data, &metrics_aggr, None); - assert_sketch(&metrics_aggr, constants::CPU_USER_TIME_METRIC, 100.0, now); - assert_sketch(&metrics_aggr, constants::CPU_SYSTEM_TIME_METRIC, 53.0, now); - assert_sketch(&metrics_aggr, constants::CPU_TOTAL_TIME_METRIC, 153.0, now); + 
assert_sketch(&metrics_aggr, constants::CPU_USER_TIME_METRIC, 100.0, now).await; + assert_sketch(&metrics_aggr, constants::CPU_SYSTEM_TIME_METRIC, 53.0, now).await; + assert_sketch(&metrics_aggr, constants::CPU_TOTAL_TIME_METRIC, 153.0, now).await; } - #[test] - fn test_set_cpu_utilization_enhanced_metrics() { + #[tokio::test] + async fn test_set_cpu_utilization_enhanced_metrics() { let (metrics_aggr, my_config) = setup(); - let lambda = Lambda::new(metrics_aggr.clone(), my_config); + let _lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -1168,7 +1250,7 @@ mod tests { &cpu_data, uptime_offset, uptime_data, - &mut lambda.aggregator.lock().expect("lock poisoned"), + &metrics_aggr, None, ); @@ -1178,32 +1260,36 @@ mod tests { constants::CPU_TOTAL_UTILIZATION_PCT_METRIC, 30.0, now, - ); + ) + .await; assert_sketch( &metrics_aggr, constants::CPU_TOTAL_UTILIZATION_METRIC, 0.6, now, - ); - assert_sketch(&metrics_aggr, constants::NUM_CORES_METRIC, 2.0, now); + ) + .await; + assert_sketch(&metrics_aggr, constants::NUM_CORES_METRIC, 2.0, now).await; assert_sketch( &metrics_aggr, constants::CPU_MAX_UTILIZATION_METRIC, 30.0, now, - ); + ) + .await; assert_sketch( &metrics_aggr, constants::CPU_MIN_UTILIZATION_METRIC, 28.75, now, - ); + ) + .await; } - #[test] - fn test_set_tmp_enhanced_metrics() { + #[tokio::test] + async fn test_set_tmp_enhanced_metrics() { let (metrics_aggr, my_config) = setup(); - let lambda = Lambda::new(metrics_aggr.clone(), my_config); + let _lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -1213,27 +1299,23 @@ mod tests { let tmp_max = 550_461_440.0; let tmp_used = 12_165_120.0; - Lambda::generate_tmp_enhanced_metrics( - tmp_max, - tmp_used, - &mut lambda.aggregator.lock().expect("lock poisoned"), - None, - ); + Lambda::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &metrics_aggr, None); - assert_sketch(&metrics_aggr, constants::TMP_MAX_METRIC, 550_461_440.0, now); - assert_sketch(&metrics_aggr, constants::TMP_USED_METRIC, 12_165_120.0, now); + assert_sketch(&metrics_aggr, constants::TMP_MAX_METRIC, 550_461_440.0, now).await; + assert_sketch(&metrics_aggr, constants::TMP_USED_METRIC, 12_165_120.0, now).await; assert_sketch( &metrics_aggr, constants::TMP_FREE_METRIC, 538_296_320.0, now, - ); + ) + .await; } - #[test] - fn test_set_process_enhanced_metrics_valid_use() { + #[tokio::test] + async fn test_set_process_enhanced_metrics_valid_use() { let (metrics_aggr, my_config) = setup(); - let lambda = Lambda::new(metrics_aggr.clone(), my_config); + let _lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -1250,20 +1332,20 @@ mod tests { fd_use, threads_max, threads_use, - &mut lambda.aggregator.lock().expect("lock poisoned"), + &metrics_aggr, None, ); - assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0, now); - assert_sketch(&metrics_aggr, constants::FD_USE_METRIC, 175.0, now); - assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0, now); - assert_sketch(&metrics_aggr, constants::THREADS_USE_METRIC, 40.0, now); + assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0, now).await; + assert_sketch(&metrics_aggr, constants::FD_USE_METRIC, 175.0, now).await; + assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0, now).await; + 
assert_sketch(&metrics_aggr, constants::THREADS_USE_METRIC, 40.0, now).await; } - #[test] - fn test_set_process_enhanced_metrics_invalid_use() { + #[tokio::test] + async fn test_set_process_enhanced_metrics_invalid_use() { let (metrics_aggr, my_config) = setup(); - let lambda = Lambda::new(metrics_aggr.clone(), my_config); + let _lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -1280,20 +1362,25 @@ mod tests { fd_use, threads_max, threads_use, - &mut lambda.aggregator.lock().expect("lock poisoned"), + &metrics_aggr, None, ); - assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0, now); - assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0, now); + assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0, now).await; + assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0, now).await; - let aggr = lambda.aggregator.lock().expect("lock poisoned"); assert!( - aggr.get_entry_by_id(constants::FD_USE_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::FD_USE_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::THREADS_USE_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::THREADS_USE_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); } diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index 0767bf25c..35b9d3dc4 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -430,12 +430,11 @@ mod tests { use http_body_util::BodyExt; use std::{ collections::HashMap, - sync::Mutex, time::{Duration, Instant}, }; use tokio::sync::Mutex as TokioMutex; - use dogstatsd::{aggregator::Aggregator as MetricsAggregator, metric::EMPTY_TAGS}; + use dogstatsd::{aggregator_service::AggregatorService, metric::EMPTY_TAGS}; use http_body_util::Full; use hyper::{server::conn::http1, service::service_fn}; use hyper_util::rt::TokioIo; @@ -482,9 +481,12 @@ mod tests { LAMBDA_RUNTIME_SLUG.to_string(), &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), )); - let metrics_aggregator = Arc::new(Mutex::new( - MetricsAggregator::new(EMPTY_TAGS, 1024).expect("failed to create metrics aggregator"), - )); + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service"); + + tokio::spawn(service.run()); + + let metrics_aggregator = handle; let aws_config = Arc::new(AwsConfig { region: "us-east-1".to_string(), function_name: "arn:some-function".to_string(), From ae8737c4c9bddf4d79a2cd0cdf9601299848271b Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Tue, 26 Aug 2025 16:22:55 -0700 Subject: [PATCH 08/12] migrate metrics integration test --- bottlecap/tests/metrics_integration_test.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/bottlecap/tests/metrics_integration_test.rs b/bottlecap/tests/metrics_integration_test.rs index 83935a877..8f9ec8553 100644 --- a/bottlecap/tests/metrics_integration_test.rs +++ b/bottlecap/tests/metrics_integration_test.rs @@ -1,13 +1,13 @@ use bottlecap::config::Config; use bottlecap::metrics::enhanced::lambda::Lambda as enhanced_metrics; -use dogstatsd::aggregator::Aggregator as MetricsAggregator; +use dogstatsd::aggregator_service::AggregatorService; use dogstatsd::api_key::ApiKeyFactory; use dogstatsd::datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride}; use 
dogstatsd::flusher::Flusher as MetricsFlusher;
use dogstatsd::flusher::FlusherConfig as MetricsFlusherConfig;
use dogstatsd::metric::SortedTags;
use httpmock::prelude::*;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;

 mod common;

@@ -35,10 +35,12 @@ async fn test_enhanced_metrics() {

     let arc_config = Arc::new(Config::default());

-    let metrics_aggr = Arc::new(Mutex::new(
-        MetricsAggregator::new(SortedTags::parse("aTagKey:aTagValue").unwrap(), 1024)
-            .expect("failed to create aggregator"),
-    ));
+    let (metrics_aggr_service, metrics_aggr_handle) =
+        AggregatorService::new(SortedTags::parse("aTagKey:aTagValue").unwrap(), 1024)
+            .expect("failed to create aggregator service");
+
+    tokio::spawn(metrics_aggr_service.run());
+
     let metrics_site_override = MetricsIntakeUrlPrefixOverride::maybe_new(
         None,
         Some(DdDdUrl::new(server.base_url()).expect("failed to create dd url")),
@@ -46,7 +48,7 @@
     .expect("failed to create metrics override");
     let flusher_config = MetricsFlusherConfig {
         api_key_factory: Arc::new(ApiKeyFactory::new(dd_api_key)),
-        aggregator: metrics_aggr.clone(),
+        aggregator_handle: metrics_aggr_handle.clone(),
         metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(None, Some(metrics_site_override))
             .expect("can't parse metrics intake URL from site"),
         https_proxy: None,
@@ -55,7 +57,7 @@
     };
     let mut metrics_flusher = MetricsFlusher::new(flusher_config);
     let lambda_enhanced_metrics =
-        enhanced_metrics::new(Arc::clone(&metrics_aggr), Arc::clone(&arc_config));
+        enhanced_metrics::new(metrics_aggr_handle.clone(), Arc::clone(&arc_config));

     let now = std::time::UNIX_EPOCH
         .elapsed()

From 244b47d4f98cf4c4623d2a13bf7cab576c04f8e9 Mon Sep 17 00:00:00 2001
From: AJ Stuyvenberg
Date: Tue, 26 Aug 2025 16:24:27 -0700
Subject: [PATCH 09/12] clippy

---
 bottlecap/src/bin/bottlecap/main.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs
index fb9b61612..0f66bd945 100644
--- a/bottlecap/src/bin/bottlecap/main.rs
+++ b/bottlecap/src/bin/bottlecap/main.rs
@@ -511,7 +511,7 @@ async fn extension_loop_active(

     let metrics_flushers = Arc::new(TokioMutex::new(start_metrics_flushers(
         Arc::clone(&api_key_factory),
-        metrics_aggr_handle.clone(),
+        &metrics_aggr_handle,
         config,
     )));
     // Lifecycle Invocation Processor
@@ -1014,7 +1014,7 @@ fn start_logs_agent(

 fn start_metrics_flushers(
     api_key_factory: Arc,
-    metrics_aggr_handle: MetricsAggregatorHandle,
+    metrics_aggr_handle: &MetricsAggregatorHandle,
     config: &Arc,
 ) -> Vec {
     let mut flushers = Vec::new();

From 2b42f2a84ae9ae9bd93ce5a042ecab61175bdc0a Mon Sep 17 00:00:00 2001
From: AJ Stuyvenberg
Date: Tue, 26 Aug 2025 16:33:11 -0700
Subject: [PATCH 10/12] fix: remove unnecessary async from test setup method

---
 bottlecap/Cargo.lock                            |  4 ++--
 bottlecap/Cargo.toml                            |  4 ++--
 bottlecap/src/bin/bottlecap/main.rs             |  4 ++--
 bottlecap/src/lifecycle/invocation/processor.rs | 12 ++++++------
 4 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock
index 8db9fc8b8..2d0abefd6 100644
--- a/bottlecap/Cargo.lock
+++ b/bottlecap/Cargo.lock
@@ -763,7 +763,7 @@ dependencies = [
 [[package]]
 name = "datadog-fips"
 version = "0.1.0"
-source = "git+https://github.com/DataDog/serverless-components?rev=d131de8419c191ce21c91bb30b5915c4d8a2cc5a#d131de8419c191ce21c91bb30b5915c4d8a2cc5a"
+source = 
"git+https://github.com/DataDog/serverless-components?rev=fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7#fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7" dependencies = [ "reqwest", "rustls", @@ -992,7 +992,7 @@ dependencies = [ [[package]] name = "dogstatsd" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=d131de8419c191ce21c91bb30b5915c4d8a2cc5a#d131de8419c191ce21c91bb30b5915c4d8a2cc5a" +source = "git+https://github.com/DataDog/serverless-components?rev=fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7#fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7" dependencies = [ "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index 423bde1b3..240438c6b 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -62,8 +62,8 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" , features = ["mini_agent"] } datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } -dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "d131de8419c191ce21c91bb30b5915c4d8a2cc5a", default-features = false } -datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "d131de8419c191ce21c91bb30b5915c4d8a2cc5a", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7", default-features = false } +datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7", default-features = false } libddwaf = { version = "1.26.0", git = "https://github.com/DataDog/libddwaf-rust", rev = "1d57bf0ca49782723e556ba327ee7f378978aaa7", default-features = false, features = ["serde", "dynamic"] } [dev-dependencies] diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 0f66bd945..04d3048f7 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -744,7 +744,7 @@ async fn extension_loop_active( break 'next_invocation; } Some(event) = event_bus.rx.recv() => { - handle_event_bus_event(event, invocation_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await; + handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await; } _ = race_flush_interval.tick() => { let mut locked_metrics = metrics_flushers.lock().await; @@ -1081,7 +1081,7 @@ fn start_metrics_flushers( fn start_trace_agent( config: &Arc, api_key_factory: &Arc, - tags_provider: &Arc, + tags_provider: &Arc, invocation_processor: Arc>, appsec_processor: Option>>, trace_aggregator: Arc>, diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index b69d6d5df..2bd152f17 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -976,7 +976,7 @@ mod tests { use dogstatsd::aggregator_service::AggregatorService; use dogstatsd::metric::EMPTY_TAGS; - async fn setup() -> Processor { 
+ fn setup() -> Processor { let aws_config = Arc::new(AwsConfig { region: "us-east-1".into(), aws_lwa_proxy_lambda_runtime_api: Some("***".into()), @@ -1067,7 +1067,7 @@ mod tests { #[tokio::test] async fn test_process_on_universal_instrumentation_end_headers_with_sampling_priority() { - let mut p = setup().await; + let mut p = setup(); let mut headers = HashMap::new(); headers.insert(DATADOG_TRACE_ID_KEY.to_string(), "999".to_string()); @@ -1096,7 +1096,7 @@ mod tests { #[tokio::test] async fn test_process_on_universal_instrumentation_end_headers_with_invalid_priority() { - let mut p = setup().await; + let mut p = setup(); let mut headers = HashMap::new(); headers.insert(DATADOG_TRACE_ID_KEY.to_string(), "888".to_string()); @@ -1125,7 +1125,7 @@ mod tests { #[tokio::test] async fn test_process_on_universal_instrumentation_end_headers_no_sampling_priority() { - let mut p = setup().await; + let mut p = setup(); let mut headers = HashMap::new(); headers.insert(DATADOG_TRACE_ID_KEY.to_string(), "111".to_string()); @@ -1150,7 +1150,7 @@ mod tests { #[tokio::test] async fn test_process_on_invocation_end_tags_response_with_status_code() { - let mut p = setup().await; + let mut p = setup(); let response = r#" { @@ -1189,7 +1189,7 @@ mod tests { #[tokio::test] async fn test_on_shutdown_event_creates_unused_init_metrics() { - let mut processor = setup().await; + let mut processor = setup(); let now1 = i64::try_from(std::time::UNIX_EPOCH.elapsed().unwrap().as_secs()).unwrap(); let ts1 = (now1 / 10) * 10; From 2de676f4d61c056044f9a54c266fae8d307a52e5 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Thu, 28 Aug 2025 13:24:12 -0700 Subject: [PATCH 11/12] use merge commit from sls-components. 500 contexts --- bottlecap/Cargo.lock | 17 ++++++++++++++--- bottlecap/Cargo.toml | 2 +- bottlecap/src/lifecycle/invocation/context.rs | 6 ++++-- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 2d0abefd6..c04092ea5 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -512,7 +512,7 @@ dependencies = [ "bytes", "chrono", "cookie", - "datadog-fips", + "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7)", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "datadog-trace-normalization", "datadog-trace-obfuscation", @@ -760,6 +760,17 @@ dependencies = [ "typenum", ] +[[package]] +name = "datadog-fips" +version = "0.1.0" +source = "git+https://github.com/DataDog/serverless-components?rev=abfec752b0638a9e4096e1465acd4bb2651edfa7#abfec752b0638a9e4096e1465acd4bb2651edfa7" +dependencies = [ + "reqwest", + "rustls", + "rustls-native-certs", + "tracing", +] + [[package]] name = "datadog-fips" version = "0.1.0" @@ -992,9 +1003,9 @@ dependencies = [ [[package]] name = "dogstatsd" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7#fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7" +source = "git+https://github.com/DataDog/serverless-components?rev=abfec752b0638a9e4096e1465acd4bb2651edfa7#abfec752b0638a9e4096e1465acd4bb2651edfa7" dependencies = [ - "datadog-fips", + "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=abfec752b0638a9e4096e1465acd4bb2651edfa7)", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "ddsketch-agent 0.1.0 
(git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "derive_more", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index 240438c6b..c5ead6a50 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -62,7 +62,7 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" , features = ["mini_agent"] } datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } -dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "abfec752b0638a9e4096e1465acd4bb2651edfa7", default-features = false } datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7", default-features = false } libddwaf = { version = "1.26.0", git = "https://github.com/DataDog/libddwaf-rust", rev = "1d57bf0ca49782723e556ba327ee7f378978aaa7", default-features = false, features = ["serde", "dynamic"] } diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 2a2c9f615..ce5eaa52c 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -131,10 +131,12 @@ struct UniversalInstrumentationData { } impl Default for ContextBuffer { - /// Creates a new `ContextBuffer` with a default capacity of 5. 
+    /// Creates a new `ContextBuffer` with a default capacity of 500.
+    /// This gives us enough capacity to process events that may be delayed when async tasks
+    /// pile up and prevent us from reading them quickly enough.
     ///
     fn default() -> Self {
-        ContextBuffer::with_capacity(5000)
+        ContextBuffer::with_capacity(500)
     }
 }

From b70af0ceeebb60d7f33152d644c6c8e7ec404799 Mon Sep 17 00:00:00 2001
From: AJ Stuyvenberg
Date: Thu, 28 Aug 2025 13:29:46 -0700
Subject: [PATCH 12/12] feat: merge all metric inserts into one batch

---
 bottlecap/src/metrics/enhanced/lambda.rs | 81 ++++++++++--------------
 1 file changed, 34 insertions(+), 47 deletions(-)

diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs
index 3eaec069b..7e3276251 100644
--- a/bottlecap/src/metrics/enhanced/lambda.rs
+++ b/bottlecap/src/metrics/enhanced/lambda.rs
@@ -387,54 +387,41 @@ impl Lambda {
         // Multiply by num_cores to report in terms of cores
         let cpu_total_utilization = cpu_total_utilization_decimal * num_cores;

-        let metric = Metric::new(
-            constants::CPU_TOTAL_UTILIZATION_PCT_METRIC.into(),
-            MetricValue::distribution(cpu_total_utilization_pct),
-            tags.clone(),
-            Some(now),
-        );
-        if let Err(e) = aggr.insert_batch(vec![metric]) {
-            error!("Failed to insert cpu_total_utilization_pct metric: {}", e);
-        }
-
-        let metric = Metric::new(
-            constants::CPU_TOTAL_UTILIZATION_METRIC.into(),
-            MetricValue::distribution(cpu_total_utilization),
-            tags.clone(),
-            Some(now),
-        );
-        if let Err(e) = aggr.insert_batch(vec![metric]) {
-            error!("Failed to insert cpu_total_utilization metric: {}", e);
-        }
-
-        let metric = Metric::new(
-            constants::NUM_CORES_METRIC.into(),
-            MetricValue::distribution(num_cores),
-            tags.clone(),
-            Some(now),
-        );
-        if let Err(e) = aggr.insert_batch(vec![metric]) {
-            error!("Failed to insert num_cores metric: {}", e);
-        }
-
-        let metric = Metric::new(
-            constants::CPU_MAX_UTILIZATION_METRIC.into(),
-            MetricValue::distribution(cpu_max_utilization),
-            tags.clone(),
-            Some(now),
-        );
-        if let Err(e) = aggr.insert_batch(vec![metric]) {
-            error!("Failed to insert cpu_max_utilization metric: {}", e);
-        }
+        let metrics = vec![
+            Metric::new(
+                constants::CPU_TOTAL_UTILIZATION_PCT_METRIC.into(),
+                MetricValue::distribution(cpu_total_utilization_pct),
+                tags.clone(),
+                Some(now),
+            ),
+            Metric::new(
+                constants::CPU_TOTAL_UTILIZATION_METRIC.into(),
+                MetricValue::distribution(cpu_total_utilization),
+                tags.clone(),
+                Some(now),
+            ),
+            Metric::new(
+                constants::NUM_CORES_METRIC.into(),
+                MetricValue::distribution(num_cores),
+                tags.clone(),
+                Some(now),
+            ),
+            Metric::new(
+                constants::CPU_MAX_UTILIZATION_METRIC.into(),
+                MetricValue::distribution(cpu_max_utilization),
+                tags.clone(),
+                Some(now),
+            ),
+            Metric::new(
+                constants::CPU_MIN_UTILIZATION_METRIC.into(),
+                MetricValue::distribution(cpu_min_utilization),
+                tags,
+                Some(now),
+            ),
+        ];

-        let metric = Metric::new(
-            constants::CPU_MIN_UTILIZATION_METRIC.into(),
-            MetricValue::distribution(cpu_min_utilization),
-            tags.clone(),
-            Some(now),
-        );
-        if let Err(e) = aggr.insert_batch(vec![metric]) {
-            error!("Failed to insert cpu_min_utilization metric: {}", e);
+        if let Err(e) = aggr.insert_batch(metrics) {
+            error!("Failed to insert cpu utilization metrics: {}", e);
         }
     }