diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 043548ae..86090129 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -7,7 +7,7 @@ #![cfg_attr(not(test), deny(clippy::todo))] #![cfg_attr(not(test), deny(clippy::unimplemented))] -use std::{env, sync::Arc}; +use std::{env, sync::Arc, sync::Mutex}; use tokio::{ sync::Mutex as TokioMutex, time::{interval, Duration}, @@ -25,7 +25,7 @@ use datadog_trace_agent::{ use datadog_trace_utils::{config_utils::read_cloud_env, trace_utils::EnvironmentType}; use dogstatsd::{ - aggregator_service::{AggregatorHandle, AggregatorService}, + aggregator::Aggregator as MetricsAggregator, api_key::ApiKeyFactory, constants::CONTEXTS, datadog::{MetricsIntakeUrlPrefix, RetryStrategy, Site}, @@ -135,9 +135,9 @@ pub async fn main() { } }); - let (mut metrics_flusher, _aggregator_handle) = if dd_use_dogstatsd { + let mut metrics_flusher = if dd_use_dogstatsd { debug!("Starting dogstatsd"); - let (_, metrics_flusher, aggregator_handle) = start_dogstatsd( + let (_, metrics_flusher) = start_dogstatsd( dd_dogstatsd_port, dd_api_key, dd_site, @@ -146,10 +146,10 @@ pub async fn main() { ) .await; info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); - (metrics_flusher, Some(aggregator_handle)) + metrics_flusher } else { info!("dogstatsd disabled"); - (None, None) + None }; let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL)); @@ -171,28 +171,24 @@ async fn start_dogstatsd( dd_site: String, https_proxy: Option, dogstatsd_tags: &str, -) -> (CancellationToken, Option, AggregatorHandle) { - // 1. Create the aggregator service +) -> (CancellationToken, Option) { #[allow(clippy::expect_used)] - let (service, handle) = AggregatorService::new( - SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS), - CONTEXTS, - ) - .expect("Failed to create aggregator service"); - - // 2. Start the aggregator service in the background - tokio::spawn(service.run()); + let metrics_aggr = Arc::new(Mutex::new( + MetricsAggregator::new( + SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS), + CONTEXTS, + ) + .expect("Failed to create metrics aggregator"), + )); let dogstatsd_config = DogStatsDConfig { host: AGENT_HOST.to_string(), port, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); - - // 3. Use handle in DogStatsD (cheap to clone) let dogstatsd_client = DogStatsD::new( &dogstatsd_config, - handle.clone(), + Arc::clone(&metrics_aggr), dogstatsd_cancel_token.clone(), ) .await; @@ -206,7 +202,7 @@ async fn start_dogstatsd( #[allow(clippy::expect_used)] let metrics_flusher = Flusher::new(FlusherConfig { api_key_factory: Arc::new(ApiKeyFactory::new(&dd_api_key)), - aggregator_handle: handle.clone(), + aggregator: Arc::clone(&metrics_aggr), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( Some(Site::new(dd_site).expect("Failed to parse site")), None, @@ -224,5 +220,5 @@ async fn start_dogstatsd( } }; - (dogstatsd_cancel_token, metrics_flusher, handle) + (dogstatsd_cancel_token, metrics_flusher) } diff --git a/crates/dogstatsd/src/aggregator_service.rs b/crates/dogstatsd/src/aggregator_service.rs deleted file mode 100644 index 04222529..00000000 --- a/crates/dogstatsd/src/aggregator_service.rs +++ /dev/null @@ -1,177 +0,0 @@ -// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -use crate::aggregator::Aggregator; -use crate::datadog::Series; -use crate::metric::{Metric, SortedTags}; -use datadog_protos::metrics::SketchPayload; -use tokio::sync::{mpsc, oneshot}; -use tracing::{debug, error, warn}; - -#[derive(Debug)] -pub enum AggregatorCommand { - InsertBatch(Vec), - Flush(oneshot::Sender), - Shutdown, -} - -#[derive(Debug)] -pub struct FlushResponse { - pub series: Vec, - pub distributions: Vec, -} - -#[derive(Clone)] -pub struct AggregatorHandle { - tx: mpsc::UnboundedSender, -} - -impl AggregatorHandle { - pub fn insert_batch( - &self, - metrics: Vec, - ) -> Result<(), mpsc::error::SendError> { - self.tx.send(AggregatorCommand::InsertBatch(metrics)) - } - - pub async fn flush(&self) -> Result { - let (response_tx, response_rx) = oneshot::channel(); - self.tx - .send(AggregatorCommand::Flush(response_tx)) - .map_err(|e| format!("Failed to send flush command: {}", e))?; - - response_rx - .await - .map_err(|e| format!("Failed to receive flush response: {}", e)) - } - - pub fn shutdown(&self) -> Result<(), mpsc::error::SendError> { - self.tx.send(AggregatorCommand::Shutdown) - } -} - -pub struct AggregatorService { - aggregator: Aggregator, - rx: mpsc::UnboundedReceiver, -} - -impl AggregatorService { - pub fn new( - tags: SortedTags, - max_context: usize, - ) -> Result<(Self, AggregatorHandle), crate::errors::Creation> { - let (tx, rx) = mpsc::unbounded_channel(); - let aggregator = Aggregator::new(tags, max_context)?; - - let service = Self { aggregator, rx }; - - let handle = AggregatorHandle { tx }; - - Ok((service, handle)) - } - - pub async fn run(mut self) { - debug!("Aggregator service started"); - - while let Some(command) = self.rx.recv().await { - match command { - AggregatorCommand::InsertBatch(metrics) => { - let mut insert_errors = 0; - for metric in metrics { - // The only possible error here is an overflow - if let Err(_e) = self.aggregator.insert(metric) { - insert_errors += 1; - } - } - if insert_errors > 0 { - warn!("Total of {} metrics failed to insert", insert_errors); - } - } - - AggregatorCommand::Flush(response_tx) => { - let series = self.aggregator.consume_metrics(); - let distributions = self.aggregator.consume_distributions(); - - let response = FlushResponse { - series, - distributions, - }; - - if let Err(_) = response_tx.send(response) { - error!("Failed to send flush response - receiver dropped"); - } - } - - AggregatorCommand::Shutdown => { - debug!("Aggregator service shutting down"); - break; - } - } - } - - debug!("Aggregator service stopped"); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::metric::{parse, EMPTY_TAGS}; - - #[tokio::test] - async fn test_aggregator_service_basic_flow() { - let (service, handle) = - AggregatorService::new(EMPTY_TAGS, 1000).expect("Failed to create aggregator service"); - - // Start the service in a background task - let service_task = tokio::spawn(service.run()); - - // Insert some metrics - let metrics = vec![ - parse("test:1|c|#k:v").expect("metric parse failed"), - parse("foo:2|c|#k:v").expect("metric parse failed"), - ]; - - handle - .insert_batch(metrics) - .expect("Failed to insert metrics"); - - // Flush and check results - let response = handle.flush().await.expect("Failed to flush"); - assert_eq!(response.series.len(), 1); - assert_eq!(response.series[0].series.len(), 2); - - // Shutdown the service - handle.shutdown().expect("Failed to shutdown"); - service_task.await.expect("Service task failed"); - } - - #[tokio::test] - async fn test_aggregator_service_distributions() { - let (service, handle) = - AggregatorService::new(EMPTY_TAGS, 1000).expect("Failed to create aggregator service"); - - // Start the service in a background task - let service_task = tokio::spawn(service.run()); - - // Insert distribution metrics - let metrics = vec![ - parse("dist1:100|d|#k:v").expect("metric parse failed"), - parse("dist2:200|d|#k:v").expect("metric parse failed"), - ]; - - handle - .insert_batch(metrics) - .expect("Failed to insert metrics"); - - // Flush and check results - let response = handle.flush().await.expect("Failed to flush"); - assert_eq!(response.distributions.len(), 1); - assert_eq!(response.distributions[0].sketches.len(), 2); - assert_eq!(response.series.len(), 0); - - // Shutdown the service - handle.shutdown().expect("Failed to shutdown"); - service_task.await.expect("Service task failed"); - } -} diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index f16ff0b1..3e129b26 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -3,15 +3,16 @@ use std::net::SocketAddr; use std::str::Split; +use std::sync::{Arc, Mutex}; -use crate::aggregator_service::AggregatorHandle; +use crate::aggregator::Aggregator; use crate::errors::ParseError::UnsupportedType; use crate::metric::{parse, Metric}; use tracing::{debug, error}; pub struct DogStatsD { cancel_token: tokio_util::sync::CancellationToken, - aggregator_handle: AggregatorHandle, + aggregator: Arc>, buffer_reader: BufferReader, } @@ -51,7 +52,7 @@ impl DogStatsD { #[must_use] pub async fn new( config: &DogStatsDConfig, - aggregator_handle: AggregatorHandle, + aggregator: Arc>, cancel_token: tokio_util::sync::CancellationToken, ) -> DogStatsD { let addr = format!("{}:{}", config.host, config.port); @@ -63,7 +64,7 @@ impl DogStatsD { .expect("couldn't bind to address"); DogStatsD { cancel_token, - aggregator_handle, + aggregator, buffer_reader: BufferReader::UdpSocketReader(socket), } } @@ -118,9 +119,10 @@ impl DogStatsD { }) .collect(); if !all_valid_metrics.is_empty() { - // Send metrics through the channel - no lock needed! - if let Err(e) = self.aggregator_handle.insert_batch(all_valid_metrics) { - error!("Failed to send metrics to aggregator: {}", e); + #[allow(clippy::expect_used)] + let mut guarded_aggregator = self.aggregator.lock().expect("lock poisoned"); + for a_valid_value in all_valid_metrics { + let _ = guarded_aggregator.insert(a_valid_value); } } } @@ -129,38 +131,62 @@ impl DogStatsD { #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests { - use crate::aggregator_service::AggregatorService; + use crate::aggregator::tests::assert_sketch; + use crate::aggregator::tests::assert_value; + use crate::aggregator::Aggregator; use crate::dogstatsd::{BufferReader, DogStatsD}; use crate::metric::EMPTY_TAGS; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::{Arc, Mutex}; use tracing_test::traced_test; #[tokio::test] async fn test_dogstatsd_multi_distribution() { - let response = setup_and_consume_dogstatsd( + let locked_aggregator = setup_dogstatsd( "single_machine_performance.rouster.api.series_v2.payload_size_bytes:269942|d|T1656581409 single_machine_performance.rouster.metrics_min_timestamp_latency:1426.90870216|d|T1656581409 single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d|T1656581409 ", ) .await; - - assert_eq!(response.distributions.len(), 1); - assert_eq!(response.distributions[0].sketches.len(), 3); - assert_eq!(response.series.len(), 0); + let aggregator = locked_aggregator.lock().expect("lock poisoned"); + + let parsed_metrics = aggregator.distributions_to_protobuf(); + + assert_eq!(parsed_metrics.sketches.len(), 3); + assert_eq!(aggregator.to_series().len(), 0); + drop(aggregator); + + assert_sketch( + &locked_aggregator, + "single_machine_performance.rouster.api.series_v2.payload_size_bytes", + 269_942_f64, + 1656581400, + ); + assert_sketch( + &locked_aggregator, + "single_machine_performance.rouster.metrics_min_timestamp_latency", + 1_426.908_702_16, + 1656581400, + ); + assert_sketch( + &locked_aggregator, + "single_machine_performance.rouster.metrics_max_timestamp_latency", + 1_376.908_702_16, + 1656581400, + ); } #[tokio::test] async fn test_dogstatsd_multi_metric() { - let mut now: i64 = std::time::UNIX_EPOCH + let mut now = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") .as_secs() .try_into() .unwrap_or_default(); now = (now / 10) * 10; - - let response = setup_and_consume_dogstatsd( + let locked_aggregator = setup_dogstatsd( format!( "metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2|T{:}\n", now @@ -168,56 +194,69 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d .as_str(), ) .await; - - assert_eq!(response.series.len(), 1); - assert_eq!(response.series[0].series.len(), 3); - assert_eq!(response.distributions.len(), 0); + let aggregator = locked_aggregator.lock().expect("lock poisoned"); + + let parsed_metrics = aggregator.to_series(); + + assert_eq!(parsed_metrics.len(), 3); + assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); + drop(aggregator); + + assert_value(&locked_aggregator, "metric1", 1.0, "", now); + assert_value(&locked_aggregator, "metric2", 2.0, "tag2:val2", now); + assert_value( + &locked_aggregator, + "metric3", + 3.0, + "tag3:val3,tag4:val4", + now, + ); } #[tokio::test] async fn test_dogstatsd_single_metric() { - let response = setup_and_consume_dogstatsd("metric123:99123|c|T1656581409").await; + let locked_aggregator = setup_dogstatsd("metric123:99123|c|T1656581409").await; + let aggregator = locked_aggregator.lock().expect("lock poisoned"); + let parsed_metrics = aggregator.to_series(); + + assert_eq!(parsed_metrics.len(), 1); + assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); + drop(aggregator); - assert_eq!(response.series.len(), 1); - assert_eq!(response.series[0].series.len(), 1); - assert_eq!(response.distributions.len(), 0); + assert_value(&locked_aggregator, "metric123", 99_123.0, "", 1656581400); } #[tokio::test] #[traced_test] async fn test_dogstatsd_filter_service_check() { - let response = setup_and_consume_dogstatsd("_sc|servicecheck|0").await; + let locked_aggregator = setup_dogstatsd("_sc|servicecheck|0").await; + let aggregator = locked_aggregator.lock().expect("lock poisoned"); + let parsed_metrics = aggregator.to_series(); assert!(!logs_contain("Failed to parse metric")); - assert_eq!(response.series.len(), 0); - assert_eq!(response.distributions.len(), 0); + assert_eq!(parsed_metrics.len(), 0); } #[tokio::test] #[traced_test] async fn test_dogstatsd_filter_event() { - let response = setup_and_consume_dogstatsd("_e{5,10}:event|test event").await; + let locked_aggregator = setup_dogstatsd("_e{5,10}:event|test event").await; + let aggregator = locked_aggregator.lock().expect("lock poisoned"); + let parsed_metrics = aggregator.to_series(); assert!(!logs_contain("Failed to parse metric")); - assert_eq!(response.series.len(), 0); - assert_eq!(response.distributions.len(), 0); + assert_eq!(parsed_metrics.len(), 0); } - async fn setup_and_consume_dogstatsd( - statsd_string: &str, - ) -> crate::aggregator_service::FlushResponse { - // Create the aggregator service - let (service, handle) = - AggregatorService::new(EMPTY_TAGS, 1_024).expect("aggregator service creation failed"); - - // Start the service in a background task - let service_task = tokio::spawn(service.run()); - + async fn setup_dogstatsd(statsd_string: &str) -> Arc> { + let aggregator_arc = Arc::new(Mutex::new( + Aggregator::new(EMPTY_TAGS, 1_024).expect("aggregator creation failed"), + )); let cancel_token = tokio_util::sync::CancellationToken::new(); let dogstatsd = DogStatsD { cancel_token, - aggregator_handle: handle.clone(), + aggregator: Arc::clone(&aggregator_arc), buffer_reader: BufferReader::MirrorReader( statsd_string.as_bytes().to_vec(), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(111, 112, 113, 114)), 0), @@ -225,13 +264,6 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d }; dogstatsd.consume_statsd().await; - // Get the metrics via flush - let response = handle.flush().await.expect("Failed to flush"); - - // Shutdown the service - handle.shutdown().expect("Failed to shutdown"); - service_task.await.expect("Service task failed"); - - response + aggregator_arc } } diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index c114405e..a66d8434 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -1,11 +1,11 @@ // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::aggregator_service::AggregatorHandle; +use crate::aggregator::Aggregator; use crate::api_key::ApiKeyFactory; use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy}; use reqwest::{Response, StatusCode}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::sync::OnceCell; use tracing::{debug, error}; @@ -18,19 +18,20 @@ pub struct Flusher { https_proxy: Option, timeout: Duration, retry_strategy: RetryStrategy, - aggregator_handle: AggregatorHandle, + aggregator: Arc>, dd_api: OnceCell>, } pub struct FlusherConfig { pub api_key_factory: Arc, - pub aggregator_handle: AggregatorHandle, + pub aggregator: Arc>, pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix, pub https_proxy: Option, pub timeout: Duration, pub retry_strategy: RetryStrategy, } +#[allow(clippy::await_holding_lock)] impl Flusher { pub fn new(config: FlusherConfig) -> Self { Flusher { @@ -39,7 +40,7 @@ impl Flusher { https_proxy: config.https_proxy, timeout: config.timeout, retry_strategy: config.retry_strategy, - aggregator_handle: config.aggregator_handle, + aggregator: config.aggregator, dd_api: OnceCell::new(), } } @@ -72,17 +73,15 @@ impl Flusher { Vec, Vec, )> { - // Request flush through the channel - no lock needed! - let response = match self.aggregator_handle.flush().await { - Ok(response) => response, - Err(e) => { - error!("Failed to flush aggregator: {}", e); - return Some((Vec::new(), Vec::new())); - } + let (series, distributions) = { + #[allow(clippy::expect_used)] + let mut aggregator = self.aggregator.lock().expect("lock poisoned"); + ( + aggregator.consume_metrics(), + aggregator.consume_distributions(), + ) }; - - self.flush_metrics(response.series, response.distributions) - .await + self.flush_metrics(series, distributions).await } /// Flush given batch of metrics diff --git a/crates/dogstatsd/src/lib.rs b/crates/dogstatsd/src/lib.rs index e30bb861..59cbd5b9 100644 --- a/crates/dogstatsd/src/lib.rs +++ b/crates/dogstatsd/src/lib.rs @@ -8,7 +8,6 @@ #![cfg_attr(not(test), deny(clippy::unimplemented))] pub mod aggregator; -pub mod aggregator_service; pub mod api_key; pub mod constants; pub mod datadog; diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 4d07f455..10fbc783 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -4,7 +4,6 @@ use dogstatsd::metric::SortedTags; use dogstatsd::{ aggregator::Aggregator as MetricsAggregator, - aggregator_service::{AggregatorHandle, AggregatorService}, api_key::ApiKeyFactory, constants::CONTEXTS, datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride}, @@ -12,7 +11,7 @@ use dogstatsd::{ flusher::{Flusher, FlusherConfig}, }; use mockito::Server; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use tokio::{ net::UdpSocket, time::{sleep, timeout, Duration}, @@ -34,21 +33,18 @@ async fn dogstatsd_server_ships_series() { .create_async() .await; - // Create the aggregator service - let (service, handle) = - AggregatorService::new(SortedTags::parse("sometkey:somevalue").unwrap(), CONTEXTS) - .expect("failed to create aggregator service"); + let metrics_aggr = Arc::new(Mutex::new( + MetricsAggregator::new(SortedTags::parse("sometkey:somevalue").unwrap(), CONTEXTS) + .expect("failed to create aggregator"), + )); - // Start the service in a background task - tokio::spawn(service.run()); - - let _ = start_dogstatsd(handle.clone()).await; + let _ = start_dogstatsd(&metrics_aggr).await; let api_key_factory = ApiKeyFactory::new("mock-api-key"); let mut metrics_flusher = Flusher::new(FlusherConfig { api_key_factory: Arc::new(api_key_factory), - aggregator_handle: handle.clone(), + aggregator: Arc::clone(&metrics_aggr), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( None, MetricsIntakeUrlPrefixOverride::maybe_new( @@ -88,7 +84,7 @@ async fn dogstatsd_server_ships_series() { } } -async fn start_dogstatsd(aggregator_handle: AggregatorHandle) -> CancellationToken { +async fn start_dogstatsd(metrics_aggr: &Arc>) -> CancellationToken { let dogstatsd_config = DogStatsDConfig { host: "127.0.0.1".to_string(), port: 18125, @@ -96,7 +92,7 @@ async fn start_dogstatsd(aggregator_handle: AggregatorHandle) -> CancellationTok let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); let dogstatsd_client = DogStatsD::new( &dogstatsd_config, - aggregator_handle, + Arc::clone(metrics_aggr), dogstatsd_cancel_token.clone(), ) .await;