diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 270d03a5..2112b6ad 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -203,7 +203,10 @@ async fn start_dogstatsd( Some(dd_api_key) => { #[allow(clippy::expect_used)] let metrics_flusher = Flusher::new(FlusherConfig { - api_key: dd_api_key, + api_key_factory: Arc::new(move || { + let api_key = dd_api_key.clone(); + Box::pin(async move { api_key }) + }), aggregator: Arc::clone(&metrics_aggr), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( Some(Site::new(dd_site).expect("Failed to parse site")), diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index b4bc771c..5807b371 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -4,18 +4,29 @@ use crate::aggregator::Aggregator; use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy}; use reqwest::{Response, StatusCode}; +use std::future::Future; +use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::time::Duration; use tracing::{debug, error}; +pub type ApiKeyFactory = + Arc Pin + Send>> + Send + Sync>; + #[derive(Clone)] pub struct Flusher { - dd_api: DdApi, + // Accept a future so the API key resolution is deferred until the flush happens + api_key_factory: ApiKeyFactory, + metrics_intake_url_prefix: MetricsIntakeUrlPrefix, + https_proxy: Option, + timeout: Duration, + retry_strategy: RetryStrategy, aggregator: Arc>, + dd_api: Option, } pub struct FlusherConfig { - pub api_key: String, + pub api_key_factory: ApiKeyFactory, pub aggregator: Arc>, pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix, pub https_proxy: Option, @@ -26,17 +37,33 @@ pub struct FlusherConfig { #[allow(clippy::await_holding_lock)] impl Flusher { pub fn new(config: FlusherConfig) -> Self { - let dd_api = DdApi::new( - config.api_key, - config.metrics_intake_url_prefix, - config.https_proxy, - config.timeout, - config.retry_strategy, - ); Flusher { - dd_api, + api_key_factory: config.api_key_factory, + metrics_intake_url_prefix: config.metrics_intake_url_prefix, + https_proxy: config.https_proxy, + timeout: config.timeout, + retry_strategy: config.retry_strategy, aggregator: config.aggregator, + dd_api: None, + } + } + + async fn get_dd_api(&mut self) -> &DdApi { + if self.dd_api.is_none() { + let api_key = (self.api_key_factory)().await; + self.dd_api = Some(DdApi::new( + api_key, + self.metrics_intake_url_prefix.clone(), + self.https_proxy.clone(), + self.timeout, + self.retry_strategy.clone(), + )); } + + #[allow(clippy::expect_used)] + self.dd_api + .as_ref() + .expect("dd_api should be initialized by this point") } /// Flush metrics from the aggregator @@ -75,7 +102,9 @@ impl Flusher { let series_copy = series.clone(); let distributions_copy = distributions.clone(); - let dd_api_clone = self.dd_api.clone(); + let dd_api = self.get_dd_api().await; + + let dd_api_clone = dd_api.clone(); let series_handle = tokio::spawn(async move { let mut failed = Vec::new(); let mut had_shipping_error = false; @@ -93,7 +122,7 @@ impl Flusher { (failed, had_shipping_error) }); - let dd_api_clone = self.dd_api.clone(); + let dd_api_clone = dd_api.clone(); let distributions_handle = tokio::spawn(async move { let mut failed = Vec::new(); let mut had_shipping_error = false; diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 641c3ebd..31181e20 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -7,7 +7,7 @@ use dogstatsd::{ constants::CONTEXTS, datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride}, dogstatsd::{DogStatsD, DogStatsDConfig}, - flusher::{Flusher, FlusherConfig}, + flusher::{ApiKeyFactory, Flusher, FlusherConfig}, }; use mockito::Server; use std::sync::{Arc, Mutex}; @@ -40,8 +40,11 @@ async fn dogstatsd_server_ships_series() { let _ = start_dogstatsd(&metrics_aggr).await; + let api_key_factory: ApiKeyFactory = + Arc::new(|| Box::pin(async move { "mock-api-key".to_string() })); + let mut metrics_flusher = Flusher::new(FlusherConfig { - api_key: "mock-api-key".to_string(), + api_key_factory, aggregator: Arc::clone(&metrics_aggr), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( None,