From e783068ebf3ed358d6c9af72d60b76f8471e3f8b Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Thu, 10 Jul 2025 12:37:58 -0400 Subject: [PATCH 1/2] Revert "feat: Add helper functions to ApiKeyFactory struct (#24)" This reverts commit b29bba8b4178fc2089943fe28e853d529826888b. --- crates/datadog-serverless-compat/src/main.rs | 6 +- crates/dogstatsd/src/api_key.rs | 71 -------------------- crates/dogstatsd/src/flusher.rs | 20 +++--- crates/dogstatsd/src/lib.rs | 1 - crates/dogstatsd/tests/integration_test.rs | 8 +-- 5 files changed, 20 insertions(+), 86 deletions(-) delete mode 100644 crates/dogstatsd/src/api_key.rs diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 24f7f5fb..2112b6ad 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -27,7 +27,6 @@ use datadog_trace_utils::{config_utils::read_cloud_env, trace_utils::Environment use dogstatsd::{ aggregator::Aggregator as MetricsAggregator, - api_key::ApiKeyFactory, constants::CONTEXTS, datadog::{MetricsIntakeUrlPrefix, RetryStrategy, Site}, dogstatsd::{DogStatsD, DogStatsDConfig}, @@ -204,7 +203,10 @@ async fn start_dogstatsd( Some(dd_api_key) => { #[allow(clippy::expect_used)] let metrics_flusher = Flusher::new(FlusherConfig { - api_key_factory: Arc::new(ApiKeyFactory::new(&dd_api_key)), + api_key_factory: Arc::new(move || { + let api_key = dd_api_key.clone(); + Box::pin(async move { api_key }) + }), aggregator: Arc::clone(&metrics_aggr), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( Some(Site::new(dd_site).expect("Failed to parse site")), diff --git a/crates/dogstatsd/src/api_key.rs b/crates/dogstatsd/src/api_key.rs deleted file mode 100644 index 6a32c806..00000000 --- a/crates/dogstatsd/src/api_key.rs +++ /dev/null @@ -1,71 +0,0 @@ -use std::fmt::Debug; -use std::sync::Arc; -use std::{future::Future, pin::Pin}; -use tokio::sync::OnceCell; - -pub type ApiKeyResolverFn = - Arc Pin + Send>> + Send + Sync>; - -#[derive(Clone)] -pub enum ApiKeyFactory { - Static(String), - Dynamic { - resolver_fn: ApiKeyResolverFn, - api_key: Arc>, - }, -} - -impl ApiKeyFactory { - /// Create a new `ApiKeyFactory` with a static API key. - pub fn new(api_key: &str) -> Self { - Self::Static(api_key.to_string()) - } - - /// Create a new `ApiKeyFactory` with a dynamic API key resolver function. - pub fn new_from_resolver(resolver_fn: ApiKeyResolverFn) -> Self { - Self::Dynamic { - resolver_fn, - api_key: Arc::new(OnceCell::new()), - } - } - - pub async fn get_api_key(&self) -> &str { - match self { - Self::Static(api_key) => api_key, - Self::Dynamic { - resolver_fn, - api_key, - } => { - api_key - .get_or_init(|| async { (resolver_fn)().await }) - .await - } - } - } -} - -impl Debug for ApiKeyFactory { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "ApiKeyFactory") - } -} - -#[cfg(test)] -pub mod tests { - use super::*; - - #[tokio::test] - async fn test_new() { - let api_key_factory = ApiKeyFactory::new("mock-api-key"); - assert_eq!(api_key_factory.get_api_key().await, "mock-api-key"); - } - - #[tokio::test] - async fn test_new_from_resolver() { - let api_key_factory = Arc::new(ApiKeyFactory::new_from_resolver(Arc::new(move || { - let api_key = "mock-api-key".to_string(); - Box::pin(async move { api_key }) - }))); - assert_eq!(api_key_factory.get_api_key().await, "mock-api-key"); - } -} diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index 2947ad7b..5807b371 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -2,17 +2,21 @@ // SPDX-License-Identifier: Apache-2.0 use crate::aggregator::Aggregator; -use crate::api_key::ApiKeyFactory; use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy}; use reqwest::{Response, StatusCode}; +use std::future::Future; +use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::time::Duration; use tracing::{debug, error}; +pub type ApiKeyFactory = + Arc Pin + Send>> + Send + Sync>; + #[derive(Clone)] pub struct Flusher { - // Allow accepting a future so the API key resolution is deferred until the flush happens - api_key_factory: Arc, + // Accept a future so the API key resolution is deferred until the flush happens + api_key_factory: ApiKeyFactory, metrics_intake_url_prefix: MetricsIntakeUrlPrefix, https_proxy: Option, timeout: Duration, @@ -22,7 +26,7 @@ pub struct Flusher { } pub struct FlusherConfig { - pub api_key_factory: Arc, + pub api_key_factory: ApiKeyFactory, pub aggregator: Arc>, pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix, pub https_proxy: Option, @@ -34,7 +38,7 @@ pub struct FlusherConfig { impl Flusher { pub fn new(config: FlusherConfig) -> Self { Flusher { - api_key_factory: Arc::clone(&config.api_key_factory), + api_key_factory: config.api_key_factory, metrics_intake_url_prefix: config.metrics_intake_url_prefix, https_proxy: config.https_proxy, timeout: config.timeout, @@ -46,9 +50,9 @@ impl Flusher { async fn get_dd_api(&mut self) -> &DdApi { if self.dd_api.is_none() { - let api_key = self.api_key_factory.get_api_key().await; + let api_key = (self.api_key_factory)().await; self.dd_api = Some(DdApi::new( - api_key.to_string(), + api_key, self.metrics_intake_url_prefix.clone(), self.https_proxy.clone(), self.timeout, @@ -59,7 +63,7 @@ impl Flusher { #[allow(clippy::expect_used)] self.dd_api .as_ref() - .expect("dd_api should have been initialized") + .expect("dd_api should be initialized by this point") } /// Flush metrics from the aggregator diff --git a/crates/dogstatsd/src/lib.rs b/crates/dogstatsd/src/lib.rs index 59cbd5b9..968ee549 100644 --- a/crates/dogstatsd/src/lib.rs +++ b/crates/dogstatsd/src/lib.rs @@ -8,7 +8,6 @@ #![cfg_attr(not(test), deny(clippy::unimplemented))] pub mod aggregator; -pub mod api_key; pub mod constants; pub mod datadog; pub mod dogstatsd; diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 10fbc783..19b84aa6 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -4,11 +4,10 @@ use dogstatsd::metric::SortedTags; use dogstatsd::{ aggregator::Aggregator as MetricsAggregator, - api_key::ApiKeyFactory, constants::CONTEXTS, datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride}, dogstatsd::{DogStatsD, DogStatsDConfig}, - flusher::{Flusher, FlusherConfig}, + flusher::{ApiKeyFactory, Flusher, FlusherConfig}, }; use mockito::Server; use std::sync::{Arc, Mutex}; @@ -40,10 +39,11 @@ async fn dogstatsd_server_ships_series() { let _ = start_dogstatsd(&metrics_aggr).await; - let api_key_factory = ApiKeyFactory::new("mock-api-key"); + let api_key_factory: ApiKeyFactory = + Arc::new(|| Box::pin(async move { "mock-api-key".to_string() })); let mut metrics_flusher = Flusher::new(FlusherConfig { - api_key_factory: Arc::new(api_key_factory), + api_key_factory, aggregator: Arc::clone(&metrics_aggr), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( None, From 137180d8f81990298891fd7c0b0fc06cd6478cf1 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Thu, 10 Jul 2025 12:38:09 -0400 Subject: [PATCH 2/2] Revert "feat: Defer DD API key resolution to flushing time (#21)" This reverts commit fb0a663c66e5b8fd29648fcbdd4424cccac011ea. --- crates/datadog-serverless-compat/src/main.rs | 5 +- crates/dogstatsd/src/flusher.rs | 53 +++++--------------- crates/dogstatsd/tests/integration_test.rs | 7 +-- 3 files changed, 15 insertions(+), 50 deletions(-) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 2112b6ad..270d03a5 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -203,10 +203,7 @@ async fn start_dogstatsd( Some(dd_api_key) => { #[allow(clippy::expect_used)] let metrics_flusher = Flusher::new(FlusherConfig { - api_key_factory: Arc::new(move || { - let api_key = dd_api_key.clone(); - Box::pin(async move { api_key }) - }), + api_key: dd_api_key, aggregator: Arc::clone(&metrics_aggr), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( Some(Site::new(dd_site).expect("Failed to parse site")), diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index 5807b371..b4bc771c 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -4,29 +4,18 @@ use crate::aggregator::Aggregator; use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy}; use reqwest::{Response, StatusCode}; -use std::future::Future; -use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::time::Duration; use tracing::{debug, error}; -pub type ApiKeyFactory = - Arc Pin + Send>> + Send + Sync>; - #[derive(Clone)] pub struct Flusher { - // Accept a future so the API key resolution is deferred until the flush happens - api_key_factory: ApiKeyFactory, - metrics_intake_url_prefix: MetricsIntakeUrlPrefix, - https_proxy: Option, - timeout: Duration, - retry_strategy: RetryStrategy, + dd_api: DdApi, aggregator: Arc>, - dd_api: Option, } pub struct FlusherConfig { - pub api_key_factory: ApiKeyFactory, + pub api_key: String, pub aggregator: Arc>, pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix, pub https_proxy: Option, @@ -37,33 +26,17 @@ pub struct FlusherConfig { #[allow(clippy::await_holding_lock)] impl Flusher { pub fn new(config: FlusherConfig) -> Self { + let dd_api = DdApi::new( + config.api_key, + config.metrics_intake_url_prefix, + config.https_proxy, + config.timeout, + config.retry_strategy, + ); Flusher { - api_key_factory: config.api_key_factory, - metrics_intake_url_prefix: config.metrics_intake_url_prefix, - https_proxy: config.https_proxy, - timeout: config.timeout, - retry_strategy: config.retry_strategy, + dd_api, aggregator: config.aggregator, - dd_api: None, - } - } - - async fn get_dd_api(&mut self) -> &DdApi { - if self.dd_api.is_none() { - let api_key = (self.api_key_factory)().await; - self.dd_api = Some(DdApi::new( - api_key, - self.metrics_intake_url_prefix.clone(), - self.https_proxy.clone(), - self.timeout, - self.retry_strategy.clone(), - )); } - - #[allow(clippy::expect_used)] - self.dd_api - .as_ref() - .expect("dd_api should be initialized by this point") } /// Flush metrics from the aggregator @@ -102,9 +75,7 @@ impl Flusher { let series_copy = series.clone(); let distributions_copy = distributions.clone(); - let dd_api = self.get_dd_api().await; - - let dd_api_clone = dd_api.clone(); + let dd_api_clone = self.dd_api.clone(); let series_handle = tokio::spawn(async move { let mut failed = Vec::new(); let mut had_shipping_error = false; @@ -122,7 +93,7 @@ impl Flusher { (failed, had_shipping_error) }); - let dd_api_clone = dd_api.clone(); + let dd_api_clone = self.dd_api.clone(); let distributions_handle = tokio::spawn(async move { let mut failed = Vec::new(); let mut had_shipping_error = false; diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 19b84aa6..2ddbed1a 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -7,7 +7,7 @@ use dogstatsd::{ constants::CONTEXTS, datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride}, dogstatsd::{DogStatsD, DogStatsDConfig}, - flusher::{ApiKeyFactory, Flusher, FlusherConfig}, + flusher::{Flusher, FlusherConfig}, }; use mockito::Server; use std::sync::{Arc, Mutex}; @@ -39,11 +39,8 @@ async fn dogstatsd_server_ships_series() { let _ = start_dogstatsd(&metrics_aggr).await; - let api_key_factory: ApiKeyFactory = - Arc::new(|| Box::pin(async move { "mock-api-key".to_string() })); - let mut metrics_flusher = Flusher::new(FlusherConfig { - api_key_factory, + api_key: "mock-api-key".to_string(), aggregator: Arc::clone(&metrics_aggr), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( None,