diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 5f975dec6..d2b0c04ba 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -724,7 +724,7 @@ dependencies = [ [[package]] name = "datadog-fips" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=b29bba8b4178fc2089943fe28e853d529826888b#b29bba8b4178fc2089943fe28e853d529826888b" +source = "git+https://github.com/DataDog/serverless-components?rev=985120329d0ba96c1ec8d719cc38e1f7ce11a092#985120329d0ba96c1ec8d719cc38e1f7ce11a092" dependencies = [ "reqwest", "rustls", @@ -1078,9 +1078,9 @@ dependencies = [ [[package]] name = "dogstatsd" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=b29bba8b4178fc2089943fe28e853d529826888b#b29bba8b4178fc2089943fe28e853d529826888b" +source = "git+https://github.com/DataDog/serverless-components?rev=985120329d0ba96c1ec8d719cc38e1f7ce11a092#985120329d0ba96c1ec8d719cc38e1f7ce11a092" dependencies = [ - "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=b29bba8b4178fc2089943fe28e853d529826888b)", + "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=985120329d0ba96c1ec8d719cc38e1f7ce11a092)", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "derive_more", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index 2cb2976ad..262057313 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -57,7 +57,7 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" , features = ["mini_agent"] } datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } -dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "b29bba8b4178fc2089943fe28e853d529826888b", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "985120329d0ba96c1ec8d719cc38e1f7ce11a092", default-features = false } datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78" } datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false } axum = { version = "0.8.4", default-features = false, features = ["default"] } diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 126932521..2aa46a66a 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -439,11 +439,7 @@ fn create_api_key_factory( let aws_config = Arc::clone(&aws_config); let aws_credentials = Arc::clone(&aws_credentials); - Box::pin(async move { - resolve_secrets(config, aws_config, aws_credentials) - .await - .expect("Failed to resolve API key") - }) + Box::pin(async move { resolve_secrets(config, aws_config, aws_credentials).await }) }))) } @@ -517,7 +513,7 @@ async fn extension_loop_active( trace_agent_shutdown_token, ) = start_trace_agent( config, - Arc::clone(&api_key_factory), + &api_key_factory, &tags_provider, Arc::clone(&invocation_processor), Arc::clone(&trace_aggregator), @@ -1038,7 +1034,7 @@ fn start_metrics_flushers( #[allow(clippy::type_complexity)] fn start_trace_agent( config: &Arc, - api_key_factory: Arc, + api_key_factory: &Arc, tags_provider: &Arc, invocation_processor: Arc>, trace_aggregator: Arc>, @@ -1064,6 +1060,7 @@ fn start_trace_agent( let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher { aggregator: trace_aggregator.clone(), config: Arc::clone(config), + api_key_factory: Arc::clone(api_key_factory), }); let obfuscation_config = obfuscation_config::ObfuscationConfig { @@ -1077,7 +1074,6 @@ fn start_trace_agent( let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor { obfuscation_config: Arc::new(obfuscation_config), - api_key_factory: api_key_factory.clone(), }); // Proxy @@ -1098,7 +1094,6 @@ fn start_trace_agent( proxy_aggregator, invocation_processor, Arc::clone(tags_provider), - api_key_factory, ); let trace_agent_channel = trace_agent.get_sender_copy(); let shutdown_token = trace_agent.shutdown_token(); diff --git a/bottlecap/src/logs/flusher.rs b/bottlecap/src/logs/flusher.rs index a0e597abb..15d5cc3ce 100644 --- a/bottlecap/src/logs/flusher.rs +++ b/bottlecap/src/logs/flusher.rs @@ -42,7 +42,6 @@ impl Flusher { config: Arc, ) -> Self { let client = get_client(&config); - Flusher { client, endpoint, @@ -54,7 +53,10 @@ impl Flusher { } pub async fn flush(&self, batches: Option>>>) -> Vec { - let api_key = self.api_key_factory.get_api_key().await; + let Some(api_key) = self.api_key_factory.get_api_key().await else { + error!("Skipping flushing logs: Failed to resolve API key"); + return vec![]; + }; let mut set = JoinSet::new(); diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index 6feee0d15..d7efeadf3 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -399,7 +399,6 @@ mod tests { let metrics_aggregator = Arc::new(Mutex::new( MetricsAggregator::new(EMPTY_TAGS, 1024).unwrap(), )); - let aws_config = Arc::new(AwsConfig { region: "us-east-1".to_string(), function_name: "arn:some-function".to_string(), diff --git a/bottlecap/src/traces/proxy_flusher.rs b/bottlecap/src/traces/proxy_flusher.rs index e6ce4be24..3b60ee2c1 100644 --- a/bottlecap/src/traces/proxy_flusher.rs +++ b/bottlecap/src/traces/proxy_flusher.rs @@ -2,10 +2,9 @@ use dogstatsd::api_key::ApiKeyFactory; use reqwest::header::HeaderMap; use std::{error::Error, sync::Arc}; use thiserror::Error as ThisError; -use tokio::{ - sync::{Mutex, OnceCell}, - task::JoinSet, -}; +use tokio::sync::OnceCell; +use tokio::{sync::Mutex, task::JoinSet}; + use tracing::{debug, error}; use crate::{ @@ -83,7 +82,10 @@ impl Flusher { &self, retry_requests: Option>, ) -> Option> { - let api_key = self.api_key_factory.get_api_key().await; + let Some(api_key) = self.api_key_factory.get_api_key().await else { + error!("Skipping flush in proxy flusher: Failed to resolve API key"); + return None; + }; let mut join_set = JoinSet::new(); let mut requests = Vec::::new(); diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index d75844276..266d9d465 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -60,16 +60,21 @@ impl StatsFlusher for ServerlessStatsFlusher { return; } + let Some(api_key) = self.api_key_factory.get_api_key().await else { + error!("Skipping flushing stats: Failed to resolve API key"); + return; + }; + + let api_key_clone = api_key.to_string(); let endpoint = self .endpoint .get_or_init({ move || async move { - let api_key = self.api_key_factory.get_api_key().await.to_string(); let stats_url = trace_stats_url(&self.config.site); Endpoint { url: hyper::Uri::from_str(&stats_url) .expect("can't make URI from stats url, exiting"), - api_key: Some(api_key.clone().into()), + api_key: Some(api_key_clone.into()), timeout_ms: self.config.flush_timeout * 1_000, test_token: None, } @@ -95,12 +100,8 @@ impl StatsFlusher for ServerlessStatsFlusher { let start = std::time::Instant::now(); - let resp = stats_utils::send_stats_payload( - serialized_stats_payload, - endpoint, - self.api_key_factory.get_api_key().await, - ) - .await; + let resp = + stats_utils::send_stats_payload(serialized_stats_payload, endpoint, api_key).await; let elapsed = start.elapsed(); debug!( "Stats request to {} took {}ms", diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 3b3863911..7a2880902 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -37,7 +37,6 @@ use crate::{ use datadog_trace_protobuf::pb; use datadog_trace_utils::trace_utils::{self}; use ddcommon::hyper_migration; -use dogstatsd::api_key::ApiKeyFactory; const TRACE_AGENT_PORT: usize = 8126; @@ -86,7 +85,6 @@ pub struct StatsState { pub struct ProxyState { pub config: Arc, pub proxy_aggregator: Arc>, - pub api_key_factory: Arc, } pub struct TraceAgent { @@ -99,7 +97,6 @@ pub struct TraceAgent { invocation_processor: Arc>, shutdown_token: CancellationToken, tx: Sender, - api_key_factory: Arc, } #[derive(Clone, Copy)] @@ -120,7 +117,6 @@ impl TraceAgent { proxy_aggregator: Arc>, invocation_processor: Arc>, tags_provider: Arc, - api_key_factory: Arc, ) -> TraceAgent { // setup a channel to send processed traces to our flusher. tx is passed through each // endpoint_handler to the trace processor, which uses it to send de-serialized @@ -147,7 +143,6 @@ impl TraceAgent { invocation_processor, tags_provider, tx: trace_tx, - api_key_factory, shutdown_token: CancellationToken::new(), } } @@ -207,7 +202,6 @@ impl TraceAgent { let proxy_state = ProxyState { config: Arc::clone(&self.config), proxy_aggregator: Arc::clone(&self.proxy_aggregator), - api_key_factory: Arc::clone(&self.api_key_factory), }; let trace_router = Router::new() @@ -395,6 +389,8 @@ impl TraceAgent { (StatusCode::OK, response_json.to_string()).into_response() } + #[allow(clippy::too_many_arguments)] + #[allow(clippy::too_many_lines)] async fn handle_traces( config: Arc, request: Request, diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 699c3c6cc..8fb6a2b47 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -10,13 +10,18 @@ use datadog_trace_utils::{ send_data::SendDataBuilder, trace_utils::{self, SendData}, }; +use dogstatsd::api_key::ApiKeyFactory; use crate::config::Config; use crate::traces::trace_aggregator::TraceAggregator; #[async_trait] pub trait TraceFlusher { - fn new(aggregator: Arc>, config: Arc) -> Self + fn new( + aggregator: Arc>, + config: Arc, + api_key_factory: Arc, + ) -> Self where Self: Sized; /// Given a `Vec`, a tracer payload, send it to the Datadog intake endpoint. @@ -34,15 +39,29 @@ pub trait TraceFlusher { pub struct ServerlessTraceFlusher { pub aggregator: Arc>, pub config: Arc, + pub api_key_factory: Arc, } #[async_trait] impl TraceFlusher for ServerlessTraceFlusher { - fn new(aggregator: Arc>, config: Arc) -> Self { - ServerlessTraceFlusher { aggregator, config } + fn new( + aggregator: Arc>, + config: Arc, + api_key_factory: Arc, + ) -> Self { + ServerlessTraceFlusher { + aggregator, + config, + api_key_factory, + } } async fn flush(&self, failed_traces: Option>) -> Option> { + let Some(api_key) = self.api_key_factory.get_api_key().await else { + error!("Skipping flushing traces: Failed to resolve API key"); + return None; + }; + let mut failed_batch: Option> = None; if let Some(traces) = failed_traces { @@ -64,6 +83,8 @@ impl TraceFlusher for ServerlessTraceFlusher { while !trace_builders.is_empty() { let traces: Vec<_> = trace_builders .into_iter() + // Lazily set the API key + .map(|builder| builder.with_api_key(api_key)) .map(SendDataBuilder::build) .collect(); if let Some(failed) = self.send(traces).await { diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 1322fe967..ca65bb405 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -20,7 +20,6 @@ use datadog_trace_utils::trace_utils::{self}; use datadog_trace_utils::tracer_header_tags; use datadog_trace_utils::tracer_payload::{TraceChunkProcessor, TracerPayloadCollection}; use ddcommon::Endpoint; -use dogstatsd::api_key::ApiKeyFactory; use std::str::FromStr; use std::sync::Arc; use tracing::error; @@ -31,7 +30,6 @@ use super::trace_aggregator::SendDataBuilderInfo; #[allow(clippy::module_name_repetitions)] pub struct ServerlessTraceProcessor { pub obfuscation_config: Arc, - pub api_key_factory: Arc, } struct ChunkProcessor { @@ -164,11 +162,11 @@ impl TraceProcessor for ServerlessTraceProcessor { tracer_payload.tags.extend(tags.clone()); } } - let api_key = self.api_key_factory.get_api_key().await.to_string(); let endpoint = Endpoint { url: hyper::Uri::from_str(&config.apm_dd_url) .expect("can't parse trace intake URL, exiting"), - api_key: Some(api_key.into()), + // Will be set at flush time + api_key: None, timeout_ms: config.flush_timeout * 1_000, test_token: None, }; @@ -194,7 +192,6 @@ mod tests { }; use datadog_trace_obfuscation::obfuscation_config::ObfuscationConfig; - use dogstatsd::api_key::ApiKeyFactory; use crate::{config::Config, tags::provider::Provider, LAMBDA_RUNTIME_SLUG}; @@ -299,7 +296,6 @@ mod tests { }; let trace_processor = ServerlessTraceProcessor { - api_key_factory: Arc::new(ApiKeyFactory::new("test-api-key")), obfuscation_config: Arc::new(ObfuscationConfig::new().unwrap()), }; let config = create_test_config();