diff --git a/crates/dogstatsd/src/api_key.rs b/crates/dogstatsd/src/api_key.rs index 6a32c806..06118e02 100644 --- a/crates/dogstatsd/src/api_key.rs +++ b/crates/dogstatsd/src/api_key.rs @@ -4,14 +4,14 @@ use std::{future::Future, pin::Pin}; use tokio::sync::OnceCell; pub type ApiKeyResolverFn = - Arc Pin + Send>> + Send + Sync>; + Arc Pin> + Send>> + Send + Sync>; #[derive(Clone)] pub enum ApiKeyFactory { Static(String), Dynamic { resolver_fn: ApiKeyResolverFn, - api_key: Arc>, + api_key: Arc>>, }, } @@ -29,17 +29,17 @@ impl ApiKeyFactory { } } - pub async fn get_api_key(&self) -> &str { + pub async fn get_api_key(&self) -> Option<&str> { match self { - Self::Static(api_key) => api_key, + Self::Static(api_key) => Some(api_key), Self::Dynamic { resolver_fn, api_key, - } => { - api_key - .get_or_init(|| async { (resolver_fn)().await }) - .await - } + } => api_key + .get_or_init(|| async { (resolver_fn)().await }) + .await + .as_ref() + .map(|s| s.as_str()), } } } @@ -57,15 +57,15 @@ pub mod tests { #[tokio::test] async fn test_new() { let api_key_factory = ApiKeyFactory::new("mock-api-key"); - assert_eq!(api_key_factory.get_api_key().await, "mock-api-key"); + assert_eq!(api_key_factory.get_api_key().await, Some("mock-api-key")); } #[tokio::test] async fn test_new_from_resolver() { let api_key_factory = Arc::new(ApiKeyFactory::new_from_resolver(Arc::new(move || { let api_key = "mock-api-key".to_string(); - Box::pin(async move { api_key }) + Box::pin(async move { Some(api_key) }) }))); - assert_eq!(api_key_factory.get_api_key().await, "mock-api-key"); + assert_eq!(api_key_factory.get_api_key().await, Some("mock-api-key")); } } diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index 2947ad7b..a66d8434 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -7,6 +7,7 @@ use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy}; use reqwest::{Response, StatusCode}; use std::sync::{Arc, Mutex}; use std::time::Duration; +use tokio::sync::OnceCell; use tracing::{debug, error}; #[derive(Clone)] @@ -18,7 +19,7 @@ pub struct Flusher { timeout: Duration, retry_strategy: RetryStrategy, aggregator: Arc>, - dd_api: Option, + dd_api: OnceCell>, } pub struct FlusherConfig { @@ -40,26 +41,29 @@ impl Flusher { timeout: config.timeout, retry_strategy: config.retry_strategy, aggregator: config.aggregator, - dd_api: None, + dd_api: OnceCell::new(), } } - async fn get_dd_api(&mut self) -> &DdApi { - if self.dd_api.is_none() { - let api_key = self.api_key_factory.get_api_key().await; - self.dd_api = Some(DdApi::new( - api_key.to_string(), - self.metrics_intake_url_prefix.clone(), - self.https_proxy.clone(), - self.timeout, - self.retry_strategy.clone(), - )); - } - - #[allow(clippy::expect_used)] + async fn get_dd_api(&mut self) -> &Option { self.dd_api - .as_ref() - .expect("dd_api should have been initialized") + .get_or_init(|| async { + let api_key = self.api_key_factory.get_api_key().await; + match api_key { + Some(api_key) => Some(DdApi::new( + api_key.to_string(), + self.metrics_intake_url_prefix.clone(), + self.https_proxy.clone(), + self.timeout, + self.retry_strategy.clone(), + )), + None => { + error!("Failed to create dd_api: failed to get API key"); + None + } + } + }) + .await } /// Flush metrics from the aggregator @@ -98,7 +102,13 @@ impl Flusher { let series_copy = series.clone(); let distributions_copy = distributions.clone(); - let dd_api = self.get_dd_api().await; + let dd_api = match self.get_dd_api().await { + None => { + error!("Failed to flush metrics: failed to create dd_api"); + return Some((series_copy, distributions_copy)); + } + Some(dd_api) => dd_api, + }; let dd_api_clone = dd_api.clone(); let series_handle = tokio::spawn(async move {