From 3d5692c009066e234bd6bb22926ac2661ec8b064 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Wed, 28 May 2025 10:24:46 -0400 Subject: [PATCH 1/7] debug logs --- crates/dogstatsd/src/datadog.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/dogstatsd/src/datadog.rs b/crates/dogstatsd/src/datadog.rs index 7d9a7c3..8dd155a 100644 --- a/crates/dogstatsd/src/datadog.rs +++ b/crates/dogstatsd/src/datadog.rs @@ -164,6 +164,7 @@ impl DdApi { /// Ship a serialized series to the API, blocking pub async fn ship_series(&self, series: &Series) -> Result { + debug!("=== shipping serialized series to endpoint: {} ===", &self.metrics_intake_url_prefix); let url = format!("{}/api/v2/series", &self.metrics_intake_url_prefix); let safe_body = serde_json::to_vec(&series) .map_err(|e| ShippingError::Payload(format!("Failed to serialize series: {e}")))?; @@ -175,6 +176,7 @@ impl DdApi { &self, sketches: &SketchPayload, ) -> Result { + debug!("=== shipping distributions to endpoint: {} ===", &self.metrics_intake_url_prefix); let url = format!("{}/api/beta/sketches", &self.metrics_intake_url_prefix); let safe_body = sketches .write_to_bytes() @@ -199,6 +201,7 @@ impl DdApi { body: Vec, content_type: &str, ) -> Result { + debug!("=== shipping data to endpoint: {} ===", &self.metrics_intake_url_prefix); let client = &self .client .as_ref() @@ -228,6 +231,7 @@ impl DdApi { let elapsed = start.elapsed(); debug!("Request to {} took {}ms", url, elapsed.as_millis()); + debug!("=== completed shipping data to endpoint: {} ===", &self.metrics_intake_url_prefix); resp } From 9c084533a0e320afe55b075703dbbf5846729bdc Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Thu, 29 May 2025 10:13:36 -0400 Subject: [PATCH 2/7] implement Flusher clone --- crates/dogstatsd/src/flusher.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index d62b90a..c44ce74 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -13,6 +13,15 @@ pub struct Flusher { aggregator: Arc>, } +impl Clone for Flusher { + fn clone(&self) -> Self { + Flusher { + dd_api: self.dd_api.clone(), + aggregator: Arc::clone(&self.aggregator), + } + } +} + pub struct FlusherConfig { pub api_key: String, pub aggregator: Arc>, From a1872ff64a18f4c19ba6d777eae7bcea5acca9fe Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Thu, 29 May 2025 10:46:30 -0400 Subject: [PATCH 3/7] flush metrics batch --- crates/dogstatsd/src/flusher.rs | 34 +++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index c44ce74..a981358 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -47,17 +47,41 @@ impl Flusher { } } + /// Flush metrics from the aggregator pub async fn flush( &mut self, ) -> Option<( Vec, Vec, )> { - self.flush_with_retries(None, None).await + // Collect metrics from the aggregator + let (series, sketches) = { + #[allow(clippy::expect_used)] + let mut aggregator = self.aggregator.lock().expect("lock poisoned"); + ( + aggregator.consume_metrics(), + aggregator.consume_distributions(), + ) + }; + self.flush_metrics(series, sketches).await + } + + /// Flush explicitly provided metrics + pub async fn flush_metrics( + &mut self, + series: Vec, + sketches: Vec, + ) -> Option<( + Vec, + Vec, + )> { + self.flush_with_retries(series, sketches, None, None).await } pub async fn flush_with_retries( &mut self, + series: Vec, + sketches: Vec, retry_series: Option>, retry_sketches: Option>, ) -> Option<( @@ -72,13 +96,7 @@ impl Flusher { retry_sketches.unwrap_or_default(), ) } else { - // Collect new metrics from the aggregator - #[allow(clippy::expect_used)] - let mut aggregator = self.aggregator.lock().expect("lock poisoned"); - ( - aggregator.consume_metrics(), - aggregator.consume_distributions(), - ) + (series, sketches) }; let n_series = all_series.len(); From d083f5f3c3ff7998790aa6c801828446dead5409 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Thu, 29 May 2025 11:31:44 -0400 Subject: [PATCH 4/7] flush metrics fn --- crates/dogstatsd/src/flusher.rs | 41 +++++++++++---------------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index a981358..06c15bf 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -66,39 +66,26 @@ impl Flusher { self.flush_metrics(series, sketches).await } - /// Flush explicitly provided metrics - pub async fn flush_metrics( - &mut self, - series: Vec, - sketches: Vec, - ) -> Option<( - Vec, - Vec, - )> { - self.flush_with_retries(series, sketches, None, None).await - } + // /// Flush explicitly provided metrics + // pub async fn flush_metrics( + // &mut self, + // series: Vec, + // sketches: Vec, + // ) -> Option<( + // Vec, + // Vec, + // )> { + // self.flush_with_retries(series, sketches, None, None).await + // } - pub async fn flush_with_retries( + pub async fn flush_metrics( &mut self, - series: Vec, - sketches: Vec, - retry_series: Option>, - retry_sketches: Option>, + all_series: Vec, + all_distributions: Vec, ) -> Option<( Vec, Vec, )> { - let (all_series, all_distributions) = if retry_series.is_some() || retry_sketches.is_some() - { - // Use the provided metrics for retry - ( - retry_series.unwrap_or_default(), - retry_sketches.unwrap_or_default(), - ) - } else { - (series, sketches) - }; - let n_series = all_series.len(); let n_distributions = all_distributions.len(); From ba26f297f575163930ef8009864dc15872ac64a6 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Thu, 29 May 2025 16:34:40 -0400 Subject: [PATCH 5/7] remove extra debug logs --- crates/dogstatsd/src/datadog.rs | 4 ---- crates/dogstatsd/src/flusher.rs | 31 ++++++++++--------------------- 2 files changed, 10 insertions(+), 25 deletions(-) diff --git a/crates/dogstatsd/src/datadog.rs b/crates/dogstatsd/src/datadog.rs index 8dd155a..7d9a7c3 100644 --- a/crates/dogstatsd/src/datadog.rs +++ b/crates/dogstatsd/src/datadog.rs @@ -164,7 +164,6 @@ impl DdApi { /// Ship a serialized series to the API, blocking pub async fn ship_series(&self, series: &Series) -> Result { - debug!("=== shipping serialized series to endpoint: {} ===", &self.metrics_intake_url_prefix); let url = format!("{}/api/v2/series", &self.metrics_intake_url_prefix); let safe_body = serde_json::to_vec(&series) .map_err(|e| ShippingError::Payload(format!("Failed to serialize series: {e}")))?; @@ -176,7 +175,6 @@ impl DdApi { &self, sketches: &SketchPayload, ) -> Result { - debug!("=== shipping distributions to endpoint: {} ===", &self.metrics_intake_url_prefix); let url = format!("{}/api/beta/sketches", &self.metrics_intake_url_prefix); let safe_body = sketches .write_to_bytes() @@ -201,7 +199,6 @@ impl DdApi { body: Vec, content_type: &str, ) -> Result { - debug!("=== shipping data to endpoint: {} ===", &self.metrics_intake_url_prefix); let client = &self .client .as_ref() @@ -231,7 +228,6 @@ impl DdApi { let elapsed = start.elapsed(); debug!("Request to {} took {}ms", url, elapsed.as_millis()); - debug!("=== completed shipping data to endpoint: {} ===", &self.metrics_intake_url_prefix); resp } diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index 06c15bf..070d7d3 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -66,40 +66,29 @@ impl Flusher { self.flush_metrics(series, sketches).await } - // /// Flush explicitly provided metrics - // pub async fn flush_metrics( - // &mut self, - // series: Vec, - // sketches: Vec, - // ) -> Option<( - // Vec, - // Vec, - // )> { - // self.flush_with_retries(series, sketches, None, None).await - // } - + /// Flush given batch of metrics pub async fn flush_metrics( &mut self, - all_series: Vec, - all_distributions: Vec, + series: Vec, + distributions: Vec, ) -> Option<( Vec, Vec, )> { - let n_series = all_series.len(); - let n_distributions = all_distributions.len(); + let n_series = series.len(); + let n_distributions = distributions.len(); debug!("Flushing {n_series} series and {n_distributions} distributions"); // Save copies for potential error returns - let all_series_copy = all_series.clone(); - let all_distributions_copy = all_distributions.clone(); + let series_copy = series.clone(); + let distributions_copy = distributions.clone(); let dd_api_clone = self.dd_api.clone(); let series_handle = tokio::spawn(async move { let mut failed = Vec::new(); let mut had_shipping_error = false; - for a_batch in all_series { + for a_batch in series { let (continue_shipping, should_retry) = should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await; if should_retry { @@ -117,7 +106,7 @@ impl Flusher { let distributions_handle = tokio::spawn(async move { let mut failed = Vec::new(); let mut had_shipping_error = false; - for a_batch in all_distributions { + for a_batch in distributions { let (continue_shipping, should_retry) = should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await; if should_retry { @@ -150,7 +139,7 @@ impl Flusher { Err(err) => { error!("Failed to flush metrics: {err}"); // Return all metrics in case of join error for potential retry - Some((all_series_copy, all_distributions_copy)) + Some((series_copy, distributions_copy)) } } } From 8613f3df6f584d0f7d95d7f693bce2455693bc3f Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Fri, 30 May 2025 10:44:20 -0400 Subject: [PATCH 6/7] rename --- crates/dogstatsd/src/flusher.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index 070d7d3..bfac390 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -54,8 +54,7 @@ impl Flusher { Vec, Vec, )> { - // Collect metrics from the aggregator - let (series, sketches) = { + let (series, distributions) = { #[allow(clippy::expect_used)] let mut aggregator = self.aggregator.lock().expect("lock poisoned"); ( @@ -63,7 +62,7 @@ impl Flusher { aggregator.consume_distributions(), ) }; - self.flush_metrics(series, sketches).await + self.flush_metrics(series, distributions).await } /// Flush given batch of metrics From 1fbe579b5c6e473fbe3e0a502277acff86b69902 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 10 Jun 2025 09:45:35 -0400 Subject: [PATCH 7/7] derive clone --- crates/dogstatsd/src/flusher.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index bfac390..b4bc771 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -8,20 +8,12 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use tracing::{debug, error}; +#[derive(Clone)] pub struct Flusher { dd_api: DdApi, aggregator: Arc>, } -impl Clone for Flusher { - fn clone(&self) -> Self { - Flusher { - dd_api: self.dd_api.clone(), - aggregator: Arc::clone(&self.aggregator), - } - } -} - pub struct FlusherConfig { pub api_key: String, pub aggregator: Arc>,