diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 8db9fc8b8..c04092ea5 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -512,7 +512,7 @@ dependencies = [ "bytes", "chrono", "cookie", - "datadog-fips", + "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7)", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "datadog-trace-normalization", "datadog-trace-obfuscation", @@ -763,7 +763,18 @@ dependencies = [ [[package]] name = "datadog-fips" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=d131de8419c191ce21c91bb30b5915c4d8a2cc5a#d131de8419c191ce21c91bb30b5915c4d8a2cc5a" +source = "git+https://github.com/DataDog/serverless-components?rev=abfec752b0638a9e4096e1465acd4bb2651edfa7#abfec752b0638a9e4096e1465acd4bb2651edfa7" +dependencies = [ + "reqwest", + "rustls", + "rustls-native-certs", + "tracing", +] + +[[package]] +name = "datadog-fips" +version = "0.1.0" +source = "git+https://github.com/DataDog/serverless-components?rev=fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7#fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7" dependencies = [ "reqwest", "rustls", @@ -992,9 +1003,9 @@ dependencies = [ [[package]] name = "dogstatsd" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=d131de8419c191ce21c91bb30b5915c4d8a2cc5a#d131de8419c191ce21c91bb30b5915c4d8a2cc5a" +source = "git+https://github.com/DataDog/serverless-components?rev=abfec752b0638a9e4096e1465acd4bb2651edfa7#abfec752b0638a9e4096e1465acd4bb2651edfa7" dependencies = [ - "datadog-fips", + "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=abfec752b0638a9e4096e1465acd4bb2651edfa7)", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "derive_more", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index 423bde1b3..c5ead6a50 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -62,8 +62,8 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" , features = ["mini_agent"] } datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } -dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "d131de8419c191ce21c91bb30b5915c4d8a2cc5a", default-features = false } -datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "d131de8419c191ce21c91bb30b5915c4d8a2cc5a", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "abfec752b0638a9e4096e1465acd4bb2651edfa7", default-features = false } +datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7", default-features = false } libddwaf = { version = "1.26.0", git = "https://github.com/DataDog/libddwaf-rust", rev = "1d57bf0ca49782723e556ba327ee7f378978aaa7", default-features = false, features = ["serde", "dynamic"] } [dev-dependencies] diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 09b331f89..04d3048f7 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -66,7 +66,8 @@ use datadog_trace_obfuscation::obfuscation_config; use datadog_trace_utils::send_data::SendData; use decrypt::resolve_secrets; use dogstatsd::{ - aggregator::Aggregator as MetricsAggregator, + aggregator_service::AggregatorHandle as MetricsAggregatorHandle, + aggregator_service::AggregatorService as MetricsAggregatorService, api_key::ApiKeyFactory, constants::CONTEXTS, datadog::{ @@ -87,7 +88,7 @@ use std::{ os::unix::process::CommandExt, path::Path, process::Command, - sync::{Arc, Mutex}, + sync::Arc, time::{Duration, Instant}, }; use tokio::{sync::Mutex as TokioMutex, sync::RwLock, sync::mpsc::Sender, task::JoinHandle}; @@ -494,13 +495,11 @@ async fn extension_loop_active( ); let metrics_aggr_init_start_time = Instant::now(); - let metrics_aggr = Arc::new(Mutex::new( - MetricsAggregator::new( - SortedTags::parse(&tags_provider.get_tags_string()).unwrap_or(EMPTY_TAGS), - CONTEXTS, - ) - .expect("failed to create aggregator"), - )); + let (metrics_aggr_service, metrics_aggr_handle) = MetricsAggregatorService::new( + SortedTags::parse(&tags_provider.get_tags_string()).unwrap_or(EMPTY_TAGS), + CONTEXTS, + ) + .expect("can't create metrics service"); debug!( "Metrics aggregator created in {:} microseconds", metrics_aggr_init_start_time @@ -508,10 +507,11 @@ async fn extension_loop_active( .as_micros() .to_string() ); + start_dogstatsd_aggregator(metrics_aggr_service); let metrics_flushers = Arc::new(TokioMutex::new(start_metrics_flushers( Arc::clone(&api_key_factory), - &metrics_aggr, + &metrics_aggr_handle, config, ))); // Lifecycle Invocation Processor @@ -519,7 +519,7 @@ async fn extension_loop_active( Arc::clone(&tags_provider), Arc::clone(config), Arc::clone(&aws_config), - Arc::clone(&metrics_aggr), + metrics_aggr_handle.clone(), ))); // AppSec processor (if enabled) let appsec_processor = match AppSecProcessor::new(config) { @@ -568,7 +568,7 @@ async fn extension_loop_active( } }); - let dogstatsd_cancel_token = start_dogstatsd(&metrics_aggr).await; + let dogstatsd_cancel_token = start_dogstatsd(metrics_aggr_handle.clone()).await; let telemetry_listener_cancel_token = setup_telemetry_client(&r.extension_id, logs_agent_channel).await?; @@ -623,7 +623,7 @@ async fn extension_loop_active( &*stats_flusher, &proxy_flusher, &mut race_flush_interval, - &metrics_aggr, + &metrics_aggr_handle.clone(), ) .await; } @@ -638,7 +638,7 @@ async fn extension_loop_active( &*stats_flusher, &proxy_flusher, &mut race_flush_interval, - &metrics_aggr, + &metrics_aggr_handle.clone(), ) .await; let next_response = next_event(client, &r.extension_id).await; @@ -670,11 +670,15 @@ async fn extension_loop_active( })); let (metrics_flushers_copy, series, sketches) = { let locked_metrics = metrics_flushers.lock().await; - let mut aggregator = metrics_aggr.lock().expect("lock poisoned"); + let flush_response = metrics_aggr_handle + .clone() + .flush() + .await + .expect("can't flush metrics handle"); ( locked_metrics.clone(), - aggregator.consume_metrics(), - aggregator.consume_distributions(), + flush_response.series, + flush_response.distributions, ) }; for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() { @@ -711,7 +715,7 @@ async fn extension_loop_active( &*stats_flusher, &proxy_flusher, &mut race_flush_interval, - &metrics_aggr, + &metrics_aggr_handle, ) .await; last_continuous_flush_error = false; @@ -751,7 +755,7 @@ async fn extension_loop_active( &*stats_flusher, &proxy_flusher, &mut race_flush_interval, - &metrics_aggr, + &metrics_aggr_handle, ) .await; } @@ -807,7 +811,7 @@ async fn extension_loop_active( &*stats_flusher, &proxy_flusher, &mut race_flush_interval, - &metrics_aggr, + &metrics_aggr_handle, ) .await; return Ok(()); @@ -822,18 +826,20 @@ async fn blocking_flush_all( stats_flusher: &impl StatsFlusher, proxy_flusher: &ProxyFlusher, race_flush_interval: &mut tokio::time::Interval, - metrics_aggr: &Arc>, + metrics_aggr_handle: &MetricsAggregatorHandle, ) { - let (series, sketches) = { - let mut aggregator = metrics_aggr.lock().expect("lock poisoned"); - ( - aggregator.consume_metrics(), - aggregator.consume_distributions(), - ) - }; + let flush_response = metrics_aggr_handle + .flush() + .await + .expect("can't flush metrics aggr handle"); let metrics_futures: Vec<_> = metrics_flushers .iter_mut() - .map(|f| f.flush_metrics(series.clone(), sketches.clone())) + .map(|f| { + f.flush_metrics( + flush_response.series.clone(), + flush_response.distributions.clone(), + ) + }) .collect(); tokio::join!( @@ -1008,7 +1014,7 @@ fn start_logs_agent( fn start_metrics_flushers( api_key_factory: Arc, - metrics_aggr: &Arc>, + metrics_aggr_handle: &MetricsAggregatorHandle, config: &Arc, ) -> Vec { let mut flushers = Vec::new(); @@ -1031,7 +1037,7 @@ fn start_metrics_flushers( let flusher_config = MetricsFlusherConfig { api_key_factory, - aggregator: Arc::clone(metrics_aggr), + aggregator_handle: metrics_aggr_handle.clone(), metrics_intake_url_prefix: metrics_intake_url.expect("can't parse site or override"), https_proxy: config.proxy_https.clone(), timeout: Duration::from_secs(config.flush_timeout), @@ -1059,7 +1065,7 @@ fn start_metrics_flushers( let additional_api_key_factory = Arc::new(ApiKeyFactory::new(api_key)); let additional_flusher_config = MetricsFlusherConfig { api_key_factory: additional_api_key_factory, - aggregator: metrics_aggr.clone(), + aggregator_handle: metrics_aggr_handle.clone(), metrics_intake_url_prefix: metrics_intake_url.clone(), https_proxy: config.proxy_https.clone(), timeout: Duration::from_secs(config.flush_timeout), @@ -1157,7 +1163,13 @@ fn start_trace_agent( ) } -async fn start_dogstatsd(metrics_aggr: &Arc>) -> CancellationToken { +fn start_dogstatsd_aggregator(aggr_service: MetricsAggregatorService) { + tokio::spawn(async move { + aggr_service.run().await; + }); +} + +async fn start_dogstatsd(metrics_aggr_handle: MetricsAggregatorHandle) -> CancellationToken { let dogstatsd_config = DogStatsDConfig { host: EXTENSION_HOST.to_string(), port: DOGSTATSD_PORT, @@ -1165,7 +1177,7 @@ async fn start_dogstatsd(metrics_aggr: &Arc>) -> Cancel let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); let dogstatsd_client = DogStatsD::new( &dogstatsd_config, - Arc::clone(metrics_aggr), + metrics_aggr_handle, dogstatsd_cancel_token.clone(), ) .await; diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 7f4003739..ce5eaa52c 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -131,10 +131,12 @@ struct UniversalInstrumentationData { } impl Default for ContextBuffer { - /// Creates a new `ContextBuffer` with a default capacity of 5. + /// Creates a new `ContextBuffer` with a default capacity of 500 + /// This gives us enough capacity to process events which may be delayed due to async tasks + /// piling up preventing us from reading them quickly enough /// fn default() -> Self { - ContextBuffer::with_capacity(5) + ContextBuffer::with_capacity(500) } } diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index ca87759bb..2bd152f17 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -1,13 +1,12 @@ use std::{ collections::{HashMap, VecDeque}, - sync::{Arc, Mutex}, + sync::Arc, time::{Instant, SystemTime, UNIX_EPOCH}, }; use chrono::{DateTime, Utc}; use datadog_trace_protobuf::pb::Span; use datadog_trace_utils::tracer_header_tags; -use dogstatsd::aggregator::Aggregator as MetricsAggregator; use serde_json::{Value, json}; use tokio::sync::watch; use tracing::{debug, warn}; @@ -88,7 +87,7 @@ impl Processor { tags_provider: Arc, config: Arc, aws_config: Arc, - metrics_aggregator: Arc>, + metrics_aggregator: dogstatsd::aggregator_service::AggregatorHandle, ) -> Self { let resource = tags_provider .get_canonical_resource_name() @@ -974,7 +973,7 @@ mod tests { use super::*; use crate::LAMBDA_RUNTIME_SLUG; use base64::{Engine, engine::general_purpose::STANDARD}; - use dogstatsd::aggregator::Aggregator; + use dogstatsd::aggregator_service::AggregatorService; use dogstatsd::metric::EMPTY_TAGS; fn setup() -> Processor { @@ -999,11 +998,12 @@ mod tests { &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), )); - let metrics_aggregator = Arc::new(Mutex::new( - Aggregator::new(EMPTY_TAGS, 1024).expect("failed to create aggregator"), - )); + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service"); + + tokio::spawn(service.run()); - Processor::new(tags_provider, config, aws_config, metrics_aggregator) + Processor::new(tags_provider, config, aws_config, handle) } #[test] @@ -1065,8 +1065,8 @@ mod tests { assert_eq!(error_tags["error.stack"], error_stack); } - #[test] - fn test_process_on_universal_instrumentation_end_headers_with_sampling_priority() { + #[tokio::test] + async fn test_process_on_universal_instrumentation_end_headers_with_sampling_priority() { let mut p = setup(); let mut headers = HashMap::new(); @@ -1094,8 +1094,8 @@ mod tests { assert_eq!(priority, Some(-1.0)); } - #[test] - fn test_process_on_universal_instrumentation_end_headers_with_invalid_priority() { + #[tokio::test] + async fn test_process_on_universal_instrumentation_end_headers_with_invalid_priority() { let mut p = setup(); let mut headers = HashMap::new(); @@ -1123,8 +1123,8 @@ mod tests { assert_eq!(context.invocation_span.parent_id, 999); } - #[test] - fn test_process_on_universal_instrumentation_end_headers_no_sampling_priority() { + #[tokio::test] + async fn test_process_on_universal_instrumentation_end_headers_no_sampling_priority() { let mut p = setup(); let mut headers = HashMap::new(); @@ -1148,8 +1148,8 @@ mod tests { assert_eq!(context.invocation_span.parent_id, 222); } - #[test] - fn test_process_on_invocation_end_tags_response_with_status_code() { + #[tokio::test] + async fn test_process_on_invocation_end_tags_response_with_status_code() { let mut p = setup(); let response = r#" @@ -1187,8 +1187,8 @@ mod tests { ); } - #[test] - fn test_on_shutdown_event_creates_unused_init_metrics() { + #[tokio::test] + async fn test_on_shutdown_event_creates_unused_init_metrics() { let mut processor = setup(); let now1 = i64::try_from(std::time::UNIX_EPOCH.elapsed().unwrap().as_secs()).unwrap(); @@ -1197,22 +1197,26 @@ mod tests { let now2 = i64::try_from(std::time::UNIX_EPOCH.elapsed().unwrap().as_secs()).unwrap(); let ts2 = (now2 / 10) * 10; - let aggregator = processor.enhanced_metrics.aggregator.lock().unwrap(); + let handle = &processor.enhanced_metrics.aggr_handle; assert!( - aggregator + handle .get_entry_by_id( crate::metrics::enhanced::constants::UNUSED_INIT.into(), - &None, + None, ts1 ) + .await + .unwrap() .is_some() - || aggregator + || handle .get_entry_by_id( crate::metrics::enhanced::constants::UNUSED_INIT.into(), - &None, + None, ts2 ) + .await + .unwrap() .is_some(), "UNUSED_INIT metric should be created when invoked_received=false" ); diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 34879ce87..7e3276251 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -4,12 +4,12 @@ use crate::metrics::enhanced::{ }; use crate::proc::{self, CPUData, NetworkData}; use crate::telemetry::events::{InitType, ReportMetrics, RuntimeDoneMetrics}; -use dogstatsd::metric; +use dogstatsd::metric::SortedTags; use dogstatsd::metric::{Metric, MetricValue}; -use dogstatsd::{aggregator::Aggregator, metric::SortedTags}; +use dogstatsd::{aggregator_service::AggregatorHandle, metric}; use std::collections::HashMap; use std::env::consts::ARCH; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; use tokio::{ sync::watch::{Receiver, Sender}, @@ -19,7 +19,7 @@ use tracing::debug; use tracing::error; pub struct Lambda { - pub aggregator: Arc>, + pub aggr_handle: AggregatorHandle, pub config: Arc, // Dynamic value tags are the ones we cannot obtain statically from the sandbox dynamic_value_tags: HashMap, @@ -28,9 +28,9 @@ pub struct Lambda { impl Lambda { #[must_use] - pub fn new(aggregator: Arc>, config: Arc) -> Lambda { + pub fn new(aggregator: AggregatorHandle, config: Arc) -> Lambda { Lambda { - aggregator, + aggr_handle: aggregator, config, dynamic_value_tags: HashMap::new(), invoked_received: false, @@ -112,12 +112,7 @@ impl Lambda { Some(timestamp), ); - if let Err(e) = self - .aggregator - .lock() - .expect("lock poisoned") - .insert(metric) - { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert metric: {}", e); } } @@ -136,12 +131,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = self - .aggregator - .lock() - .expect("lock poisoned") - .insert(metric) - { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert metric: {}", e); } } @@ -157,12 +147,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = self - .aggregator - .lock() - .expect("lock poisoned") - .insert(metric) - { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert runtime duration metric: {}", e); } @@ -174,12 +159,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = self - .aggregator - .lock() - .expect("lock poisoned") - .insert(metric) - { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert produced bytes metric: {}", e); } } @@ -209,12 +189,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = self - .aggregator - .lock() - .expect("lock poisoned") - .insert(metric) - { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert post runtime duration metric: {}", e); } } @@ -222,7 +197,7 @@ impl Lambda { pub fn generate_network_enhanced_metrics( network_data_offset: NetworkData, network_data_end: NetworkData, - aggr: &mut std::sync::MutexGuard, + aggr: &AggregatorHandle, tags: Option, ) { let now = std::time::UNIX_EPOCH @@ -241,7 +216,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert rx_bytes metric: {}", e); } @@ -251,7 +226,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert tx_bytes metric: {}", e); } @@ -261,7 +236,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert total_network metric: {}", e); } } @@ -272,15 +247,14 @@ impl Lambda { } if let Some(offset) = network_offset { - let mut aggr: std::sync::MutexGuard = - self.aggregator.lock().expect("lock poisoned"); + let aggr_handle = self.aggr_handle.clone(); match proc::get_network_data() { Ok(data) => { Self::generate_network_enhanced_metrics( offset, data, - &mut aggr, + &aggr_handle, self.get_dynamic_value_tags(), ); } @@ -296,7 +270,7 @@ impl Lambda { pub(crate) fn generate_cpu_time_enhanced_metrics( cpu_data_offset: &CPUData, cpu_data_end: &CPUData, - aggr: &mut std::sync::MutexGuard, + aggr: &AggregatorHandle, tags: Option, ) { let cpu_user_time = cpu_data_end.total_user_time_ms - cpu_data_offset.total_user_time_ms; @@ -315,7 +289,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert cpu_user_time metric: {}", e); } @@ -325,7 +299,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert cpu_system_time metric: {}", e); } @@ -335,7 +309,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert cpu_total_time metric: {}", e); } } @@ -345,8 +319,7 @@ impl Lambda { return; } - let mut aggr: std::sync::MutexGuard = - self.aggregator.lock().expect("lock poisoned"); + let aggr_handle = self.aggr_handle.clone(); let cpu_data = proc::get_cpu_data(); match (cpu_offset, cpu_data) { @@ -354,7 +327,7 @@ impl Lambda { Self::generate_cpu_time_enhanced_metrics( &cpu_offset, &cpu_data, - &mut aggr, + &aggr_handle, self.get_dynamic_value_tags(), ); } @@ -369,7 +342,7 @@ impl Lambda { cpu_data_end: &CPUData, uptime_data_offset: f64, uptime_data_end: f64, - aggr: &mut std::sync::MutexGuard, + aggr: &AggregatorHandle, tags: Option, ) { let num_cores = cpu_data_end.individual_cpu_idle_times.len() as f64; @@ -414,54 +387,41 @@ impl Lambda { // Multiply by num_cores to report in terms of cores let cpu_total_utilization = cpu_total_utilization_decimal * num_cores; - let metric = Metric::new( - constants::CPU_TOTAL_UTILIZATION_PCT_METRIC.into(), - MetricValue::distribution(cpu_total_utilization_pct), - tags.clone(), - Some(now), - ); - if let Err(e) = aggr.insert(metric) { - error!("Failed to insert cpu_total_utilization_pct metric: {}", e); - } - - let metric = Metric::new( - constants::CPU_TOTAL_UTILIZATION_METRIC.into(), - MetricValue::distribution(cpu_total_utilization), - tags.clone(), - Some(now), - ); - if let Err(e) = aggr.insert(metric) { - error!("Failed to insert cpu_total_utilization metric: {}", e); - } - - let metric = Metric::new( - constants::NUM_CORES_METRIC.into(), - MetricValue::distribution(num_cores), - tags.clone(), - Some(now), - ); - if let Err(e) = aggr.insert(metric) { - error!("Failed to insert num_cores metric: {}", e); - } - - let metric = Metric::new( - constants::CPU_MAX_UTILIZATION_METRIC.into(), - MetricValue::distribution(cpu_max_utilization), - tags.clone(), - Some(now), - ); - if let Err(e) = aggr.insert(metric) { - error!("Failed to insert cpu_max_utilization metric: {}", e); - } + let metrics = vec![ + Metric::new( + constants::CPU_TOTAL_UTILIZATION_PCT_METRIC.into(), + MetricValue::distribution(cpu_total_utilization_pct), + tags.clone(), + Some(now), + ), + Metric::new( + constants::CPU_TOTAL_UTILIZATION_METRIC.into(), + MetricValue::distribution(cpu_total_utilization), + tags.clone(), + Some(now), + ), + Metric::new( + constants::NUM_CORES_METRIC.into(), + MetricValue::distribution(num_cores), + tags.clone(), + Some(now), + ), + Metric::new( + constants::CPU_MAX_UTILIZATION_METRIC.into(), + MetricValue::distribution(cpu_max_utilization), + tags.clone(), + Some(now), + ), + Metric::new( + constants::CPU_MIN_UTILIZATION_METRIC.into(), + MetricValue::distribution(cpu_min_utilization), + tags, + Some(now), + ), + ]; - let metric = Metric::new( - constants::CPU_MIN_UTILIZATION_METRIC.into(), - MetricValue::distribution(cpu_min_utilization), - tags.clone(), - Some(now), - ); - if let Err(e) = aggr.insert(metric) { - error!("Failed to insert cpu_min_utilization metric: {}", e); + if let Err(e) = aggr.insert_batch(metrics) { + error!("Failed to insert cpu utilization metrics: {}", e); } } @@ -474,8 +434,7 @@ impl Lambda { return; } - let mut aggr: std::sync::MutexGuard = - self.aggregator.lock().expect("lock poisoned"); + let aggr_handle = self.aggr_handle.clone(); let cpu_data = proc::get_cpu_data(); let uptime_data = proc::get_uptime(); @@ -486,7 +445,7 @@ impl Lambda { &cpu_data, uptime_offset, uptime_data, - &mut aggr, + &aggr_handle, self.get_dynamic_value_tags(), ); } @@ -499,7 +458,7 @@ impl Lambda { pub fn generate_tmp_enhanced_metrics( tmp_max: f64, tmp_used: f64, - aggr: &mut std::sync::MutexGuard, + aggr: &AggregatorHandle, tags: Option, ) { let now = std::time::UNIX_EPOCH @@ -514,7 +473,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert tmp_max metric: {}", e); } @@ -524,7 +483,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert tmp_used metric: {}", e); } @@ -535,7 +494,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert tmp_free metric: {}", e); } } @@ -545,7 +504,7 @@ impl Lambda { return; } - let aggr = Arc::clone(&self.aggregator); + let aggr = self.aggr_handle.clone(); let tags = self.get_dynamic_value_tags(); tokio::spawn(async move { @@ -566,9 +525,7 @@ impl Lambda { biased; // When the stop signal is received, generate final metrics _ = send_metrics.changed() => { - let mut aggr: std::sync::MutexGuard = - aggr.lock().expect("lock poisoned"); - Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr, tags); + Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &aggr, tags); return; } // Otherwise keep monitoring tmp usage periodically @@ -592,7 +549,7 @@ impl Lambda { fd_use: f64, threads_max: f64, threads_use: f64, - aggr: &mut std::sync::MutexGuard, + aggr: &AggregatorHandle, tags: Option, ) { let now = std::time::UNIX_EPOCH @@ -607,7 +564,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert fd_max metric: {}", e); } @@ -619,7 +576,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert fd_use metric: {}", e); } } else { @@ -632,7 +589,7 @@ impl Lambda { tags.clone(), Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert threads_max metric: {}", e); } @@ -644,7 +601,7 @@ impl Lambda { tags, Some(now), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = aggr.insert_batch(vec![metric]) { error!("Failed to insert threads_use metric: {}", e); } } else { @@ -657,7 +614,7 @@ impl Lambda { return; } - let aggr = Arc::clone(&self.aggregator); + let aggr = self.aggr_handle.clone(); let tags = self.get_dynamic_value_tags(); tokio::spawn(async move { @@ -678,9 +635,7 @@ impl Lambda { biased; // When the stop signal is received, generate final metrics _ = send_metrics.changed() => { - let mut aggr: std::sync::MutexGuard = - aggr.lock().expect("lock poisoned"); - Self::generate_process_metrics(fd_max, fd_use, threads_max, threads_use, &mut aggr, tags.clone()); + Self::generate_process_metrics(fd_max, fd_use, threads_max, threads_use, &aggr, tags.clone()); return; } // Otherwise keep monitoring file descriptor and thread usage periodically @@ -717,15 +672,13 @@ impl Lambda { if !self.config.enhanced_metrics { return; } - let mut aggr: std::sync::MutexGuard = - self.aggregator.lock().expect("lock poisoned"); let metric = metric::Metric::new( constants::DURATION_METRIC.into(), MetricValue::distribution(metrics.duration_ms * constants::MS_TO_SEC), self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert duration metric: {}", e); } let metric = metric::Metric::new( @@ -734,7 +687,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert billed duration metric: {}", e); } let metric = metric::Metric::new( @@ -743,7 +696,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert max memory used metric: {}", e); } let metric = metric::Metric::new( @@ -752,7 +705,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert memory size metric: {}", e); } @@ -764,7 +717,7 @@ impl Lambda { self.get_dynamic_value_tags(), Some(timestamp), ); - if let Err(e) = aggr.insert(metric) { + if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { error!("failed to insert estimated cost metric: {}", e); } } @@ -794,33 +747,32 @@ mod tests { use super::*; use crate::config; + use dogstatsd::aggregator_service::AggregatorService; use dogstatsd::metric::EMPTY_TAGS; const PRECISION: f64 = 0.000_000_01; - fn setup() -> (Arc>, Arc) { + fn setup() -> (AggregatorHandle, Arc) { let config = Arc::new(config::Config { service: Some("test-service".to_string()), tags: HashMap::from([("test".to_string(), "tags".to_string())]), ..config::Config::default() }); - ( - Arc::new(Mutex::new( - Aggregator::new(EMPTY_TAGS, 1024).expect("failed to create aggregator"), - )), - config, - ) + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service"); + + tokio::spawn(service.run()); + + (handle, config) } - fn assert_sketch( - aggregator_mutex: &Mutex, - metric_id: &str, - value: f64, - timestamp: i64, - ) { + async fn assert_sketch(handle: &AggregatorHandle, metric_id: &str, value: f64, timestamp: i64) { let ts = (timestamp / 10) * 10; - let aggregator = aggregator_mutex.lock().unwrap(); - if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &None, ts) { + if let Some(e) = handle + .get_entry_by_id(metric_id.into(), None, ts) + .await + .unwrap() + { let metric = e.value.get_sketch().unwrap(); assert!((metric.max().unwrap() - value).abs() < PRECISION); assert!((metric.min().unwrap() - value).abs() < PRECISION); @@ -831,9 +783,9 @@ mod tests { } } - #[test] + #[tokio::test] #[allow(clippy::float_cmp)] - fn test_increment_invocation_metric() { + async fn test_increment_invocation_metric() { let (metrics_aggr, my_config) = setup(); let lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH @@ -849,12 +801,12 @@ mod tests { .as_secs() .try_into() .unwrap_or_default(); - assert_sketch(&metrics_aggr, constants::INVOCATIONS_METRIC, 1f64, now); + assert_sketch(&metrics_aggr, constants::INVOCATIONS_METRIC, 1f64, now).await; } - #[test] + #[tokio::test] #[allow(clippy::float_cmp)] - fn test_increment_errors_metric() { + async fn test_increment_errors_metric() { let (metrics_aggr, my_config) = setup(); let lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH @@ -870,12 +822,12 @@ mod tests { .as_secs() .try_into() .unwrap_or_default(); - assert_sketch(&metrics_aggr, constants::ERRORS_METRIC, 1f64, now); + assert_sketch(&metrics_aggr, constants::ERRORS_METRIC, 1f64, now).await; } - #[test] + #[tokio::test] #[allow(clippy::too_many_lines)] - fn test_disabled() { + async fn test_disabled() { let (metrics_aggr, no_config) = setup(); let my_config = Arc::new(config::Config { enhanced_metrics: false, @@ -911,135 +863,224 @@ mod tests { }, now, ); - let aggr = metrics_aggr.lock().expect("lock poisoned"); assert!( - aggr.get_entry_by_id(constants::INVOCATIONS_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::INVOCATIONS_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::ERRORS_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::ERRORS_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::TIMEOUTS_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::TIMEOUTS_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::INIT_DURATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::INIT_DURATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::RUNTIME_DURATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::RUNTIME_DURATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::PRODUCED_BYTES_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::PRODUCED_BYTES_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::POST_RUNTIME_DURATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::POST_RUNTIME_DURATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::DURATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::DURATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::BILLED_DURATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::BILLED_DURATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::MAX_MEMORY_USED_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::MAX_MEMORY_USED_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::MEMORY_SIZE_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::MEMORY_SIZE_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::ESTIMATED_COST_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::ESTIMATED_COST_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::RX_BYTES_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::RX_BYTES_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::TX_BYTES_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::TX_BYTES_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::TOTAL_NETWORK_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::TOTAL_NETWORK_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::CPU_USER_TIME_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::CPU_USER_TIME_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::CPU_SYSTEM_TIME_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::CPU_SYSTEM_TIME_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::CPU_TOTAL_TIME_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::CPU_TOTAL_TIME_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id( - constants::CPU_TOTAL_UTILIZATION_PCT_METRIC.into(), - &None, - now - ) - .is_none() + metrics_aggr + .get_entry_by_id( + constants::CPU_TOTAL_UTILIZATION_PCT_METRIC.into(), + None, + now + ) + .await + .unwrap() + .is_none() ); assert!( - aggr.get_entry_by_id(constants::CPU_TOTAL_UTILIZATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::CPU_TOTAL_UTILIZATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::NUM_CORES_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::NUM_CORES_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::CPU_MIN_UTILIZATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::CPU_MIN_UTILIZATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::CPU_MAX_UTILIZATION_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::CPU_MAX_UTILIZATION_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::TMP_MAX_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::TMP_MAX_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::TMP_USED_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::TMP_USED_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::TMP_FREE_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::TMP_FREE_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::FD_MAX_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::FD_MAX_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::FD_USE_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::FD_USE_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::THREADS_MAX_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::THREADS_MAX_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::THREADS_USE_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::THREADS_USE_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); } - #[test] - fn test_set_runtime_done_metrics() { + #[tokio::test] + async fn test_set_runtime_done_metrics() { let (metrics_aggr, my_config) = setup(); let lambda = Lambda::new(metrics_aggr.clone(), my_config); let runtime_done_metrics = RuntimeDoneMetrics { @@ -1059,12 +1100,13 @@ mod tests { constants::RUNTIME_DURATION_METRIC, 100.0, now, - ); - assert_sketch(&metrics_aggr, constants::PRODUCED_BYTES_METRIC, 42.0, now); + ) + .await; + assert_sketch(&metrics_aggr, constants::PRODUCED_BYTES_METRIC, 42.0, now).await; } - #[test] - fn test_set_report_log_metrics() { + #[tokio::test] + async fn test_set_report_log_metrics() { let (metrics_aggr, my_config) = setup(); let lambda = Lambda::new(metrics_aggr.clone(), my_config); let report_metrics = ReportMetrics { @@ -1083,17 +1125,17 @@ mod tests { .unwrap_or_default(); lambda.set_report_log_metrics(&report_metrics, now); - assert_sketch(&metrics_aggr, constants::DURATION_METRIC, 0.1, now); - assert_sketch(&metrics_aggr, constants::BILLED_DURATION_METRIC, 0.1, now); + assert_sketch(&metrics_aggr, constants::DURATION_METRIC, 0.1, now).await; + assert_sketch(&metrics_aggr, constants::BILLED_DURATION_METRIC, 0.1, now).await; - assert_sketch(&metrics_aggr, constants::MAX_MEMORY_USED_METRIC, 128.0, now); - assert_sketch(&metrics_aggr, constants::MEMORY_SIZE_METRIC, 256.0, now); + assert_sketch(&metrics_aggr, constants::MAX_MEMORY_USED_METRIC, 128.0, now).await; + assert_sketch(&metrics_aggr, constants::MEMORY_SIZE_METRIC, 256.0, now).await; } - #[test] - fn test_set_network_enhanced_metrics() { + #[tokio::test] + async fn test_set_network_enhanced_metrics() { let (metrics_aggr, my_config) = setup(); - let lambda = Lambda::new(metrics_aggr.clone(), my_config); + let _lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -1112,19 +1154,19 @@ mod tests { Lambda::generate_network_enhanced_metrics( network_offset, network_data, - &mut lambda.aggregator.lock().expect("lock poisoned"), + &metrics_aggr, None, ); - assert_sketch(&metrics_aggr, constants::RX_BYTES_METRIC, 20000.0, now); - assert_sketch(&metrics_aggr, constants::TX_BYTES_METRIC, 74746.0, now); - assert_sketch(&metrics_aggr, constants::TOTAL_NETWORK_METRIC, 94746.0, now); + assert_sketch(&metrics_aggr, constants::RX_BYTES_METRIC, 20000.0, now).await; + assert_sketch(&metrics_aggr, constants::TX_BYTES_METRIC, 74746.0, now).await; + assert_sketch(&metrics_aggr, constants::TOTAL_NETWORK_METRIC, 94746.0, now).await; } - #[test] - fn test_set_cpu_time_enhanced_metrics() { + #[tokio::test] + async fn test_set_cpu_time_enhanced_metrics() { let (metrics_aggr, my_config) = setup(); - let lambda = Lambda::new(metrics_aggr.clone(), my_config); + let _lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -1151,22 +1193,17 @@ mod tests { individual_cpu_idle_times: individual_cpu_idle_times_end, }; - Lambda::generate_cpu_time_enhanced_metrics( - &cpu_offset, - &cpu_data, - &mut lambda.aggregator.lock().expect("lock poisoned"), - None, - ); + Lambda::generate_cpu_time_enhanced_metrics(&cpu_offset, &cpu_data, &metrics_aggr, None); - assert_sketch(&metrics_aggr, constants::CPU_USER_TIME_METRIC, 100.0, now); - assert_sketch(&metrics_aggr, constants::CPU_SYSTEM_TIME_METRIC, 53.0, now); - assert_sketch(&metrics_aggr, constants::CPU_TOTAL_TIME_METRIC, 153.0, now); + assert_sketch(&metrics_aggr, constants::CPU_USER_TIME_METRIC, 100.0, now).await; + assert_sketch(&metrics_aggr, constants::CPU_SYSTEM_TIME_METRIC, 53.0, now).await; + assert_sketch(&metrics_aggr, constants::CPU_TOTAL_TIME_METRIC, 153.0, now).await; } - #[test] - fn test_set_cpu_utilization_enhanced_metrics() { + #[tokio::test] + async fn test_set_cpu_utilization_enhanced_metrics() { let (metrics_aggr, my_config) = setup(); - let lambda = Lambda::new(metrics_aggr.clone(), my_config); + let _lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -1200,7 +1237,7 @@ mod tests { &cpu_data, uptime_offset, uptime_data, - &mut lambda.aggregator.lock().expect("lock poisoned"), + &metrics_aggr, None, ); @@ -1210,32 +1247,36 @@ mod tests { constants::CPU_TOTAL_UTILIZATION_PCT_METRIC, 30.0, now, - ); + ) + .await; assert_sketch( &metrics_aggr, constants::CPU_TOTAL_UTILIZATION_METRIC, 0.6, now, - ); - assert_sketch(&metrics_aggr, constants::NUM_CORES_METRIC, 2.0, now); + ) + .await; + assert_sketch(&metrics_aggr, constants::NUM_CORES_METRIC, 2.0, now).await; assert_sketch( &metrics_aggr, constants::CPU_MAX_UTILIZATION_METRIC, 30.0, now, - ); + ) + .await; assert_sketch( &metrics_aggr, constants::CPU_MIN_UTILIZATION_METRIC, 28.75, now, - ); + ) + .await; } - #[test] - fn test_set_tmp_enhanced_metrics() { + #[tokio::test] + async fn test_set_tmp_enhanced_metrics() { let (metrics_aggr, my_config) = setup(); - let lambda = Lambda::new(metrics_aggr.clone(), my_config); + let _lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -1245,27 +1286,23 @@ mod tests { let tmp_max = 550_461_440.0; let tmp_used = 12_165_120.0; - Lambda::generate_tmp_enhanced_metrics( - tmp_max, - tmp_used, - &mut lambda.aggregator.lock().expect("lock poisoned"), - None, - ); + Lambda::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &metrics_aggr, None); - assert_sketch(&metrics_aggr, constants::TMP_MAX_METRIC, 550_461_440.0, now); - assert_sketch(&metrics_aggr, constants::TMP_USED_METRIC, 12_165_120.0, now); + assert_sketch(&metrics_aggr, constants::TMP_MAX_METRIC, 550_461_440.0, now).await; + assert_sketch(&metrics_aggr, constants::TMP_USED_METRIC, 12_165_120.0, now).await; assert_sketch( &metrics_aggr, constants::TMP_FREE_METRIC, 538_296_320.0, now, - ); + ) + .await; } - #[test] - fn test_set_process_enhanced_metrics_valid_use() { + #[tokio::test] + async fn test_set_process_enhanced_metrics_valid_use() { let (metrics_aggr, my_config) = setup(); - let lambda = Lambda::new(metrics_aggr.clone(), my_config); + let _lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -1282,20 +1319,20 @@ mod tests { fd_use, threads_max, threads_use, - &mut lambda.aggregator.lock().expect("lock poisoned"), + &metrics_aggr, None, ); - assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0, now); - assert_sketch(&metrics_aggr, constants::FD_USE_METRIC, 175.0, now); - assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0, now); - assert_sketch(&metrics_aggr, constants::THREADS_USE_METRIC, 40.0, now); + assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0, now).await; + assert_sketch(&metrics_aggr, constants::FD_USE_METRIC, 175.0, now).await; + assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0, now).await; + assert_sketch(&metrics_aggr, constants::THREADS_USE_METRIC, 40.0, now).await; } - #[test] - fn test_set_process_enhanced_metrics_invalid_use() { + #[tokio::test] + async fn test_set_process_enhanced_metrics_invalid_use() { let (metrics_aggr, my_config) = setup(); - let lambda = Lambda::new(metrics_aggr.clone(), my_config); + let _lambda = Lambda::new(metrics_aggr.clone(), my_config); let now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -1312,20 +1349,25 @@ mod tests { fd_use, threads_max, threads_use, - &mut lambda.aggregator.lock().expect("lock poisoned"), + &metrics_aggr, None, ); - assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0, now); - assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0, now); + assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0, now).await; + assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0, now).await; - let aggr = lambda.aggregator.lock().expect("lock poisoned"); assert!( - aggr.get_entry_by_id(constants::FD_USE_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::FD_USE_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); assert!( - aggr.get_entry_by_id(constants::THREADS_USE_METRIC.into(), &None, now) + metrics_aggr + .get_entry_by_id(constants::THREADS_USE_METRIC.into(), None, now) + .await + .unwrap() .is_none() ); } diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index 0767bf25c..35b9d3dc4 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -430,12 +430,11 @@ mod tests { use http_body_util::BodyExt; use std::{ collections::HashMap, - sync::Mutex, time::{Duration, Instant}, }; use tokio::sync::Mutex as TokioMutex; - use dogstatsd::{aggregator::Aggregator as MetricsAggregator, metric::EMPTY_TAGS}; + use dogstatsd::{aggregator_service::AggregatorService, metric::EMPTY_TAGS}; use http_body_util::Full; use hyper::{server::conn::http1, service::service_fn}; use hyper_util::rt::TokioIo; @@ -482,9 +481,12 @@ mod tests { LAMBDA_RUNTIME_SLUG.to_string(), &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), )); - let metrics_aggregator = Arc::new(Mutex::new( - MetricsAggregator::new(EMPTY_TAGS, 1024).expect("failed to create metrics aggregator"), - )); + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service"); + + tokio::spawn(service.run()); + + let metrics_aggregator = handle; let aws_config = Arc::new(AwsConfig { region: "us-east-1".to_string(), function_name: "arn:some-function".to_string(), diff --git a/bottlecap/tests/metrics_integration_test.rs b/bottlecap/tests/metrics_integration_test.rs index 83935a877..8f9ec8553 100644 --- a/bottlecap/tests/metrics_integration_test.rs +++ b/bottlecap/tests/metrics_integration_test.rs @@ -1,13 +1,13 @@ use bottlecap::config::Config; use bottlecap::metrics::enhanced::lambda::Lambda as enhanced_metrics; -use dogstatsd::aggregator::Aggregator as MetricsAggregator; +use dogstatsd::aggregator_service::AggregatorService; use dogstatsd::api_key::ApiKeyFactory; use dogstatsd::datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride}; use dogstatsd::flusher::Flusher as MetricsFlusher; use dogstatsd::flusher::FlusherConfig as MetricsFlusherConfig; use dogstatsd::metric::SortedTags; use httpmock::prelude::*; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; mod common; @@ -35,10 +35,12 @@ async fn test_enhanced_metrics() { let arc_config = Arc::new(Config::default()); - let metrics_aggr = Arc::new(Mutex::new( - MetricsAggregator::new(SortedTags::parse("aTagKey:aTagValue").unwrap(), 1024) - .expect("failed to create aggregator"), - )); + let (metrics_aggr_service, metrics_aggr_handle) = + AggregatorService::new(SortedTags::parse("aTagKey:aTagValue").unwrap(), 1024) + .expect("failed to create aggregator service"); + + tokio::spawn(metrics_aggr_service.run()); + let metrics_site_override = MetricsIntakeUrlPrefixOverride::maybe_new( None, Some(DdDdUrl::new(server.base_url()).expect("failed to create dd url")), @@ -46,7 +48,7 @@ async fn test_enhanced_metrics() { .expect("failed to create metrics override"); let flusher_config = MetricsFlusherConfig { api_key_factory: Arc::new(ApiKeyFactory::new(dd_api_key)), - aggregator: metrics_aggr.clone(), + aggregator_handle: metrics_aggr_handle.clone(), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(None, Some(metrics_site_override)) .expect("can't parse metrics intake URL from site"), https_proxy: None, @@ -55,7 +57,7 @@ async fn test_enhanced_metrics() { }; let mut metrics_flusher = MetricsFlusher::new(flusher_config); let lambda_enhanced_metrics = - enhanced_metrics::new(Arc::clone(&metrics_aggr), Arc::clone(&arc_config)); + enhanced_metrics::new(metrics_aggr_handle.clone(), Arc::clone(&arc_config)); let now = std::time::UNIX_EPOCH .elapsed()