Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 21 additions & 27 deletions crates/dogstatsd/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::{debug, error};

#[derive(Clone)]
pub struct Flusher {
dd_api: DdApi,
aggregator: Arc<Mutex<Aggregator>>,
Expand Down Expand Up @@ -38,54 +39,47 @@ impl Flusher {
}
}

/// Flush metrics from the aggregator.
///
/// Convenience wrapper: delegates to `flush_with_retries(None, None)`,
/// i.e. the path that drains fresh series and distributions from the
/// shared aggregator rather than re-shipping a caller-supplied retry
/// batch.
///
/// Returns `Some((series, sketches))` containing payloads that failed
/// to ship and may be retried by the caller (per the "Save copies for
/// potential error returns" handling downstream), or presumably `None`
/// when everything shipped — TODO(review): confirm the `None` case
/// against the full body of `flush_with_retries`, which is not fully
/// visible here.
pub async fn flush(
    &mut self,
) -> Option<(
    Vec<crate::datadog::Series>,
    Vec<datadog_protos::metrics::SketchPayload>,
)> {
    // No retry payloads supplied: forces the collect-from-aggregator
    // branch inside flush_with_retries.
    self.flush_with_retries(None, None).await
}

pub async fn flush_with_retries(
&mut self,
retry_series: Option<Vec<crate::datadog::Series>>,
retry_sketches: Option<Vec<datadog_protos::metrics::SketchPayload>>,
) -> Option<(
Vec<crate::datadog::Series>,
Vec<datadog_protos::metrics::SketchPayload>,
)> {
let (all_series, all_distributions) = if retry_series.is_some() || retry_sketches.is_some()
{
// Use the provided metrics for retry
(
retry_series.unwrap_or_default(),
retry_sketches.unwrap_or_default(),
)
} else {
// Collect new metrics from the aggregator
let (series, distributions) = {
#[allow(clippy::expect_used)]
let mut aggregator = self.aggregator.lock().expect("lock poisoned");
(
aggregator.consume_metrics(),
aggregator.consume_distributions(),
)
};
self.flush_metrics(series, distributions).await
}

let n_series = all_series.len();
let n_distributions = all_distributions.len();
/// Flush given batch of metrics
pub async fn flush_metrics(
&mut self,
series: Vec<crate::datadog::Series>,
distributions: Vec<datadog_protos::metrics::SketchPayload>,
) -> Option<(
Vec<crate::datadog::Series>,
Vec<datadog_protos::metrics::SketchPayload>,
)> {
let n_series = series.len();
let n_distributions = distributions.len();
Comment on lines +69 to +70
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, it's just for the comment below — can't we delegate this to the debug?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, we could; it's also used in another debug log later (line 123/125) if we want to keep it, or I can reference it directly in both.


debug!("Flushing {n_series} series and {n_distributions} distributions");

// Save copies for potential error returns
let all_series_copy = all_series.clone();
let all_distributions_copy = all_distributions.clone();
let series_copy = series.clone();
let distributions_copy = distributions.clone();

let dd_api_clone = self.dd_api.clone();
let series_handle = tokio::spawn(async move {
let mut failed = Vec::new();
let mut had_shipping_error = false;
for a_batch in all_series {
for a_batch in series {
let (continue_shipping, should_retry) =
should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await;
if should_retry {
Expand All @@ -103,7 +97,7 @@ impl Flusher {
let distributions_handle = tokio::spawn(async move {
let mut failed = Vec::new();
let mut had_shipping_error = false;
for a_batch in all_distributions {
for a_batch in distributions {
let (continue_shipping, should_retry) =
should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await;
if should_retry {
Expand Down Expand Up @@ -136,7 +130,7 @@ impl Flusher {
Err(err) => {
error!("Failed to flush metrics: {err}");
// Return all metrics in case of join error for potential retry
Some((all_series_copy, all_distributions_copy))
Some((series_copy, distributions_copy))
}
}
}
Expand Down
Loading