From 21e4c0f2743c1469483fcac85095c477bb814c84 Mon Sep 17 00:00:00 2001
From: AJ Stuyvenberg
Date: Tue, 13 May 2025 11:24:31 -0400
Subject: [PATCH 1/4] feat: retry/repush to support continuous background flushing

---
 crates/dogstatsd/src/datadog.rs |   6 +-
 crates/dogstatsd/src/flusher.rs | 102 ++++++++++++++++++++++++++++----
 2 files changed, 92 insertions(+), 16 deletions(-)

diff --git a/crates/dogstatsd/src/datadog.rs b/crates/dogstatsd/src/datadog.rs
index ba1f7a12..7d9a7c3c 100644
--- a/crates/dogstatsd/src/datadog.rs
+++ b/crates/dogstatsd/src/datadog.rs
@@ -304,7 +304,7 @@ pub(crate) struct Point {
     pub(crate) value: f64,
 }
 
-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Clone)]
 /// A named resource
 pub(crate) struct Resource {
     /// The name of this resource
@@ -335,7 +335,7 @@ impl Serialize for DdMetricKind {
     }
 }
 
-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Clone)]
 #[allow(clippy::struct_field_names)]
 /// A named collection of `Point` instances.
 pub(crate) struct Metric {
@@ -351,7 +351,7 @@ pub(crate) struct Metric {
     pub(crate) tags: Vec<String>,
 }
 
-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Clone)]
 /// A collection of metrics as defined by the Datadog Metrics API.
 // NOTE we have a number of `Vec` instances in this implementation that could
 // otherwise be arrays, given that we have constants. Serializing to JSON would

diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs
index 35a3f8c3..d62b90ae 100644
--- a/crates/dogstatsd/src/flusher.rs
+++ b/crates/dogstatsd/src/flusher.rs
@@ -38,8 +38,32 @@ impl Flusher {
         }
     }
 
-    pub async fn flush(&mut self) {
-        let (all_series, all_distributions) = {
+    pub async fn flush(
+        &mut self,
+    ) -> Option<(
+        Vec<datadog::Series>,
+        Vec<SketchPayload>,
+    )> {
+        self.flush_with_retries(None, None).await
+    }
+
+    pub async fn flush_with_retries(
+        &mut self,
+        retry_series: Option<Vec<datadog::Series>>,
+        retry_sketches: Option<Vec<SketchPayload>>,
+    ) -> Option<(
+        Vec<datadog::Series>,
+        Vec<SketchPayload>,
+    )> {
+        let (all_series, all_distributions) = if retry_series.is_some() || retry_sketches.is_some()
+        {
+            // Use the provided metrics for retry
+            (
+                retry_series.unwrap_or_default(),
+                retry_sketches.unwrap_or_default(),
+            )
+        } else {
+            // Collect new metrics from the aggregator
             #[allow(clippy::expect_used)]
             let mut aggregator = self.aggregator.lock().expect("lock poisoned");
             (
@@ -53,35 +77,68 @@
 
         debug!("Flushing {n_series} series and {n_distributions} distributions");
 
+        // Save copies for potential error returns
+        let all_series_copy = all_series.clone();
+        let all_distributions_copy = all_distributions.clone();
+        let dd_api_clone = self.dd_api.clone();
         let series_handle = tokio::spawn(async move {
+            let mut failed = Vec::new();
+            let mut had_shipping_error = false;
             for a_batch in all_series {
-                let continue_shipping =
+                let (continue_shipping, should_retry) =
                     should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await;
+                if should_retry {
+                    failed.push(a_batch);
+                    had_shipping_error = true;
+                }
                 if !continue_shipping {
                     break;
                 }
             }
+            (failed, had_shipping_error)
         });
+        let dd_api_clone = self.dd_api.clone();
         let distributions_handle = tokio::spawn(async move {
+            let mut failed = Vec::new();
+            let mut had_shipping_error = false;
             for a_batch in all_distributions {
-                let continue_shipping =
+                let (continue_shipping, should_retry) =
                     should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await;
+                if should_retry {
+                    failed.push(a_batch);
+                    had_shipping_error = true;
+                }
                 if !continue_shipping {
                     break;
                 }
             }
+            (failed, had_shipping_error)
         });
 
         match tokio::try_join!(series_handle, distributions_handle) {
-            Ok(_) => {
-                debug!("Successfully flushed {n_series} series and {n_distributions} distributions")
+            Ok(((series_failed, series_had_error), (sketches_failed, sketches_had_error))) => {
+                if series_failed.is_empty() && sketches_failed.is_empty() {
+                    debug!("Successfully flushed {n_series} series and {n_distributions} distributions");
+                    None // Return None to indicate success
+                } else if series_had_error || sketches_had_error {
+                    // Only return the metrics if there was an actual shipping error
+                    error!("Failed to flush some metrics due to shipping errors: {} series and {} sketches",
+                        series_failed.len(), sketches_failed.len());
+                    // Return the failed metrics for potential retry
+                    Some((series_failed, sketches_failed))
+                } else {
+                    debug!("Some metrics were not sent but no errors occurred");
+                    None // No shipping errors, so don't return metrics for retry
+                }
             }
             Err(err) => {
-                error!("Failed to flush metrics{err}")
+                error!("Failed to flush metrics: {err}");
+                // Return all metrics in case of join error for potential retry
+                Some((all_series_copy, all_distributions_copy))
             }
-        };
+        }
     }
 }
@@ -90,26 +147,45 @@ pub enum ShippingError {
     Destination(Option<StatusCode>, String),
 }
 
-async fn should_try_next_batch(resp: Result<Response, ShippingError>) -> bool {
+/// Returns a tuple (continue_to_next_batch, should_retry_this_batch)
+async fn should_try_next_batch(resp: Result<Response, ShippingError>) -> (bool, bool) {
     match resp {
         Ok(resp_payload) => match resp_payload.status() {
-            StatusCode::ACCEPTED => true,
+            StatusCode::ACCEPTED => (true, false), // Success, continue to next batch, no need to retry
             unexpected_status_code => {
+                // Check if the status code indicates a permanent error (4xx) or a temporary error (5xx)
+                let is_permanent_error =
+                    unexpected_status_code.as_u16() >= 400 && unexpected_status_code.as_u16() < 500;
+
                 error!(
                     "{}: Failed to push to API: {:?}",
                     unexpected_status_code,
                     resp_payload.text().await.unwrap_or_default()
                 );
-                true
+
+                if is_permanent_error {
+                    (true, false) // Permanent error, continue to next batch but don't retry
+                } else {
+                    (false, true) // Temporary error, don't continue to next batch and mark for retry
+                }
             }
         },
         Err(ShippingError::Payload(msg)) => {
             error!("Failed to prepare payload. Data dropped: {}", msg);
-            true
+            (true, false) // Payload error, continue to next batch but don't retry (data is malformed)
        }
         Err(ShippingError::Destination(sc, msg)) => {
+            // Check if status code indicates a permanent error
+            let is_permanent_error =
+                sc.map_or(false, |code| code.as_u16() >= 400 && code.as_u16() < 500);
+
             error!("Error shipping data: {:?} {}", sc, msg);
-            false
+
+            if is_permanent_error {
+                (false, false) // Permanent destination error, don't continue and don't retry
+            } else {
+                (false, true) // Temporary error, don't continue and mark for retry
+            }
         }
     }
 }

From b714781bc6dc92c620ade6eb21f3964dd6ec582d Mon Sep 17 00:00:00 2001
From: AJ Stuyvenberg
Date: Tue, 13 May 2025 16:35:44 -0400
Subject: [PATCH 2/4] feat: optionally retry/redrive failed traces

---
 .../datadog-trace-agent/src/trace_flusher.rs | 54 ++++++++++++++-----
 1 file changed, 42 insertions(+), 12 deletions(-)

diff --git a/crates/datadog-trace-agent/src/trace_flusher.rs b/crates/datadog-trace-agent/src/trace_flusher.rs
index a455cae5..b33be204 100644
--- a/crates/datadog-trace-agent/src/trace_flusher.rs
+++ b/crates/datadog-trace-agent/src/trace_flusher.rs
@@ -21,10 +21,13 @@ pub trait TraceFlusher {
     /// implementing flushing logic that calls flush_traces.
     async fn start_trace_flusher(&self, mut rx: Receiver<SendData>);
     /// Given a `Vec<SendData>`, a tracer payload, send it to the Datadog intake endpoint.
-    async fn send(&self, traces: Vec<SendData>);
+    /// Returns the traces back if there was an error sending them.
+    async fn send(&self, traces: Vec<SendData>) -> Option<Vec<SendData>>;
 
     /// Flushes traces by getting every available batch on the aggregator.
-    async fn flush(&self);
+    /// If `failed_traces` is provided, it will attempt to send those instead of fetching new traces.
+    /// Returns any traces that failed to send and should be retried.
+    async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>>;
 }
 
 #[derive(Clone)]
@@ -51,39 +54,66 @@ impl TraceFlusher for ServerlessTraceFlusher {
         loop {
             tokio::time::sleep(time::Duration::from_secs(self.config.trace_flush_interval)).await;
 
-            self.flush().await;
+            self.flush(None).await;
         }
     }
 
-    async fn flush(&self) {
-        let mut guard = self.aggregator.lock().await;
+    async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>> {
+        let mut failed_batch: Option<Vec<SendData>> = None;
+
+        if let Some(traces) = failed_traces {
+            // If we have traces from a previous failed attempt, try to send those first
+            if !traces.is_empty() {
+                debug!("Retrying to send {} previously failed traces", traces.len());
+                let retry_result = self.send(traces).await;
+                if retry_result.is_some() {
+                    // Still failed, return to retry later
+                    return retry_result;
+                }
+            }
+        }
 
+        // Process new traces from the aggregator
+        let mut guard = self.aggregator.lock().await;
         let mut traces = guard.get_batch();
+
         while !traces.is_empty() {
-            self.send(traces).await;
+            if let Some(failed) = self.send(traces).await {
+                // Keep track of the failed batch
+                failed_batch = Some(failed);
+                // Stop processing more batches if we have a failure
+                break;
+            }
             traces = guard.get_batch();
         }
+
+        failed_batch
     }
 
-    async fn send(&self, traces: Vec<SendData>) {
+    async fn send(&self, traces: Vec<SendData>) -> Option<Vec<SendData>> {
         if traces.is_empty() {
-            return;
+            return None;
         }
         debug!("Flushing {} traces", traces.len());
 
-        for traces in trace_utils::coalesce_send_data(traces) {
-            match traces
+        // Since we return the original traces on error, we need to clone them before coalescing
+        let traces_clone = traces.clone();
+
+        for coalesced_traces in trace_utils::coalesce_send_data(traces) {
+            match coalesced_traces
                 .send_proxy(self.config.proxy_url.as_deref())
                 .await
                 .last_result
             {
                 Ok(_) => debug!("Successfully flushed traces"),
                 Err(e) => {
-                    error!("Error sending trace: {e:?}")
-                    // TODO: Retries
+                    error!("Error sending trace: {e:?}");
+                    // Return the original traces for retry
+                    return Some(traces_clone);
                 }
             }
         }
+        None
     }
 }

From e943bd1a55eee20a4b2dbb8b4c76d0e16a8e9a49 Mon Sep 17 00:00:00 2001
From: AJ Stuyvenberg
Date: Tue, 20 May 2025 13:23:55 -0400
Subject: [PATCH 3/4] Update crates/datadog-trace-agent/src/trace_flusher.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 crates/datadog-trace-agent/src/trace_flusher.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/crates/datadog-trace-agent/src/trace_flusher.rs b/crates/datadog-trace-agent/src/trace_flusher.rs
index b33be204..e852346e 100644
--- a/crates/datadog-trace-agent/src/trace_flusher.rs
+++ b/crates/datadog-trace-agent/src/trace_flusher.rs
@@ -97,10 +97,8 @@ impl TraceFlusher for ServerlessTraceFlusher {
         }
 
         debug!("Flushing {} traces", traces.len());
-        // Since we return the original traces on error, we need to clone them before coalescing
-        let traces_clone = traces.clone();
-
-        for coalesced_traces in trace_utils::coalesce_send_data(traces) {
+        // Process and send coalesced traces
+        for coalesced_traces in trace_utils::coalesce_send_data(&traces) {
             match coalesced_traces
                 .send_proxy(self.config.proxy_url.as_deref())
                 .await

From 6ebb9d2c0b3f7f54a7f589b83a5ac3fce97d905d Mon Sep 17 00:00:00 2001
From: AJ Stuyvenberg
Date: Tue, 20 May 2025 14:41:13 -0400
Subject: [PATCH 4/4] Revert "Update crates/datadog-trace-agent/src/trace_flusher.rs"

This reverts commit e943bd1a55eee20a4b2dbb8b4c76d0e16a8e9a49.
---
 crates/datadog-trace-agent/src/trace_flusher.rs | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/crates/datadog-trace-agent/src/trace_flusher.rs b/crates/datadog-trace-agent/src/trace_flusher.rs
index e852346e..b33be204 100644
--- a/crates/datadog-trace-agent/src/trace_flusher.rs
+++ b/crates/datadog-trace-agent/src/trace_flusher.rs
@@ -97,8 +97,10 @@ impl TraceFlusher for ServerlessTraceFlusher {
         }
 
         debug!("Flushing {} traces", traces.len());
-        // Process and send coalesced traces
-        for coalesced_traces in trace_utils::coalesce_send_data(&traces) {
+        // Since we return the original traces on error, we need to clone them before coalescing
+        let traces_clone = traces.clone();
+
+        for coalesced_traces in trace_utils::coalesce_send_data(traces) {
             match coalesced_traces
                 .send_proxy(self.config.proxy_url.as_deref())
                 .await
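
Usage sketch (editor's note, not part of the patch series): the retry contract introduced in PATCH 1/4 is easiest to see from the caller's side. `flush`/`flush_with_retries` return `None` when everything shipped (or was dropped as unretryable) and `Some((series, sketches))` when batches failed with a retryable error. A minimal background loop that carries those batches into the next tick might look like the following; the function name, the 10-second interval, and the single-slot carry-over policy are assumptions for illustration, not code from this series.

```rust
use std::time::Duration;

// Hypothetical caller of Flusher::flush_with_retries from PATCH 1/4.
async fn metrics_flush_loop(mut flusher: Flusher) {
    // Batches that failed with a retryable (non-4xx) error on the previous tick.
    let mut pending: Option<(Vec<datadog::Series>, Vec<SketchPayload>)> = None;
    loop {
        tokio::time::sleep(Duration::from_secs(10)).await; // interval is illustrative
        pending = match pending.take() {
            // Re-push last tick's failures instead of draining the aggregator.
            Some((series, sketches)) => {
                flusher
                    .flush_with_retries(Some(series), Some(sketches))
                    .await
            }
            // No carry-over: collect fresh metrics from the aggregator.
            None => flusher.flush().await,
        };
    }
}
```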
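
The trace side (PATCH 2/4) supports the same pattern: `flush(failed_traces)` retries the supplied traces before draining new batches and returns whatever still could not be sent. A sketch under the same caveats (loop shape assumed; `trace_flush_interval` read from config exactly as `start_trace_flusher` does):

```rust
use std::time;

// Hypothetical caller of TraceFlusher::flush from PATCH 2/4.
async fn trace_flush_loop(flusher: ServerlessTraceFlusher) {
    // Traces returned by the previous flush that should be redriven.
    let mut failed: Option<Vec<SendData>> = None;
    loop {
        tokio::time::sleep(time::Duration::from_secs(flusher.config.trace_flush_interval)).await;
        // flush() sends the carried-over traces first, then new batches.
        failed = flusher.flush(failed.take()).await;
    }
}
```

Note that `start_trace_flusher` in PATCH 2/4 calls `self.flush(None).await` and discards the return value, so redriving only happens when a caller threads the failed traces back in, as sketched here.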