From 9708582ddb20d574f9f6882a8a764646c704d7c4 Mon Sep 17 00:00:00 2001 From: Tianning Li Date: Tue, 28 Oct 2025 11:09:07 -0400 Subject: [PATCH] perf(dogstatsd): optimize flush_metrics to skip processing when empty --- crates/dogstatsd/src/api_key.rs | 4 +- crates/dogstatsd/src/flusher.rs | 172 ++++++++++++++++++++++++++++++++ 2 files changed, 174 insertions(+), 2 deletions(-) diff --git a/crates/dogstatsd/src/api_key.rs b/crates/dogstatsd/src/api_key.rs index f724b42f..86df3e6e 100644 --- a/crates/dogstatsd/src/api_key.rs +++ b/crates/dogstatsd/src/api_key.rs @@ -66,9 +66,9 @@ impl ApiKeyFactory { } => { if self.should_load_api_key().await { // Try to acquire the loading lock. - if (loading_api_key + if loading_api_key .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) - .is_ok()) + .is_ok() { // Acquired the loading lock. // Double-check: verify load is still needed after acquiring lock diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index 5dd2540e..9a4b0c9e 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -102,6 +102,12 @@ impl Flusher { let n_series = series.len(); let n_distributions = distributions.len(); + // Early return if there are no metrics to flush + if n_series == 0 && n_distributions == 0 { + debug!("No metrics to flush, skipping"); + return None; + } + debug!("Flushing {n_series} series and {n_distributions} distributions"); // Save copies for potential error returns @@ -224,3 +230,169 @@ async fn should_try_next_batch(resp: Result) -> (bool, } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::aggregator_service::AggregatorService; + use crate::constants::CONTEXTS; + use crate::datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride}; + use crate::metric::SortedTags; + + #[tokio::test] + async fn test_flush_metrics_empty_returns_none() { + use std::pin::Pin; + + // Create an aggregator service for the flusher + let (service, handle) = AggregatorService::new( + SortedTags::parse("test:value").expect("failed to parse tags"), + CONTEXTS, + ) + .expect("failed to create aggregator service"); + + // Spawn the service + tokio::spawn(service.run()); + + // Create an API key factory with a resolver that panics if called + // This proves that get_dd_api() is never invoked when there are no metrics + let api_key_factory = ApiKeyFactory::new_from_resolver( + Arc::new(|| { + Box::pin(async { + panic!("API key resolver should not be called for empty metrics!"); + #[allow(unreachable_code)] + { + None + } + }) + as Pin> + Send>> + }), + None, + ); + + let mut flusher = Flusher::new(FlusherConfig { + api_key_factory: Arc::new(api_key_factory), + aggregator_handle: handle, + metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( + None, + MetricsIntakeUrlPrefixOverride::maybe_new( + None, + Some( + DdDdUrl::new("http://localhost:8080".to_string()) + .expect("failed to create URL"), + ), + ), + ) + .expect("failed to create URL"), + https_proxy: None, + timeout: Duration::from_secs(5), + retry_strategy: RetryStrategy::Immediate(1), + compression_level: CompressionLevel::try_from(6) + .expect("failed to create compression level"), + }); + + // Test with empty vectors + let result = flusher.flush_metrics(Vec::new(), Vec::new()).await; + + // Should return None when there are no metrics to flush + // If get_dd_api() was called, the test would panic from the resolver + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_flush_metrics_with_series_only() { + use crate::aggregator::Aggregator; + use crate::metric::parse; + + // Create an aggregator service for the flusher + let (service, handle) = AggregatorService::new( + SortedTags::parse("test:value").expect("failed to parse tags"), + CONTEXTS, + ) + .expect("failed to create aggregator service"); + + // Spawn the service + tokio::spawn(service.run()); + + let api_key_factory = ApiKeyFactory::new("test-api-key"); + + let mut flusher = Flusher::new(FlusherConfig { + api_key_factory: Arc::new(api_key_factory), + aggregator_handle: handle, + metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( + None, + MetricsIntakeUrlPrefixOverride::maybe_new( + None, + Some( + DdDdUrl::new("http://localhost:8080".to_string()) + .expect("failed to create URL"), + ), + ), + ) + .expect("failed to create URL"), + https_proxy: None, + timeout: Duration::from_secs(5), + retry_strategy: RetryStrategy::Immediate(1), + compression_level: CompressionLevel::try_from(6) + .expect("failed to create compression level"), + }); + + // Create a series with actual metrics + let mut aggregator = Aggregator::new(SortedTags::parse("test:value").unwrap(), 1) + .expect("failed to create aggregator"); + let metric = parse("test:1|c").expect("failed to parse metric"); + aggregator.insert(metric).expect("failed to insert metric"); + let series = vec![aggregator.to_series()]; + + // Test with series but empty distributions + let result = flusher.flush_metrics(series, Vec::new()).await; + + // Should attempt to flush and return Some with failed metrics (since we're not mocking the API) + assert!(result.is_some()); + } + + #[tokio::test] + async fn test_flush_metrics_with_distributions_only() { + // Create an aggregator service for the flusher + let (service, handle) = AggregatorService::new( + SortedTags::parse("test:value").expect("failed to parse tags"), + CONTEXTS, + ) + .expect("failed to create aggregator service"); + + // Spawn the service + tokio::spawn(service.run()); + + let api_key_factory = ApiKeyFactory::new("test-api-key"); + + let mut flusher = Flusher::new(FlusherConfig { + api_key_factory: Arc::new(api_key_factory), + aggregator_handle: handle, + metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( + None, + MetricsIntakeUrlPrefixOverride::maybe_new( + None, + Some( + DdDdUrl::new("http://localhost:8080".to_string()) + .expect("failed to create URL"), + ), + ), + ) + .expect("failed to create URL"), + https_proxy: None, + timeout: Duration::from_secs(5), + retry_strategy: RetryStrategy::Immediate(1), + compression_level: CompressionLevel::try_from(6) + .expect("failed to create compression level"), + }); + + // Create a distribution sketch payload + let sketch = datadog_protos::metrics::SketchPayload::default(); + let distributions = vec![sketch]; + + // Test with distributions but empty series + let result = flusher.flush_metrics(Vec::new(), distributions).await; + + // Should attempt to flush and return Some with failed metrics (since we're not mocking the API) + assert!(result.is_some()); + } +}