Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ async fn start_dogstatsd(
Some(dd_api_key) => {
#[allow(clippy::expect_used)]
let metrics_flusher = Flusher::new(FlusherConfig {
api_key: dd_api_key,
api_key_factory: Arc::new(move || {
let api_key = dd_api_key.clone();
Box::pin(async move { api_key })
}),
aggregator: Arc::clone(&metrics_aggr),
metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(
Some(Site::new(dd_site).expect("Failed to parse site")),
Expand Down
53 changes: 41 additions & 12 deletions crates/dogstatsd/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,29 @@
use crate::aggregator::Aggregator;
use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy};
use reqwest::{Response, StatusCode};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::{debug, error};

pub type ApiKeyFactory =
Arc<dyn Fn() -> Pin<Box<dyn Future<Output = String> + Send>> + Send + Sync>;

#[derive(Clone)]
pub struct Flusher {
dd_api: DdApi,
// Accept a future so the API key resolution is deferred until the flush happens
api_key_factory: ApiKeyFactory,
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
https_proxy: Option<String>,
timeout: Duration,
retry_strategy: RetryStrategy,
aggregator: Arc<Mutex<Aggregator>>,
dd_api: Option<DdApi>,
}

pub struct FlusherConfig {
pub api_key: String,
pub api_key_factory: ApiKeyFactory,
pub aggregator: Arc<Mutex<Aggregator>>,
pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
pub https_proxy: Option<String>,
Expand All @@ -26,17 +37,33 @@ pub struct FlusherConfig {
#[allow(clippy::await_holding_lock)]
impl Flusher {
pub fn new(config: FlusherConfig) -> Self {
let dd_api = DdApi::new(
config.api_key,
config.metrics_intake_url_prefix,
config.https_proxy,
config.timeout,
config.retry_strategy,
);
Flusher {
dd_api,
api_key_factory: config.api_key_factory,
metrics_intake_url_prefix: config.metrics_intake_url_prefix,
https_proxy: config.https_proxy,
timeout: config.timeout,
retry_strategy: config.retry_strategy,
aggregator: config.aggregator,
dd_api: None,
}
}

async fn get_dd_api(&mut self) -> &DdApi {
if self.dd_api.is_none() {
let api_key = (self.api_key_factory)().await;
self.dd_api = Some(DdApi::new(
api_key,
self.metrics_intake_url_prefix.clone(),
self.https_proxy.clone(),
self.timeout,
self.retry_strategy.clone(),
));
}

#[allow(clippy::expect_used)]
self.dd_api
.as_ref()
.expect("dd_api should be initialized by this point")
}

/// Flush metrics from the aggregator
Expand Down Expand Up @@ -75,7 +102,9 @@ impl Flusher {
let series_copy = series.clone();
let distributions_copy = distributions.clone();

let dd_api_clone = self.dd_api.clone();
let dd_api = self.get_dd_api().await;

let dd_api_clone = dd_api.clone();
let series_handle = tokio::spawn(async move {
let mut failed = Vec::new();
let mut had_shipping_error = false;
Expand All @@ -93,7 +122,7 @@ impl Flusher {
(failed, had_shipping_error)
});

let dd_api_clone = self.dd_api.clone();
let dd_api_clone = dd_api.clone();
let distributions_handle = tokio::spawn(async move {
let mut failed = Vec::new();
let mut had_shipping_error = false;
Expand Down
7 changes: 5 additions & 2 deletions crates/dogstatsd/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use dogstatsd::{
constants::CONTEXTS,
datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride},
dogstatsd::{DogStatsD, DogStatsDConfig},
flusher::{Flusher, FlusherConfig},
flusher::{ApiKeyFactory, Flusher, FlusherConfig},
};
use mockito::Server;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -40,8 +40,11 @@ async fn dogstatsd_server_ships_series() {

let _ = start_dogstatsd(&metrics_aggr).await;

let api_key_factory: ApiKeyFactory =
Arc::new(|| Box::pin(async move { "mock-api-key".to_string() }));

let mut metrics_flusher = Flusher::new(FlusherConfig {
api_key: "mock-api-key".to_string(),
api_key_factory,
aggregator: Arc::clone(&metrics_aggr),
metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(
None,
Expand Down
Loading