diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index d62b90a..b4bc771 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -8,6 +8,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use tracing::{debug, error}; +#[derive(Clone)] pub struct Flusher { dd_api: DdApi, aggregator: Arc>, @@ -38,32 +39,14 @@ impl Flusher { } } + /// Flush metrics from the aggregator pub async fn flush( &mut self, ) -> Option<( Vec, Vec, )> { - self.flush_with_retries(None, None).await - } - - pub async fn flush_with_retries( - &mut self, - retry_series: Option>, - retry_sketches: Option>, - ) -> Option<( - Vec, - Vec, - )> { - let (all_series, all_distributions) = if retry_series.is_some() || retry_sketches.is_some() - { - // Use the provided metrics for retry - ( - retry_series.unwrap_or_default(), - retry_sketches.unwrap_or_default(), - ) - } else { - // Collect new metrics from the aggregator + let (series, distributions) = { #[allow(clippy::expect_used)] let mut aggregator = self.aggregator.lock().expect("lock poisoned"); ( @@ -71,21 +54,32 @@ impl Flusher { aggregator.consume_distributions(), ) }; + self.flush_metrics(series, distributions).await + } - let n_series = all_series.len(); - let n_distributions = all_distributions.len(); + /// Flush given batch of metrics + pub async fn flush_metrics( + &mut self, + series: Vec, + distributions: Vec, + ) -> Option<( + Vec, + Vec, + )> { + let n_series = series.len(); + let n_distributions = distributions.len(); debug!("Flushing {n_series} series and {n_distributions} distributions"); // Save copies for potential error returns - let all_series_copy = all_series.clone(); - let all_distributions_copy = all_distributions.clone(); + let series_copy = series.clone(); + let distributions_copy = distributions.clone(); let dd_api_clone = self.dd_api.clone(); let series_handle = tokio::spawn(async move { let mut failed = Vec::new(); let mut had_shipping_error = false; - for a_batch in all_series { + for a_batch in series { let (continue_shipping, should_retry) = should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await; if should_retry { @@ -103,7 +97,7 @@ impl Flusher { let distributions_handle = tokio::spawn(async move { let mut failed = Vec::new(); let mut had_shipping_error = false; - for a_batch in all_distributions { + for a_batch in distributions { let (continue_shipping, should_retry) = should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await; if should_retry { @@ -136,7 +130,7 @@ impl Flusher { Err(err) => { error!("Failed to flush metrics: {err}"); // Return all metrics in case of join error for potential retry - Some((all_series_copy, all_distributions_copy)) + Some((series_copy, distributions_copy)) } } }