Skip to content
6 changes: 2 additions & 4 deletions crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datadog_trace_utils::{config_utils::read_cloud_env, trace_utils::Environment

use dogstatsd::{
aggregator::Aggregator as MetricsAggregator,
api_key::ApiKeyFactory,
constants::CONTEXTS,
datadog::{MetricsIntakeUrlPrefix, RetryStrategy, Site},
dogstatsd::{DogStatsD, DogStatsDConfig},
Expand Down Expand Up @@ -203,10 +204,7 @@ async fn start_dogstatsd(
Some(dd_api_key) => {
#[allow(clippy::expect_used)]
let metrics_flusher = Flusher::new(FlusherConfig {
api_key_factory: Arc::new(move || {
let api_key = dd_api_key.clone();
Box::pin(async move { api_key })
}),
api_key_factory: Arc::new(ApiKeyFactory::new(&dd_api_key)),
aggregator: Arc::clone(&metrics_aggr),
metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(
Some(Site::new(dd_site).expect("Failed to parse site")),
Expand Down
71 changes: 71 additions & 0 deletions crates/dogstatsd/src/api_key.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::fmt::Debug;
use std::sync::Arc;
use std::{future::Future, pin::Pin};
use tokio::sync::OnceCell;

pub type ApiKeyResolverFn =
Arc<dyn Fn() -> Pin<Box<dyn Future<Output = String> + Send>> + Send + Sync>;

#[derive(Clone)]
pub enum ApiKeyFactory {
Static(String),
Dynamic {
resolver_fn: ApiKeyResolverFn,
api_key: Arc<OnceCell<String>>,
},
}

impl ApiKeyFactory {
/// Create a new `ApiKeyFactory` with a static API key.
pub fn new(api_key: &str) -> Self {
Self::Static(api_key.to_string())
}

/// Create a new `ApiKeyFactory` with a dynamic API key resolver function.
pub fn new_from_resolver(resolver_fn: ApiKeyResolverFn) -> Self {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe some docs on what is considered a resolver

Self::Dynamic {
resolver_fn,
api_key: Arc::new(OnceCell::new()),
}
}

pub async fn get_api_key(&self) -> &str {
match self {
Self::Static(api_key) => api_key,
Self::Dynamic {
resolver_fn,
api_key,
} => {
api_key
.get_or_init(|| async { (resolver_fn)().await })
.await
}
}
}
}

impl Debug for ApiKeyFactory {
Comment thread
duncanista marked this conversation as resolved.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ApiKeyFactory")
}
}

#[cfg(test)]
pub mod tests {
use super::*;

#[tokio::test]
async fn test_new() {
let api_key_factory = ApiKeyFactory::new("mock-api-key");
assert_eq!(api_key_factory.get_api_key().await, "mock-api-key");
}

#[tokio::test]
async fn test_new_from_resolver() {
let api_key_factory = Arc::new(ApiKeyFactory::new_from_resolver(Arc::new(move || {
let api_key = "mock-api-key".to_string();
Box::pin(async move { api_key })
})));
assert_eq!(api_key_factory.get_api_key().await, "mock-api-key");
}
}
20 changes: 8 additions & 12 deletions crates/dogstatsd/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,17 @@
// SPDX-License-Identifier: Apache-2.0

use crate::aggregator::Aggregator;
use crate::api_key::ApiKeyFactory;
use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy};
use reqwest::{Response, StatusCode};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::{debug, error};

pub type ApiKeyFactory =
Arc<dyn Fn() -> Pin<Box<dyn Future<Output = String> + Send>> + Send + Sync>;

#[derive(Clone)]
pub struct Flusher {
// Accept a future so the API key resolution is deferred until the flush happens
api_key_factory: ApiKeyFactory,
// Allow accepting a future so the API key resolution is deferred until the flush happens
Copy link

Copilot AI Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Update this comment to reflect that api_key_factory now uses ApiKeyFactory rather than accepting a raw future-producing closure.

Suggested change
// Allow accepting a future so the API key resolution is deferred until the flush happens
// Uses ApiKeyFactory to defer API key resolution until the flush happens

Copilot uses AI. Check for mistakes.
api_key_factory: Arc<ApiKeyFactory>,
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
https_proxy: Option<String>,
timeout: Duration,
Expand All @@ -26,7 +22,7 @@ pub struct Flusher {
}

pub struct FlusherConfig {
pub api_key_factory: ApiKeyFactory,
pub api_key_factory: Arc<ApiKeyFactory>,
pub aggregator: Arc<Mutex<Aggregator>>,
pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
pub https_proxy: Option<String>,
Expand All @@ -38,7 +34,7 @@ pub struct FlusherConfig {
impl Flusher {
pub fn new(config: FlusherConfig) -> Self {
Flusher {
api_key_factory: config.api_key_factory,
api_key_factory: Arc::clone(&config.api_key_factory),
metrics_intake_url_prefix: config.metrics_intake_url_prefix,
https_proxy: config.https_proxy,
timeout: config.timeout,
Expand All @@ -50,9 +46,9 @@ impl Flusher {

async fn get_dd_api(&mut self) -> &DdApi {
if self.dd_api.is_none() {
let api_key = (self.api_key_factory)().await;
let api_key = self.api_key_factory.get_api_key().await;
Copy link

Copilot AI Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Since get_api_key returns a &str, you then call .to_string() when building DdApi, causing an extra allocation; consider returning an owned String to avoid the clone.

Copilot uses AI. Check for mistakes.
self.dd_api = Some(DdApi::new(
api_key,
api_key.to_string(),
self.metrics_intake_url_prefix.clone(),
self.https_proxy.clone(),
self.timeout,
Expand All @@ -63,7 +59,7 @@ impl Flusher {
#[allow(clippy::expect_used)]
self.dd_api
.as_ref()
.expect("dd_api should be initialized by this point")
.expect("dd_api should have been initialized")
}

/// Flush metrics from the aggregator
Expand Down
1 change: 1 addition & 0 deletions crates/dogstatsd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#![cfg_attr(not(test), deny(clippy::unimplemented))]

pub mod aggregator;
pub mod api_key;
pub mod constants;
pub mod datadog;
pub mod dogstatsd;
Expand Down
8 changes: 4 additions & 4 deletions crates/dogstatsd/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
use dogstatsd::metric::SortedTags;
use dogstatsd::{
aggregator::Aggregator as MetricsAggregator,
api_key::ApiKeyFactory,
constants::CONTEXTS,
datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride},
dogstatsd::{DogStatsD, DogStatsDConfig},
flusher::{ApiKeyFactory, Flusher, FlusherConfig},
flusher::{Flusher, FlusherConfig},
};
use mockito::Server;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -40,11 +41,10 @@ async fn dogstatsd_server_ships_series() {

let _ = start_dogstatsd(&metrics_aggr).await;

let api_key_factory: ApiKeyFactory =
Arc::new(|| Box::pin(async move { "mock-api-key".to_string() }));
let api_key_factory = ApiKeyFactory::new("mock-api-key");

let mut metrics_flusher = Flusher::new(FlusherConfig {
api_key_factory,
api_key_factory: Arc::new(api_key_factory),
aggregator: Arc::clone(&metrics_aggr),
metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(
None,
Expand Down
Loading