From f1c37e99622c9e758d1a9a1e6bb6bb6f927e7206 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 11 Jun 2025 13:16:18 -0400 Subject: [PATCH 1/9] feat: Lazily resolve DD API key in dogstatsd --- crates/dogstatsd/src/flusher.rs | 66 ++++++++++++++++------ crates/dogstatsd/tests/integration_test.rs | 10 +++- 2 files changed, 58 insertions(+), 18 deletions(-) diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index b4bc771c..53fa8cf2 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -6,16 +6,27 @@ use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy}; use reqwest::{Response, StatusCode}; use std::sync::{Arc, Mutex}; use std::time::Duration; +use std::pin::Pin; +use std::future::Future; use tracing::{debug, error}; +use std::sync::OnceLock; + +pub type ApiKeyFactory = Arc Pin + Send>> + Send + Sync>; #[derive(Clone)] pub struct Flusher { - dd_api: DdApi, + // Accept a future so the API keyresolution is deferred until the flush happens + api_key_factory: ApiKeyFactory, + metrics_intake_url_prefix: MetricsIntakeUrlPrefix, + https_proxy: Option, + timeout: Duration, + retry_strategy: RetryStrategy, aggregator: Arc>, + dd_api: Arc>, } pub struct FlusherConfig { - pub api_key: String, + pub api_key_factory: ApiKeyFactory, pub aggregator: Arc>, pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix, pub https_proxy: Option, @@ -26,17 +37,36 @@ pub struct FlusherConfig { #[allow(clippy::await_holding_lock)] impl Flusher { pub fn new(config: FlusherConfig) -> Self { - let dd_api = DdApi::new( - config.api_key, - config.metrics_intake_url_prefix, - config.https_proxy, - config.timeout, - config.retry_strategy, - ); Flusher { - dd_api, + api_key_factory: config.api_key_factory, + metrics_intake_url_prefix: config.metrics_intake_url_prefix, + https_proxy: config.https_proxy, + timeout: config.timeout, + retry_strategy: config.retry_strategy, aggregator: config.aggregator, + dd_api: Arc::new(OnceLock::new()), + } + } + + async fn get_dd_api(&mut self) -> &DdApi { + if let Some(dd_api) = self.dd_api.get() { + return dd_api; } + + let api_key = (self.api_key_factory)().await; + + // Initialize the OnceLock with a new DdApi instance + // If another thread initialized it while we were getting the API key, + // we'll get that instance instead + self.dd_api.get_or_init(|| { + DdApi::new( + api_key, + self.metrics_intake_url_prefix.clone(), + self.https_proxy.clone(), + self.timeout, + self.retry_strategy.clone(), + ) + }) } /// Flush metrics from the aggregator @@ -75,13 +105,16 @@ impl Flusher { let series_copy = series.clone(); let distributions_copy = distributions.clone(); - let dd_api_clone = self.dd_api.clone(); + let dd_api = self.get_dd_api().await; + + let dd_api_clone = dd_api.clone(); let series_handle = tokio::spawn(async move { let mut failed = Vec::new(); let mut had_shipping_error = false; for a_batch in series { - let (continue_shipping, should_retry) = - should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await; + let (continue_shipping, should_retry) = { + should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await + }; if should_retry { failed.push(a_batch); had_shipping_error = true; @@ -93,13 +126,14 @@ impl Flusher { (failed, had_shipping_error) }); - let dd_api_clone = self.dd_api.clone(); + let dd_api_clone = dd_api.clone(); let distributions_handle = tokio::spawn(async move { let mut failed = Vec::new(); let mut had_shipping_error = false; for a_batch in distributions { - let (continue_shipping, should_retry) = - should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await; + let (continue_shipping, should_retry) = { + should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await + }; if should_retry { failed.push(a_batch); had_shipping_error = true; diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 641c3ebd..20e7c3b1 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -7,7 +7,7 @@ use dogstatsd::{ constants::CONTEXTS, datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride}, dogstatsd::{DogStatsD, DogStatsDConfig}, - flusher::{Flusher, FlusherConfig}, + flusher::{Flusher, FlusherConfig, ApiKeyFactory}, }; use mockito::Server; use std::sync::{Arc, Mutex}; @@ -40,8 +40,14 @@ async fn dogstatsd_server_ships_series() { let _ = start_dogstatsd(&metrics_aggr).await; + let api_key_factory: ApiKeyFactory = Arc::new(|| { + Box::pin(async move { + "mock-api-key".to_string() + }) + }); + let mut metrics_flusher = Flusher::new(FlusherConfig { - api_key: "mock-api-key".to_string(), + api_key_factory, aggregator: Arc::clone(&metrics_aggr), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( None, From e2224cebc0ec3796a509b2fc353ad5c0d671fc79 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 11 Jun 2025 14:26:54 -0400 Subject: [PATCH 2/9] Update datadog-serverless-compat --- crates/datadog-serverless-compat/src/main.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 270d03a5..6ded1769 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -203,7 +203,10 @@ async fn start_dogstatsd( Some(dd_api_key) => { #[allow(clippy::expect_used)] let metrics_flusher = Flusher::new(FlusherConfig { - api_key: dd_api_key, + api_key_factory: Arc::new(move || { + let key = dd_api_key.clone(); + Box::pin(async move { key }) + }), aggregator: Arc::clone(&metrics_aggr), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( Some(Site::new(dd_site).expect("Failed to parse site")), From aa5639d9eec0fef9a8902609c3987859345499ab Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 11 Jun 2025 14:43:31 -0400 Subject: [PATCH 3/9] Run cargo fmt --- crates/dogstatsd/src/flusher.rs | 18 +++++++++--------- crates/dogstatsd/tests/integration_test.rs | 9 +++------ 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index 53fa8cf2..b24f47e7 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -4,14 +4,15 @@ use crate::aggregator::Aggregator; use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy}; use reqwest::{Response, StatusCode}; +use std::future::Future; +use std::pin::Pin; +use std::sync::OnceLock; use std::sync::{Arc, Mutex}; use std::time::Duration; -use std::pin::Pin; -use std::future::Future; use tracing::{debug, error}; -use std::sync::OnceLock; -pub type ApiKeyFactory = Arc Pin + Send>> + Send + Sync>; +pub type ApiKeyFactory = + Arc Pin + Send>> + Send + Sync>; #[derive(Clone)] pub struct Flusher { @@ -52,9 +53,9 @@ impl Flusher { if let Some(dd_api) = self.dd_api.get() { return dd_api; } - + let api_key = (self.api_key_factory)().await; - + // Initialize the OnceLock with a new DdApi instance // If another thread initialized it while we were getting the API key, // we'll get that instance instead @@ -112,9 +113,8 @@ impl Flusher { let mut failed = Vec::new(); let mut had_shipping_error = false; for a_batch in series { - let (continue_shipping, should_retry) = { - should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await - }; + let (continue_shipping, should_retry) = + { should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await }; if should_retry { failed.push(a_batch); had_shipping_error = true; diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 20e7c3b1..31181e20 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -7,7 +7,7 @@ use dogstatsd::{ constants::CONTEXTS, datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride}, dogstatsd::{DogStatsD, DogStatsDConfig}, - flusher::{Flusher, FlusherConfig, ApiKeyFactory}, + flusher::{ApiKeyFactory, Flusher, FlusherConfig}, }; use mockito::Server; use std::sync::{Arc, Mutex}; @@ -40,11 +40,8 @@ async fn dogstatsd_server_ships_series() { let _ = start_dogstatsd(&metrics_aggr).await; - let api_key_factory: ApiKeyFactory = Arc::new(|| { - Box::pin(async move { - "mock-api-key".to_string() - }) - }); + let api_key_factory: ApiKeyFactory = + Arc::new(|| Box::pin(async move { "mock-api-key".to_string() })); let mut metrics_flusher = Flusher::new(FlusherConfig { api_key_factory, From f4711eaa0140196595b7826c9a0d983cf3a04283 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Thu, 12 Jun 2025 17:06:12 -0400 Subject: [PATCH 4/9] Use Option --- crates/dogstatsd/src/flusher.rs | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index b24f47e7..9337c2be 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -6,7 +6,6 @@ use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy}; use reqwest::{Response, StatusCode}; use std::future::Future; use std::pin::Pin; -use std::sync::OnceLock; use std::sync::{Arc, Mutex}; use std::time::Duration; use tracing::{debug, error}; @@ -23,7 +22,7 @@ pub struct Flusher { timeout: Duration, retry_strategy: RetryStrategy, aggregator: Arc>, - dd_api: Arc>, + dd_api: Option, } pub struct FlusherConfig { @@ -45,29 +44,23 @@ impl Flusher { timeout: config.timeout, retry_strategy: config.retry_strategy, aggregator: config.aggregator, - dd_api: Arc::new(OnceLock::new()), + dd_api: None, } } async fn get_dd_api(&mut self) -> &DdApi { - if let Some(dd_api) = self.dd_api.get() { - return dd_api; - } - - let api_key = (self.api_key_factory)().await; - - // Initialize the OnceLock with a new DdApi instance - // If another thread initialized it while we were getting the API key, - // we'll get that instance instead - self.dd_api.get_or_init(|| { - DdApi::new( + if self.dd_api.is_none() { + let api_key = (self.api_key_factory)().await; + self.dd_api = Some(DdApi::new( api_key, self.metrics_intake_url_prefix.clone(), self.https_proxy.clone(), self.timeout, self.retry_strategy.clone(), - ) - }) + )); + } + + self.dd_api.as_ref().unwrap() } /// Flush metrics from the aggregator From a71b96b1c4f2afb8dd7564f0941625770c2f7d78 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Thu, 12 Jun 2025 17:07:52 -0400 Subject: [PATCH 5/9] Remove extra curly brackets --- crates/dogstatsd/src/flusher.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index 9337c2be..d4de1455 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -107,7 +107,7 @@ impl Flusher { let mut had_shipping_error = false; for a_batch in series { let (continue_shipping, should_retry) = - { should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await }; + should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await; if should_retry { failed.push(a_batch); had_shipping_error = true; @@ -124,9 +124,8 @@ impl Flusher { let mut failed = Vec::new(); let mut had_shipping_error = false; for a_batch in distributions { - let (continue_shipping, should_retry) = { - should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await - }; + let (continue_shipping, should_retry) = + should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await; if should_retry { failed.push(a_batch); had_shipping_error = true; From 45e3e3ed12d8fb6c9561f7896a1e7b1558ed5f60 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Thu, 12 Jun 2025 17:13:51 -0400 Subject: [PATCH 6/9] Rename a variable --- crates/datadog-serverless-compat/src/main.rs | 4 ++-- crates/dogstatsd/src/flusher.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 6ded1769..2112b6ad 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -204,8 +204,8 @@ async fn start_dogstatsd( #[allow(clippy::expect_used)] let metrics_flusher = Flusher::new(FlusherConfig { api_key_factory: Arc::new(move || { - let key = dd_api_key.clone(); - Box::pin(async move { key }) + let api_key = dd_api_key.clone(); + Box::pin(async move { api_key }) }), aggregator: Arc::clone(&metrics_aggr), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index d4de1455..54427070 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -15,7 +15,7 @@ pub type ApiKeyFactory = #[derive(Clone)] pub struct Flusher { - // Accept a future so the API keyresolution is deferred until the flush happens + // Accept a future so the API key resolution is deferred until the flush happens api_key_factory: ApiKeyFactory, metrics_intake_url_prefix: MetricsIntakeUrlPrefix, https_proxy: Option, From 00465c261b031ed39df124b9babdd58a811497fe Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Thu, 12 Jun 2025 17:24:59 -0400 Subject: [PATCH 7/9] Fix clippy error --- crates/dogstatsd/src/flusher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index 54427070..f26b90ad 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -60,7 +60,7 @@ impl Flusher { )); } - self.dd_api.as_ref().unwrap() + self.dd_api.as_ref().expect("dd_api should be initialized by this point") } /// Flush metrics from the aggregator From b04e998e7c471e55ad3efd5ba3b7b8b1aaaa2ad2 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Thu, 12 Jun 2025 17:26:57 -0400 Subject: [PATCH 8/9] Cargo fmt --- crates/dogstatsd/src/flusher.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index f26b90ad..9234121c 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -60,7 +60,9 @@ impl Flusher { )); } - self.dd_api.as_ref().expect("dd_api should be initialized by this point") + self.dd_api + .as_ref() + .expect("dd_api should be initialized by this point") } /// Flush metrics from the aggregator From 0d14da36899015cee07cc84d0aaeb2ec91978fcc Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Thu, 12 Jun 2025 17:32:28 -0400 Subject: [PATCH 9/9] Add clippy allow expect_used --- crates/dogstatsd/src/flusher.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index 9234121c..5807b371 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -60,6 +60,7 @@ impl Flusher { )); } + #[allow(clippy::expect_used)] self.dd_api .as_ref() .expect("dd_api should be initialized by this point")