From 588612dc233cf40b7f66aaa272b9ce211bac3f88 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 9 Jul 2025 13:54:58 -0400 Subject: [PATCH 1/2] Do not make create_request() and get_headers() return Option Simplify logic in StatsFlusher Move api_key_factory out of TraceProcessor Move some code Avoid resolving key in trace api and proxy Apply to proxy flusher Resolve conflicts Make trace flusher resolve api key Fix Clippy lint Format Use SendData.set_api_key() Fix errors Improve comments --- bottlecap/Cargo.lock | 6 +++--- bottlecap/Cargo.toml | 2 +- bottlecap/src/bin/bottlecap/main.rs | 13 ++++-------- bottlecap/src/logs/flusher.rs | 6 ++++-- bottlecap/src/proxy/interceptor.rs | 1 - bottlecap/src/traces/proxy_flusher.rs | 12 ++++++----- bottlecap/src/traces/stats_flusher.rs | 17 ++++++++-------- bottlecap/src/traces/trace_agent.rs | 8 ++------ bottlecap/src/traces/trace_flusher.rs | 27 ++++++++++++++++++++++--- bottlecap/src/traces/trace_processor.rs | 8 ++------ 10 files changed, 56 insertions(+), 44 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 5f975dec6..d2b0c04ba 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -724,7 +724,7 @@ dependencies = [ [[package]] name = "datadog-fips" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=b29bba8b4178fc2089943fe28e853d529826888b#b29bba8b4178fc2089943fe28e853d529826888b" +source = "git+https://github.com/DataDog/serverless-components?rev=985120329d0ba96c1ec8d719cc38e1f7ce11a092#985120329d0ba96c1ec8d719cc38e1f7ce11a092" dependencies = [ "reqwest", "rustls", @@ -1078,9 +1078,9 @@ dependencies = [ [[package]] name = "dogstatsd" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=b29bba8b4178fc2089943fe28e853d529826888b#b29bba8b4178fc2089943fe28e853d529826888b" +source = "git+https://github.com/DataDog/serverless-components?rev=985120329d0ba96c1ec8d719cc38e1f7ce11a092#985120329d0ba96c1ec8d719cc38e1f7ce11a092" dependencies = [ - "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=b29bba8b4178fc2089943fe28e853d529826888b)", + "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=985120329d0ba96c1ec8d719cc38e1f7ce11a092)", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "derive_more", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index 2cb2976ad..262057313 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -57,7 +57,7 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" , features = ["mini_agent"] } datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } -dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "b29bba8b4178fc2089943fe28e853d529826888b", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "985120329d0ba96c1ec8d719cc38e1f7ce11a092", default-features = false } datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78" } datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false } axum = { version = "0.8.4", default-features = false, features = ["default"] } diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 126932521..2aa46a66a 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -439,11 +439,7 @@ fn create_api_key_factory( let aws_config = Arc::clone(&aws_config); let aws_credentials = Arc::clone(&aws_credentials); - Box::pin(async move { - resolve_secrets(config, aws_config, aws_credentials) - .await - .expect("Failed to resolve API key") - }) + Box::pin(async move { resolve_secrets(config, aws_config, aws_credentials).await }) }))) } @@ -517,7 +513,7 @@ async fn extension_loop_active( trace_agent_shutdown_token, ) = start_trace_agent( config, - Arc::clone(&api_key_factory), + &api_key_factory, &tags_provider, Arc::clone(&invocation_processor), Arc::clone(&trace_aggregator), @@ -1038,7 +1034,7 @@ fn start_metrics_flushers( #[allow(clippy::type_complexity)] fn start_trace_agent( config: &Arc, - api_key_factory: Arc, + api_key_factory: &Arc, tags_provider: &Arc, invocation_processor: Arc>, trace_aggregator: Arc>, @@ -1064,6 +1060,7 @@ fn start_trace_agent( let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher { aggregator: trace_aggregator.clone(), config: Arc::clone(config), + api_key_factory: Arc::clone(api_key_factory), }); let obfuscation_config = obfuscation_config::ObfuscationConfig { @@ -1077,7 +1074,6 @@ fn start_trace_agent( let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor { obfuscation_config: Arc::new(obfuscation_config), - api_key_factory: api_key_factory.clone(), }); // Proxy @@ -1098,7 +1094,6 @@ fn start_trace_agent( proxy_aggregator, invocation_processor, Arc::clone(tags_provider), - api_key_factory, ); let trace_agent_channel = trace_agent.get_sender_copy(); let shutdown_token = trace_agent.shutdown_token(); diff --git a/bottlecap/src/logs/flusher.rs b/bottlecap/src/logs/flusher.rs index a0e597abb..15d5cc3ce 100644 --- a/bottlecap/src/logs/flusher.rs +++ b/bottlecap/src/logs/flusher.rs @@ -42,7 +42,6 @@ impl Flusher { config: Arc, ) -> Self { let client = get_client(&config); - Flusher { client, endpoint, @@ -54,7 +53,10 @@ impl Flusher { } pub async fn flush(&self, batches: Option>>>) -> Vec { - let api_key = self.api_key_factory.get_api_key().await; + let Some(api_key) = self.api_key_factory.get_api_key().await else { + error!("Skipping flushing logs: Failed to resolve API key"); + return vec![]; + }; let mut set = JoinSet::new(); diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index 6feee0d15..d7efeadf3 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -399,7 +399,6 @@ mod tests { let metrics_aggregator = Arc::new(Mutex::new( MetricsAggregator::new(EMPTY_TAGS, 1024).unwrap(), )); - let aws_config = Arc::new(AwsConfig { region: "us-east-1".to_string(), function_name: "arn:some-function".to_string(), diff --git a/bottlecap/src/traces/proxy_flusher.rs b/bottlecap/src/traces/proxy_flusher.rs index e6ce4be24..3b60ee2c1 100644 --- a/bottlecap/src/traces/proxy_flusher.rs +++ b/bottlecap/src/traces/proxy_flusher.rs @@ -2,10 +2,9 @@ use dogstatsd::api_key::ApiKeyFactory; use reqwest::header::HeaderMap; use std::{error::Error, sync::Arc}; use thiserror::Error as ThisError; -use tokio::{ - sync::{Mutex, OnceCell}, - task::JoinSet, -}; +use tokio::sync::OnceCell; +use tokio::{sync::Mutex, task::JoinSet}; + use tracing::{debug, error}; use crate::{ @@ -83,7 +82,10 @@ impl Flusher { &self, retry_requests: Option>, ) -> Option> { - let api_key = self.api_key_factory.get_api_key().await; + let Some(api_key) = self.api_key_factory.get_api_key().await else { + error!("Skipping flush in proxy flusher: Failed to resolve API key"); + return None; + }; let mut join_set = JoinSet::new(); let mut requests = Vec::::new(); diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index d75844276..266d9d465 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -60,16 +60,21 @@ impl StatsFlusher for ServerlessStatsFlusher { return; } + let Some(api_key) = self.api_key_factory.get_api_key().await else { + error!("Skipping flushing stats: Failed to resolve API key"); + return; + }; + + let api_key_clone = api_key.to_string(); let endpoint = self .endpoint .get_or_init({ move || async move { - let api_key = self.api_key_factory.get_api_key().await.to_string(); let stats_url = trace_stats_url(&self.config.site); Endpoint { url: hyper::Uri::from_str(&stats_url) .expect("can't make URI from stats url, exiting"), - api_key: Some(api_key.clone().into()), + api_key: Some(api_key_clone.into()), timeout_ms: self.config.flush_timeout * 1_000, test_token: None, } @@ -95,12 +100,8 @@ impl StatsFlusher for ServerlessStatsFlusher { let start = std::time::Instant::now(); - let resp = stats_utils::send_stats_payload( - serialized_stats_payload, - endpoint, - self.api_key_factory.get_api_key().await, - ) - .await; + let resp = + stats_utils::send_stats_payload(serialized_stats_payload, endpoint, api_key).await; let elapsed = start.elapsed(); debug!( "Stats request to {} took {}ms", diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 3b3863911..7a2880902 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -37,7 +37,6 @@ use crate::{ use datadog_trace_protobuf::pb; use datadog_trace_utils::trace_utils::{self}; use ddcommon::hyper_migration; -use dogstatsd::api_key::ApiKeyFactory; const TRACE_AGENT_PORT: usize = 8126; @@ -86,7 +85,6 @@ pub struct StatsState { pub struct ProxyState { pub config: Arc, pub proxy_aggregator: Arc>, - pub api_key_factory: Arc, } pub struct TraceAgent { @@ -99,7 +97,6 @@ pub struct TraceAgent { invocation_processor: Arc>, shutdown_token: CancellationToken, tx: Sender, - api_key_factory: Arc, } #[derive(Clone, Copy)] @@ -120,7 +117,6 @@ impl TraceAgent { proxy_aggregator: Arc>, invocation_processor: Arc>, tags_provider: Arc, - api_key_factory: Arc, ) -> TraceAgent { // setup a channel to send processed traces to our flusher. tx is passed through each // endpoint_handler to the trace processor, which uses it to send de-serialized @@ -147,7 +143,6 @@ impl TraceAgent { invocation_processor, tags_provider, tx: trace_tx, - api_key_factory, shutdown_token: CancellationToken::new(), } } @@ -207,7 +202,6 @@ impl TraceAgent { let proxy_state = ProxyState { config: Arc::clone(&self.config), proxy_aggregator: Arc::clone(&self.proxy_aggregator), - api_key_factory: Arc::clone(&self.api_key_factory), }; let trace_router = Router::new() @@ -395,6 +389,8 @@ impl TraceAgent { (StatusCode::OK, response_json.to_string()).into_response() } + #[allow(clippy::too_many_arguments)] + #[allow(clippy::too_many_lines)] async fn handle_traces( config: Arc, request: Request, diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 699c3c6cc..6c1a6d7fd 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -10,13 +10,18 @@ use datadog_trace_utils::{ send_data::SendDataBuilder, trace_utils::{self, SendData}, }; +use dogstatsd::api_key::ApiKeyFactory; use crate::config::Config; use crate::traces::trace_aggregator::TraceAggregator; #[async_trait] pub trait TraceFlusher { - fn new(aggregator: Arc>, config: Arc) -> Self + fn new( + aggregator: Arc>, + config: Arc, + api_key_factory: Arc, + ) -> Self where Self: Sized; /// Given a `Vec`, a tracer payload, send it to the Datadog intake endpoint. @@ -34,15 +39,29 @@ pub trait TraceFlusher { pub struct ServerlessTraceFlusher { pub aggregator: Arc>, pub config: Arc, + pub api_key_factory: Arc, } #[async_trait] impl TraceFlusher for ServerlessTraceFlusher { - fn new(aggregator: Arc>, config: Arc) -> Self { - ServerlessTraceFlusher { aggregator, config } + fn new( + aggregator: Arc>, + config: Arc, + api_key_factory: Arc, + ) -> Self { + ServerlessTraceFlusher { + aggregator, + config, + api_key_factory, + } } async fn flush(&self, failed_traces: Option>) -> Option> { + let Some(api_key) = self.api_key_factory.get_api_key().await else { + error!("Skipping flushing traces: Failed to resolve API key"); + return None; + }; + let mut failed_batch: Option> = None; if let Some(traces) = failed_traces { @@ -64,6 +83,8 @@ impl TraceFlusher for ServerlessTraceFlusher { while !trace_builders.is_empty() { let traces: Vec<_> = trace_builders .into_iter() + // Lazily set the API key + .map(|builder| builder.with_api_key(&api_key)) .map(SendDataBuilder::build) .collect(); if let Some(failed) = self.send(traces).await { diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 1322fe967..ca65bb405 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -20,7 +20,6 @@ use datadog_trace_utils::trace_utils::{self}; use datadog_trace_utils::tracer_header_tags; use datadog_trace_utils::tracer_payload::{TraceChunkProcessor, TracerPayloadCollection}; use ddcommon::Endpoint; -use dogstatsd::api_key::ApiKeyFactory; use std::str::FromStr; use std::sync::Arc; use tracing::error; @@ -31,7 +30,6 @@ use super::trace_aggregator::SendDataBuilderInfo; #[allow(clippy::module_name_repetitions)] pub struct ServerlessTraceProcessor { pub obfuscation_config: Arc, - pub api_key_factory: Arc, } struct ChunkProcessor { @@ -164,11 +162,11 @@ impl TraceProcessor for ServerlessTraceProcessor { tracer_payload.tags.extend(tags.clone()); } } - let api_key = self.api_key_factory.get_api_key().await.to_string(); let endpoint = Endpoint { url: hyper::Uri::from_str(&config.apm_dd_url) .expect("can't parse trace intake URL, exiting"), - api_key: Some(api_key.into()), + // Will be set at flush time + api_key: None, timeout_ms: config.flush_timeout * 1_000, test_token: None, }; @@ -194,7 +192,6 @@ mod tests { }; use datadog_trace_obfuscation::obfuscation_config::ObfuscationConfig; - use dogstatsd::api_key::ApiKeyFactory; use crate::{config::Config, tags::provider::Provider, LAMBDA_RUNTIME_SLUG}; @@ -299,7 +296,6 @@ mod tests { }; let trace_processor = ServerlessTraceProcessor { - api_key_factory: Arc::new(ApiKeyFactory::new("test-api-key")), obfuscation_config: Arc::new(ObfuscationConfig::new().unwrap()), }; let config = create_test_config(); From 5ebd13539e9dbfa4195ff63d6a7e96acf05ce81d Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 16 Jul 2025 17:55:38 -0400 Subject: [PATCH 2/2] Fix clippy --- bottlecap/src/traces/trace_flusher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 6c1a6d7fd..8fb6a2b47 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -84,7 +84,7 @@ impl TraceFlusher for ServerlessTraceFlusher { let traces: Vec<_> = trace_builders .into_iter() // Lazily set the API key - .map(|builder| builder.with_api_key(&api_key)) + .map(|builder| builder.with_api_key(api_key)) .map(SendDataBuilder::build) .collect(); if let Some(failed) = self.send(traces).await {