diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 2112b6ad..24f7f5fb 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -27,6 +27,7 @@ use datadog_trace_utils::{config_utils::read_cloud_env, trace_utils::Environment use dogstatsd::{ aggregator::Aggregator as MetricsAggregator, + api_key::ApiKeyFactory, constants::CONTEXTS, datadog::{MetricsIntakeUrlPrefix, RetryStrategy, Site}, dogstatsd::{DogStatsD, DogStatsDConfig}, @@ -203,10 +204,7 @@ async fn start_dogstatsd( Some(dd_api_key) => { #[allow(clippy::expect_used)] let metrics_flusher = Flusher::new(FlusherConfig { - api_key_factory: Arc::new(move || { - let api_key = dd_api_key.clone(); - Box::pin(async move { api_key }) - }), + api_key_factory: Arc::new(ApiKeyFactory::new(&dd_api_key)), aggregator: Arc::clone(&metrics_aggr), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( Some(Site::new(dd_site).expect("Failed to parse site")), diff --git a/crates/dogstatsd/src/api_key.rs b/crates/dogstatsd/src/api_key.rs new file mode 100644 index 00000000..6a32c806 --- /dev/null +++ b/crates/dogstatsd/src/api_key.rs @@ -0,0 +1,71 @@ +use std::fmt::Debug; +use std::sync::Arc; +use std::{future::Future, pin::Pin}; +use tokio::sync::OnceCell; + +pub type ApiKeyResolverFn = + Arc Pin + Send>> + Send + Sync>; + +#[derive(Clone)] +pub enum ApiKeyFactory { + Static(String), + Dynamic { + resolver_fn: ApiKeyResolverFn, + api_key: Arc>, + }, +} + +impl ApiKeyFactory { + /// Create a new `ApiKeyFactory` with a static API key. + pub fn new(api_key: &str) -> Self { + Self::Static(api_key.to_string()) + } + + /// Create a new `ApiKeyFactory` with a dynamic API key resolver function. + pub fn new_from_resolver(resolver_fn: ApiKeyResolverFn) -> Self { + Self::Dynamic { + resolver_fn, + api_key: Arc::new(OnceCell::new()), + } + } + + pub async fn get_api_key(&self) -> &str { + match self { + Self::Static(api_key) => api_key, + Self::Dynamic { + resolver_fn, + api_key, + } => { + api_key + .get_or_init(|| async { (resolver_fn)().await }) + .await + } + } + } +} + +impl Debug for ApiKeyFactory { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ApiKeyFactory") + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + + #[tokio::test] + async fn test_new() { + let api_key_factory = ApiKeyFactory::new("mock-api-key"); + assert_eq!(api_key_factory.get_api_key().await, "mock-api-key"); + } + + #[tokio::test] + async fn test_new_from_resolver() { + let api_key_factory = Arc::new(ApiKeyFactory::new_from_resolver(Arc::new(move || { + let api_key = "mock-api-key".to_string(); + Box::pin(async move { api_key }) + }))); + assert_eq!(api_key_factory.get_api_key().await, "mock-api-key"); + } +} diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index 5807b371..2947ad7b 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -2,21 +2,17 @@ // SPDX-License-Identifier: Apache-2.0 use crate::aggregator::Aggregator; +use crate::api_key::ApiKeyFactory; use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy}; use reqwest::{Response, StatusCode}; -use std::future::Future; -use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::time::Duration; use tracing::{debug, error}; -pub type ApiKeyFactory = - Arc Pin + Send>> + Send + Sync>; - #[derive(Clone)] pub struct Flusher { - // Accept a future so the API key resolution is deferred until the flush happens - api_key_factory: ApiKeyFactory, + // Allow accepting a future so the API key resolution is deferred until the flush happens + api_key_factory: Arc, metrics_intake_url_prefix: MetricsIntakeUrlPrefix, https_proxy: Option, timeout: Duration, @@ -26,7 +22,7 @@ pub struct Flusher { } pub struct FlusherConfig { - pub api_key_factory: ApiKeyFactory, + pub api_key_factory: Arc, pub aggregator: Arc>, pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix, pub https_proxy: Option, @@ -38,7 +34,7 @@ pub struct FlusherConfig { impl Flusher { pub fn new(config: FlusherConfig) -> Self { Flusher { - api_key_factory: config.api_key_factory, + api_key_factory: Arc::clone(&config.api_key_factory), metrics_intake_url_prefix: config.metrics_intake_url_prefix, https_proxy: config.https_proxy, timeout: config.timeout, @@ -50,9 +46,9 @@ impl Flusher { async fn get_dd_api(&mut self) -> &DdApi { if self.dd_api.is_none() { - let api_key = (self.api_key_factory)().await; + let api_key = self.api_key_factory.get_api_key().await; self.dd_api = Some(DdApi::new( - api_key, + api_key.to_string(), self.metrics_intake_url_prefix.clone(), self.https_proxy.clone(), self.timeout, @@ -63,7 +59,7 @@ impl Flusher { #[allow(clippy::expect_used)] self.dd_api .as_ref() - .expect("dd_api should be initialized by this point") + .expect("dd_api should have been initialized") } /// Flush metrics from the aggregator diff --git a/crates/dogstatsd/src/lib.rs b/crates/dogstatsd/src/lib.rs index 4009db14..6aea675b 100644 --- a/crates/dogstatsd/src/lib.rs +++ b/crates/dogstatsd/src/lib.rs @@ -8,6 +8,7 @@ #![cfg_attr(not(test), deny(clippy::unimplemented))] pub mod aggregator; +pub mod api_key; pub mod constants; pub mod datadog; pub mod dogstatsd; diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 31181e20..4a7f30cf 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -4,10 +4,11 @@ use dogstatsd::metric::SortedTags; use dogstatsd::{ aggregator::Aggregator as MetricsAggregator, + api_key::ApiKeyFactory, constants::CONTEXTS, datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride}, dogstatsd::{DogStatsD, DogStatsDConfig}, - flusher::{ApiKeyFactory, Flusher, FlusherConfig}, + flusher::{Flusher, FlusherConfig}, }; use mockito::Server; use std::sync::{Arc, Mutex}; @@ -40,11 +41,10 @@ async fn dogstatsd_server_ships_series() { let _ = start_dogstatsd(&metrics_aggr).await; - let api_key_factory: ApiKeyFactory = - Arc::new(|| Box::pin(async move { "mock-api-key".to_string() })); + let api_key_factory = ApiKeyFactory::new("mock-api-key"); let mut metrics_flusher = Flusher::new(FlusherConfig { - api_key_factory, + api_key_factory: Arc::new(api_key_factory), aggregator: Arc::clone(&metrics_aggr), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( None,