Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/dogstatsd/src/api_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ impl ApiKeyFactory {
} => {
if self.should_load_api_key().await {
// Try to acquire the loading lock.
if (loading_api_key
if loading_api_key
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok())
.is_ok()
{
// Acquired the loading lock.
// Double-check: verify load is still needed after acquiring lock
Expand Down
172 changes: 172 additions & 0 deletions crates/dogstatsd/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ impl Flusher {
let n_series = series.len();
let n_distributions = distributions.len();

// Early return if there are no metrics to flush
if n_series == 0 && n_distributions == 0 {
debug!("No metrics to flush, skipping");
return None;
}

debug!("Flushing {n_series} series and {n_distributions} distributions");

// Save copies for potential error returns
Expand Down Expand Up @@ -224,3 +230,169 @@ async fn should_try_next_batch(resp: Result<Response, ShippingError>) -> (bool,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::aggregator_service::AggregatorService;
use crate::constants::CONTEXTS;
use crate::datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride};
use crate::metric::SortedTags;

#[tokio::test]
async fn test_flush_metrics_empty_returns_none() {
use std::pin::Pin;

// Create an aggregator service for the flusher
let (service, handle) = AggregatorService::new(
SortedTags::parse("test:value").expect("failed to parse tags"),
CONTEXTS,
)
.expect("failed to create aggregator service");

// Spawn the service
tokio::spawn(service.run());

// Create an API key factory with a resolver that panics if called
// This proves that get_dd_api() is never invoked when there are no metrics
let api_key_factory = ApiKeyFactory::new_from_resolver(
Arc::new(|| {
Box::pin(async {
panic!("API key resolver should not be called for empty metrics!");
#[allow(unreachable_code)]
{
None
}
})
as Pin<Box<dyn std::future::Future<Output = Option<String>> + Send>>
}),
None,
);

let mut flusher = Flusher::new(FlusherConfig {
api_key_factory: Arc::new(api_key_factory),
aggregator_handle: handle,
metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(
None,
MetricsIntakeUrlPrefixOverride::maybe_new(
None,
Some(
DdDdUrl::new("http://localhost:8080".to_string())
.expect("failed to create URL"),
),
),
)
.expect("failed to create URL"),
https_proxy: None,
timeout: Duration::from_secs(5),
retry_strategy: RetryStrategy::Immediate(1),
compression_level: CompressionLevel::try_from(6)
.expect("failed to create compression level"),
});

// Test with empty vectors
let result = flusher.flush_metrics(Vec::new(), Vec::new()).await;

// Should return None when there are no metrics to flush
// If get_dd_api() was called, the test would panic from the resolver
assert!(result.is_none());
}

#[tokio::test]
async fn test_flush_metrics_with_series_only() {
use crate::aggregator::Aggregator;
use crate::metric::parse;

// Create an aggregator service for the flusher
let (service, handle) = AggregatorService::new(
SortedTags::parse("test:value").expect("failed to parse tags"),
CONTEXTS,
)
.expect("failed to create aggregator service");

// Spawn the service
tokio::spawn(service.run());

let api_key_factory = ApiKeyFactory::new("test-api-key");

let mut flusher = Flusher::new(FlusherConfig {
api_key_factory: Arc::new(api_key_factory),
aggregator_handle: handle,
metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(
None,
MetricsIntakeUrlPrefixOverride::maybe_new(
None,
Some(
DdDdUrl::new("http://localhost:8080".to_string())
.expect("failed to create URL"),
),
),
)
.expect("failed to create URL"),
https_proxy: None,
timeout: Duration::from_secs(5),
retry_strategy: RetryStrategy::Immediate(1),
compression_level: CompressionLevel::try_from(6)
.expect("failed to create compression level"),
});

// Create a series with actual metrics
let mut aggregator = Aggregator::new(SortedTags::parse("test:value").unwrap(), 1)
.expect("failed to create aggregator");
let metric = parse("test:1|c").expect("failed to parse metric");
aggregator.insert(metric).expect("failed to insert metric");
let series = vec![aggregator.to_series()];

// Test with series but empty distributions
let result = flusher.flush_metrics(series, Vec::new()).await;

// Should attempt to flush and return Some with failed metrics (since we're not mocking the API)
assert!(result.is_some());
}

#[tokio::test]
async fn test_flush_metrics_with_distributions_only() {
// Create an aggregator service for the flusher
let (service, handle) = AggregatorService::new(
SortedTags::parse("test:value").expect("failed to parse tags"),
CONTEXTS,
)
.expect("failed to create aggregator service");

// Spawn the service
tokio::spawn(service.run());

let api_key_factory = ApiKeyFactory::new("test-api-key");

let mut flusher = Flusher::new(FlusherConfig {
api_key_factory: Arc::new(api_key_factory),
aggregator_handle: handle,
metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(
None,
MetricsIntakeUrlPrefixOverride::maybe_new(
None,
Some(
DdDdUrl::new("http://localhost:8080".to_string())
.expect("failed to create URL"),
),
),
)
.expect("failed to create URL"),
https_proxy: None,
timeout: Duration::from_secs(5),
retry_strategy: RetryStrategy::Immediate(1),
compression_level: CompressionLevel::try_from(6)
.expect("failed to create compression level"),
});

// Create a distribution sketch payload
let sketch = datadog_protos::metrics::SketchPayload::default();
let distributions = vec![sketch];

// Test with distributions but empty series
let result = flusher.flush_metrics(Vec::new(), distributions).await;

// Should attempt to flush and return Some with failed metrics (since we're not mocking the API)
assert!(result.is_some());
}
}
Loading