54 changes: 42 additions & 12 deletions crates/datadog-trace-agent/src/trace_flusher.rs
@@ -21,10 +21,13 @@ pub trait TraceFlusher {
/// implementing flushing logic that calls flush_traces.
async fn start_trace_flusher(&self, mut rx: Receiver<SendData>);
/// Given a `Vec<SendData>`, a tracer payload, send it to the Datadog intake endpoint.
async fn send(&self, traces: Vec<SendData>);
/// Returns the traces back if there was an error sending them.
async fn send(&self, traces: Vec<SendData>) -> Option<Vec<SendData>>;

/// Flushes traces by getting every available batch on the aggregator.
async fn flush(&self);
/// If `failed_traces` is provided, it will attempt to send those instead of fetching new traces.
/// Returns any traces that failed to send and should be retried.
async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>>;
}
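
A minimal usage sketch of the revised trait contract above (illustrative only, not part of this diff): the caller holds on to whatever `flush` returns and feeds it into the next call, so a failed batch is retried on the following tick. The `run_flush_loop` helper and its interval parameter are assumptions; `TraceFlusher` and `SendData` are the items from the diff.

use std::time::Duration;
use tokio::time::sleep;

// Illustrative retry-aware loop over the TraceFlusher trait shown above.
async fn run_flush_loop<F: TraceFlusher>(flusher: &F, interval_secs: u64) {
    let mut pending: Option<Vec<SendData>> = None;
    loop {
        sleep(Duration::from_secs(interval_secs)).await;
        // Hand any previously failed traces back in; keep whatever still fails.
        pending = flusher.flush(pending.take()).await;
    }
}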

#[derive(Clone)]
@@ -51,39 +54,66 @@ impl TraceFlusher for ServerlessTraceFlusher {

loop {
tokio::time::sleep(time::Duration::from_secs(self.config.trace_flush_interval)).await;
self.flush().await;
self.flush(None).await;
}
}

async fn flush(&self) {
let mut guard = self.aggregator.lock().await;
async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>> {
let mut failed_batch: Option<Vec<SendData>> = None;

if let Some(traces) = failed_traces {
// If we have traces from a previous failed attempt, try to send those first
if !traces.is_empty() {
debug!("Retrying to send {} previously failed traces", traces.len());
let retry_result = self.send(traces).await;
if retry_result.is_some() {
// Still failed, return to retry later
return retry_result;
}
}
}

// Process new traces from the aggregator
let mut guard = self.aggregator.lock().await;
let mut traces = guard.get_batch();

while !traces.is_empty() {
self.send(traces).await;
if let Some(failed) = self.send(traces).await {
// Keep track of the failed batch
failed_batch = Some(failed);
// Stop processing more batches if we have a failure
break;
}

traces = guard.get_batch();
}

failed_batch
}

async fn send(&self, traces: Vec<SendData>) {
async fn send(&self, traces: Vec<SendData>) -> Option<Vec<SendData>> {
if traces.is_empty() {
return;
return None;
}
debug!("Flushing {} traces", traces.len());

for traces in trace_utils::coalesce_send_data(traces) {
match traces
// Since we return the original traces on error, we need to clone them before coalescing
let traces_clone = traces.clone();

for coalesced_traces in trace_utils::coalesce_send_data(traces) {
match coalesced_traces
.send_proxy(self.config.proxy_url.as_deref())
.await
.last_result
{
Ok(_) => debug!("Successfully flushed traces"),
Err(e) => {
Contributor: Is it possible to only clone it after it fails here? Or is coalesce directly taking ownership of it?

Contributor (Author): Yeah, unfortunately coalesce_and_send takes ownership. This can be improved inside libdatadog.
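
A minimal sketch of the alternative discussed here, under the assumption of a hypothetical borrowing variant of the coalescing helper (`coalesce_send_data_ref` is not part of libdatadog today): the batch would stay with the caller and only be handed back on failure, so the upfront clone disappears.

// Hypothetical alternative body for `send`, assuming a borrowing coalescing helper.
async fn send(&self, traces: Vec<SendData>) -> Option<Vec<SendData>> {
    if traces.is_empty() {
        return None;
    }
    let mut send_failed = false;
    for coalesced in coalesce_send_data_ref(&traces) {
        if let Err(e) = coalesced
            .send_proxy(self.config.proxy_url.as_deref())
            .await
            .last_result
        {
            error!("Error sending trace: {e:?}");
            send_failed = true;
            break;
        }
    }
    // Only on failure is the original, never-cloned batch returned for retry.
    send_failed.then_some(traces)
}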

error!("Error sending trace: {e:?}")
// TODO: Retries
error!("Error sending trace: {e:?}");
// Return the original traces for retry
return Some(traces_clone);
}
}
}
None
}
}
6 changes: 3 additions & 3 deletions crates/dogstatsd/src/datadog.rs
@@ -304,7 +304,7 @@ pub(crate) struct Point {
pub(crate) value: f64,
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone)]
/// A named resource
pub(crate) struct Resource {
/// The name of this resource
@@ -335,7 +335,7 @@ impl Serialize for DdMetricKind {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone)]
#[allow(clippy::struct_field_names)]
/// A named collection of `Point` instances.
pub(crate) struct Metric {
@@ -351,7 +351,7 @@ pub(crate) struct Metric {
pub(crate) tags: Vec<String>,
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone)]
/// A collection of metrics as defined by the Datadog Metrics API.
// NOTE we have a number of `Vec` instances in this implementation that could
// otherwise be arrays, given that we have constants. Serializing to JSON would
102 changes: 89 additions & 13 deletions crates/dogstatsd/src/flusher.rs
@@ -38,8 +38,32 @@ impl Flusher {
}
}

pub async fn flush(&mut self) {
let (all_series, all_distributions) = {
pub async fn flush(
&mut self,
) -> Option<(
Vec<crate::datadog::Series>,
Vec<datadog_protos::metrics::SketchPayload>,
)> {
self.flush_with_retries(None, None).await
}

pub async fn flush_with_retries(
&mut self,
retry_series: Option<Vec<crate::datadog::Series>>,
retry_sketches: Option<Vec<datadog_protos::metrics::SketchPayload>>,
) -> Option<(
Vec<crate::datadog::Series>,
Vec<datadog_protos::metrics::SketchPayload>,
)> {
let (all_series, all_distributions) = if retry_series.is_some() || retry_sketches.is_some()
{
// Use the provided metrics for retry
(
retry_series.unwrap_or_default(),
retry_sketches.unwrap_or_default(),
)
} else {
// Collect new metrics from the aggregator
#[allow(clippy::expect_used)]
let mut aggregator = self.aggregator.lock().expect("lock poisoned");
(
@@ -53,35 +77,68 @@ impl Flusher {

debug!("Flushing {n_series} series and {n_distributions} distributions");

// Save copies for potential error returns
let all_series_copy = all_series.clone();
let all_distributions_copy = all_distributions.clone();

let dd_api_clone = self.dd_api.clone();
let series_handle = tokio::spawn(async move {
let mut failed = Vec::new();
let mut had_shipping_error = false;
for a_batch in all_series {
let continue_shipping =
let (continue_shipping, should_retry) =
should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await;
if should_retry {
failed.push(a_batch);
had_shipping_error = true;
}
if !continue_shipping {
break;
}
}
(failed, had_shipping_error)
});

let dd_api_clone = self.dd_api.clone();
let distributions_handle = tokio::spawn(async move {
let mut failed = Vec::new();
let mut had_shipping_error = false;
for a_batch in all_distributions {
let continue_shipping =
let (continue_shipping, should_retry) =
should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await;
if should_retry {
failed.push(a_batch);
had_shipping_error = true;
}
if !continue_shipping {
break;
}
}
(failed, had_shipping_error)
});

match tokio::try_join!(series_handle, distributions_handle) {
Ok(_) => {
debug!("Successfully flushed {n_series} series and {n_distributions} distributions")
Ok(((series_failed, series_had_error), (sketches_failed, sketches_had_error))) => {
if series_failed.is_empty() && sketches_failed.is_empty() {
debug!("Successfully flushed {n_series} series and {n_distributions} distributions");
None // Return None to indicate success
} else if series_had_error || sketches_had_error {
// Only return the metrics if there was an actual shipping error
error!("Failed to flush some metrics due to shipping errors: {} series and {} sketches",
series_failed.len(), sketches_failed.len());
// Return the failed metrics for potential retry
Some((series_failed, sketches_failed))
} else {
debug!("Some metrics were not sent but no errors occurred");
Copilot AI (May 20, 2025): [nitpick] Consider adding a comment explaining why failed batches that did not encounter a shipping error are not returned for retry. This clarification would help future maintainers understand the intended behavior.
None // No shipping errors, so don't return metrics for retry
}
}
Err(err) => {
error!("Failed to flush metrics{err}")
error!("Failed to flush metrics: {err}");
// Return all metrics in case of join error for potential retry
Some((all_series_copy, all_distributions_copy))
}
};
}
}
}
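
An illustrative caller for the revised flusher API above (not part of this diff): payloads returned by `flush` are fed back through `flush_with_retries` on the next cycle. The `flusher` value (a mutable `Flusher`) and `flush_interval` are assumptions for the sketch.

// Illustrative only: carrying failed metric payloads into the next flush cycle.
let mut failed: Option<(
    Vec<crate::datadog::Series>,
    Vec<datadog_protos::metrics::SketchPayload>,
)> = None;
loop {
    tokio::time::sleep(std::time::Duration::from_secs(flush_interval)).await;
    failed = match failed.take() {
        Some((series, sketches)) => {
            flusher
                .flush_with_retries(Some(series), Some(sketches))
                .await
        }
        None => flusher.flush().await,
    };
}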

@@ -90,26 +147,45 @@ pub enum ShippingError {
Destination(Option<StatusCode>, String),
}

async fn should_try_next_batch(resp: Result<Response, ShippingError>) -> bool {
/// Returns a tuple (continue_to_next_batch, should_retry_this_batch)
async fn should_try_next_batch(resp: Result<Response, ShippingError>) -> (bool, bool) {
Contributor (Author): this probably should just return an enum instead of a tuple of bools.
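
A minimal sketch of that enum (names like `BatchOutcome` and `classify_batch_result` are assumptions, not code from this PR); the classification mirrors the tuple logic in the diff below, with the logging left out for brevity.

// Hypothetical replacement for the (bool, bool) return value.
enum BatchOutcome {
    /// Move on to the next batch: success, or a permanent error whose data is dropped.
    Continue,
    /// Stop flushing and drop this batch: permanent destination error.
    StopAndDrop,
    /// Stop flushing and keep this batch for a later retry: temporary error.
    StopAndRetry,
}

fn classify_batch_result(resp: Result<Response, ShippingError>) -> BatchOutcome {
    match resp {
        Ok(r) => {
            let code = r.status().as_u16();
            if r.status() == StatusCode::ACCEPTED {
                BatchOutcome::Continue
            } else if code >= 400 && code < 500 {
                // Permanent error: drop this batch but keep flushing the rest.
                BatchOutcome::Continue
            } else {
                BatchOutcome::StopAndRetry
            }
        }
        Err(ShippingError::Payload(_)) => BatchOutcome::Continue,
        Err(ShippingError::Destination(sc, _)) => {
            let permanent = sc.map_or(false, |c| c.as_u16() >= 400 && c.as_u16() < 500);
            if permanent {
                BatchOutcome::StopAndDrop
            } else {
                BatchOutcome::StopAndRetry
            }
        }
    }
}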

match resp {
Ok(resp_payload) => match resp_payload.status() {
StatusCode::ACCEPTED => true,
StatusCode::ACCEPTED => (true, false), // Success, continue to next batch, no need to retry
unexpected_status_code => {
// Check if the status code indicates a permanent error (4xx) or a temporary error (5xx)
let is_permanent_error =
unexpected_status_code.as_u16() >= 400 && unexpected_status_code.as_u16() < 500;

error!(
"{}: Failed to push to API: {:?}",
unexpected_status_code,
resp_payload.text().await.unwrap_or_default()
);
true

if is_permanent_error {
(true, false) // Permanent error, continue to next batch but don't retry
} else {
(false, true) // Temporary error, don't continue to next batch and mark for retry
}
}
},
Err(ShippingError::Payload(msg)) => {
error!("Failed to prepare payload. Data dropped: {}", msg);
true
(true, false) // Payload error, continue to next batch but don't retry (data is malformed)
}
Err(ShippingError::Destination(sc, msg)) => {
// Check if status code indicates a permanent error
let is_permanent_error =
sc.map_or(false, |code| code.as_u16() >= 400 && code.as_u16() < 500);

error!("Error shipping data: {:?} {}", sc, msg);
false

if is_permanent_error {
(false, false) // Permanent destination error, don't continue and don't retry
} else {
(false, true) // Temporary error, don't continue and mark for retry
}
}
}
}