diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock
index 004bdbaf3..5f975dec6 100644
--- a/bottlecap/Cargo.lock
+++ b/bottlecap/Cargo.lock
@@ -492,7 +492,7 @@ dependencies = [
  "base64 0.22.1",
  "bytes 1.10.1",
  "chrono",
- "datadog-fips",
+ "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78)",
  "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)",
  "datadog-trace-agent",
  "datadog-trace-normalization 19.1.0",
@@ -721,6 +721,17 @@ dependencies = [
  "typenum",
 ]
 
+[[package]]
+name = "datadog-fips"
+version = "0.1.0"
+source = "git+https://github.com/DataDog/serverless-components?rev=b29bba8b4178fc2089943fe28e853d529826888b#b29bba8b4178fc2089943fe28e853d529826888b"
+dependencies = [
+ "reqwest",
+ "rustls",
+ "rustls-native-certs",
+ "tracing",
+]
+
 [[package]]
 name = "datadog-fips"
 version = "0.1.0"
@@ -1067,9 +1078,9 @@ dependencies = [
 
 [[package]]
 name = "dogstatsd"
 version = "0.1.0"
-source = "git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78#c3d8ed4f90591c6958921145d485463860307f78"
+source = "git+https://github.com/DataDog/serverless-components?rev=b29bba8b4178fc2089943fe28e853d529826888b#b29bba8b4178fc2089943fe28e853d529826888b"
 dependencies = [
- "datadog-fips",
+ "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=b29bba8b4178fc2089943fe28e853d529826888b)",
  "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)",
  "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)",
  "derive_more",
diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml
index 56ed5c85f..2cb2976ad 100644
--- a/bottlecap/Cargo.toml
+++ b/bottlecap/Cargo.toml
@@ -57,7 +57,7 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev =
 datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" , features = ["mini_agent"] }
 datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
 datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
-dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false }
+dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "b29bba8b4178fc2089943fe28e853d529826888b", default-features = false }
 datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78" }
 datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false }
 axum = { version = "0.8.4", default-features = false, features = ["default"] }
diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs
index 11609f423..126932521 100644
--- a/bottlecap/src/bin/bottlecap/main.rs
+++ b/bottlecap/src/bin/bottlecap/main.rs
@@ -66,6 +66,7 @@ use datadog_trace_utils::send_data::SendData;
 use decrypt::resolve_secrets;
 use dogstatsd::{
     aggregator::Aggregator as MetricsAggregator,
+    api_key::ApiKeyFactory,
     constants::CONTEXTS,
     datadog::{
         DdDdUrl, DdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride,
@@ -88,7 +89,7 @@ use std::{
     sync::{Arc, Mutex},
     time::{Duration, Instant},
 };
-use tokio::{sync::mpsc::Sender, sync::Mutex as TokioMutex, task::JoinHandle};
+use tokio::{sync::mpsc::Sender, sync::Mutex as TokioMutex, sync::RwLock, task::JoinHandle};
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error};
 use tracing_subscriber::EnvFilter;
@@ -333,7 +334,7 @@ async fn register(client: &Client) -> Result<RegisterResponse> {
 #[tokio::main]
 async fn main() -> Result<()> {
     let start_time = Instant::now();
-    let (aws_config, mut aws_credentials, config) = load_configs(start_time);
+    let (aws_config, aws_credentials, config) = load_configs(start_time);
 
     enable_logging_subsystem(&config);
     log_fips_status(&aws_config.region);
@@ -360,33 +361,27 @@ async fn main() -> Result<()> {
         .await
         .map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
 
-    if let Some(resolved_api_key) =
-        resolve_secrets(Arc::clone(&config), &aws_config, &mut aws_credentials).await
+    let aws_config = Arc::new(aws_config);
+    let api_key_factory = create_api_key_factory(&config, &aws_config, aws_credentials);
+
+    match extension_loop_active(
+        Arc::clone(&aws_config),
+        &config,
+        &client,
+        &r,
+        Arc::clone(&api_key_factory),
+        start_time,
+    )
+    .await
     {
-        match extension_loop_active(
-            &aws_config,
-            &config,
-            &client,
-            &r,
-            resolved_api_key,
-            start_time,
-        )
-        .await
-        {
-            Ok(()) => {
-                debug!("Extension loop completed successfully");
-                Ok(())
-            }
-            Err(e) => {
-                error!(
-                    "Extension loop failed: {e:?}, Calling /next without Datadog instrumentation"
-                );
-                extension_loop_idle(&client, &r).await
-            }
+        Ok(()) => {
+            debug!("Extension loop completed successfully");
+            Ok(())
+        }
+        Err(e) => {
+            error!("Extension loop failed: {e:?}, Calling /next without Datadog instrumentation");
+            extension_loop_idle(&client, &r).await
         }
-    } else {
-        error!("Failed to resolve secrets, Datadog extension will be idle");
-        extension_loop_idle(&client, &r).await
     }
 }
@@ -430,6 +425,28 @@ fn enable_logging_subsystem(config: &Arc<Config>) {
     debug!("Logging subsystem enabled");
 }
 
+fn create_api_key_factory(
+    config: &Arc<Config>,
+    aws_config: &Arc<AwsConfig>,
+    aws_credentials: AwsCredentials,
+) -> Arc<ApiKeyFactory> {
+    let config = Arc::clone(config);
+    let aws_config = Arc::clone(aws_config);
+    let aws_credentials = Arc::new(RwLock::new(aws_credentials));
+
+    Arc::new(ApiKeyFactory::new_from_resolver(Arc::new(move || {
+        let config = Arc::clone(&config);
+        let aws_config = Arc::clone(&aws_config);
+        let aws_credentials = Arc::clone(&aws_credentials);
+
+        Box::pin(async move {
+            resolve_secrets(config, aws_config, aws_credentials)
+                .await
+                .expect("Failed to resolve API key")
+        })
+    })))
+}
+
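A note on the factory created above: `new_from_resolver` takes an `Arc`-wrapped closure that returns a pinned, boxed future, and consumers later call `get_api_key().await`. The diff suggests the factory resolves at most once and caches the result; a minimal sketch of that contract under that assumption follows (the `KeyFactory` type and its internals are illustrative, not the actual dogstatsd implementation):

```rust
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::sync::OnceCell;

// Hypothetical stand-in for dogstatsd's ApiKeyFactory: resolve once, then cache.
type Resolver = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = String> + Send>> + Send + Sync>;

struct KeyFactory {
    resolver: Resolver,
    cache: OnceCell<String>,
}

impl KeyFactory {
    fn new_from_resolver(resolver: Resolver) -> Self {
        Self { resolver, cache: OnceCell::new() }
    }

    // Every caller awaits the same one-shot resolution; later calls hit the cache.
    async fn get_api_key(&self) -> &str {
        self.cache.get_or_init(|| (self.resolver)()).await
    }
}

#[tokio::main]
async fn main() {
    let factory = KeyFactory::new_from_resolver(Arc::new(|| {
        Box::pin(async { "resolved-api-key".to_string() })
    }));
    assert_eq!(factory.get_api_key().await, "resolved-api-key");
    assert_eq!(factory.get_api_key().await, "resolved-api-key"); // cached
}
```

This is what lets `main` stop resolving secrets eagerly: the closure only captures its dependencies, and nothing runs until the first flusher asks for the key.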
 async fn extension_loop_idle(client: &Client, r: &RegisterResponse) -> Result<()> {
     loop {
         match next_event(client, &r.extension_id).await {
@@ -446,11 +463,11 @@ async fn extension_loop_idle(client: &Client, r: &RegisterResponse) -> Result<()
 #[allow(clippy::too_many_lines)]
 async fn extension_loop_active(
-    aws_config: &AwsConfig,
+    aws_config: Arc<AwsConfig>,
     config: &Arc<Config>,
     client: &Client,
     r: &RegisterResponse,
-    resolved_api_key: String,
+    api_key_factory: Arc<ApiKeyFactory>,
     start_time: Instant,
 ) -> Result<()> {
     let mut event_bus = EventBus::run();
@@ -460,11 +477,11 @@ async fn extension_loop_active(
         .as_ref()
         .unwrap_or(&"none".to_string())
         .to_string();
-    let tags_provider = setup_tag_provider(aws_config, config, &account_id);
+    let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id);
 
     let (logs_agent_channel, logs_flusher) = start_logs_agent(
         config,
-        resolved_api_key.clone(),
+        Arc::clone(&api_key_factory),
         &tags_provider,
         event_bus.get_sender_copy(),
     );
@@ -478,7 +495,7 @@ async fn extension_loop_active(
     ));
 
     let metrics_flushers = Arc::new(TokioMutex::new(start_metrics_flushers(
-        resolved_api_key.clone(),
+        Arc::clone(&api_key_factory),
         &metrics_aggr,
         config,
     )));
 
     let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
         Arc::clone(&tags_provider),
         Arc::clone(config),
-        aws_config,
+        Arc::clone(&aws_config),
         Arc::clone(&metrics_aggr),
     )));
@@ -500,7 +517,7 @@ async fn extension_loop_active(
         trace_agent_shutdown_token,
     ) = start_trace_agent(
         config,
-        resolved_api_key.clone(),
+        Arc::clone(&api_key_factory),
         &tags_provider,
         Arc::clone(&invocation_processor),
         Arc::clone(&trace_aggregator),
@@ -919,7 +936,7 @@ async fn handle_next_invocation(
 }
 
 fn setup_tag_provider(
-    aws_config: &AwsConfig,
+    aws_config: &Arc<AwsConfig>,
     config: &Arc<Config>,
     account_id: &str,
 ) -> Arc<Provider> {
@@ -938,14 +955,14 @@ fn setup_tag_provider(
 
 fn start_logs_agent(
     config: &Arc<Config>,
-    resolved_api_key: String,
+    api_key_factory: Arc<ApiKeyFactory>,
     tags_provider: &Arc<Provider>,
     event_bus: Sender<Event>,
 ) -> (Sender<TelemetryEvent>, LogsFlusher) {
     let mut logs_agent = LogsAgent::new(Arc::clone(tags_provider), Arc::clone(config), event_bus);
     let logs_agent_channel = logs_agent.get_sender_copy();
     let logs_flusher = LogsFlusher::new(
-        resolved_api_key,
+        api_key_factory,
         Arc::clone(&logs_agent.aggregator),
         config.clone(),
     );
@@ -956,7 +973,7 @@ fn start_logs_agent(
 }
 
 fn start_metrics_flushers(
-    resolved_api_key: String,
+    api_key_factory: Arc<ApiKeyFactory>,
     metrics_aggr: &Arc<Mutex<MetricsAggregator>>,
     config: &Arc<Config>,
 ) -> Vec<MetricsFlusher> {
@@ -979,7 +996,7 @@ fn start_metrics_flushers(
     };
 
     let flusher_config = MetricsFlusherConfig {
-        api_key: resolved_api_key,
+        api_key_factory,
         aggregator: Arc::clone(metrics_aggr),
         metrics_intake_url_prefix: metrics_intake_url.expect("can't parse site or override"),
         https_proxy: config.proxy_https.clone(),
@@ -1003,8 +1020,9 @@ fn start_metrics_flushers(
 
     // Create a flusher for each endpoint URL and API key pair
     for api_key in api_keys {
+        let additional_api_key_factory = Arc::new(ApiKeyFactory::new(api_key));
         let additional_flusher_config = MetricsFlusherConfig {
-            api_key: api_key.clone(),
+            api_key_factory: additional_api_key_factory,
             aggregator: metrics_aggr.clone(),
             metrics_intake_url_prefix: metrics_intake_url.clone(),
             https_proxy: config.proxy_https.clone(),
@@ -1020,7 +1038,7 @@ fn start_metrics_flushers(
 
 #[allow(clippy::type_complexity)]
 fn start_trace_agent(
     config: &Arc<Config>,
-    resolved_api_key: String,
+    api_key_factory: Arc<ApiKeyFactory>,
     tags_provider: &Arc<Provider>,
     invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
     trace_aggregator: Arc<TokioMutex<TraceAggregator>>,
@@ -1035,7 +1053,7 @@ fn start_trace_agent(
     // Stats
     let stats_aggregator = Arc::new(TokioMutex::new(StatsAggregator::default()));
     let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
-        resolved_api_key.clone(),
+        api_key_factory.clone(),
         stats_aggregator.clone(),
         Arc::clone(config),
     ));
@@ -1059,13 +1077,13 @@ fn start_trace_agent(
 
     let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
         obfuscation_config: Arc::new(obfuscation_config),
-        resolved_api_key: resolved_api_key.clone(),
+        api_key_factory: api_key_factory.clone(),
     });
 
     // Proxy
     let proxy_aggregator = Arc::new(TokioMutex::new(proxy_aggregator::Aggregator::default()));
     let proxy_flusher = Arc::new(ProxyFlusher::new(
-        resolved_api_key,
+        api_key_factory.clone(),
         Arc::clone(&proxy_aggregator),
         Arc::clone(tags_provider),
         Arc::clone(config),
@@ -1080,6 +1098,7 @@ fn start_trace_agent(
         proxy_aggregator,
         invocation_processor,
         Arc::clone(tags_provider),
+        api_key_factory,
     );
     let trace_agent_channel = trace_agent.get_sender_copy();
     let shutdown_token = trace_agent.shutdown_token();
@@ -1166,15 +1185,14 @@ fn start_otlp_agent(
 
 fn start_api_runtime_proxy(
     config: &Arc<Config>,
-    aws_config: &AwsConfig,
+    aws_config: Arc<AwsConfig>,
     invocation_processor: &Arc<TokioMutex<InvocationProcessor>>,
 ) -> Option<CancellationToken> {
-    if !should_start_proxy(config, aws_config) {
+    if !should_start_proxy(config, Arc::clone(&aws_config)) {
         debug!("Skipping API runtime proxy, no LWA proxy or datadog wrapper found");
         return None;
     }
 
-    let aws_config = aws_config.clone();
     let invocation_processor = invocation_processor.clone();
     interceptor::start(aws_config, invocation_processor).ok()
 }
diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs
index 959a74eed..1f8c246b0 100644
--- a/bottlecap/src/lifecycle/invocation/processor.rs
+++ b/bottlecap/src/lifecycle/invocation/processor.rs
@@ -61,7 +61,7 @@ pub struct Processor {
     /// Helper class to send enhanced metrics.
     enhanced_metrics: EnhancedMetrics,
     /// AWS configuration from the Lambda environment.
-    aws_config: AwsConfig,
+    aws_config: Arc<AwsConfig>,
     /// Flag to determine if a tracer was detected through
     /// universal instrumentation.
     tracer_detected: bool,
@@ -86,7 +86,7 @@ impl Processor {
     pub fn new(
         tags_provider: Arc<Provider>,
         config: Arc<Config>,
-        aws_config: &AwsConfig,
+        aws_config: Arc<AwsConfig>,
         metrics_aggregator: Arc<Mutex<Aggregator>>,
     ) -> Self {
         let service = config.service.clone().unwrap_or(String::from("aws.lambda"));
@@ -101,7 +101,7 @@ impl Processor {
             inferrer: SpanInferrer::new(config.service_mapping.clone()),
             propagator,
             enhanced_metrics: EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config)),
-            aws_config: aws_config.clone(),
+            aws_config,
             tracer_detected: false,
             runtime: None,
             config: Arc::clone(&config),
@@ -517,14 +517,16 @@ impl Processor {
             dropped_p0_spans: 0,
         };
 
-        let send_data_builder_info: SendDataBuilderInfo = trace_processor.process_traces(
-            self.config.clone(),
-            tags_provider.clone(),
-            header_tags,
-            vec![traces],
-            body_size,
-            self.inferrer.span_pointers.clone(),
-        );
+        let send_data_builder_info: SendDataBuilderInfo = trace_processor
+            .process_traces(
+                self.config.clone(),
+                tags_provider.clone(),
+                header_tags,
+                vec![traces],
+                body_size,
+                self.inferrer.span_pointers.clone(),
+            )
+            .await;
 
         if let Err(e) = trace_agent_tx.send(send_data_builder_info).await {
             debug!("Failed to send context spans to agent: {e}");
@@ -965,14 +967,14 @@ mod tests {
     use dogstatsd::metric::EMPTY_TAGS;
 
     fn setup() -> Processor {
-        let aws_config = AwsConfig {
+        let aws_config = Arc::new(AwsConfig {
             region: "us-east-1".into(),
             aws_lwa_proxy_lambda_runtime_api: Some("***".into()),
             function_name: "test-function".into(),
             sandbox_init_time: Instant::now(),
             runtime_api: "***".into(),
             exec_wrapper: None,
-        };
+        });
 
         let config = Arc::new(config::Config {
             service: Some("test-service".to_string()),
@@ -990,7 +992,7 @@ mod tests {
             Aggregator::new(EMPTY_TAGS, 1024).expect("failed to create aggregator"),
         ));
 
-        Processor::new(tags_provider, config, &aws_config, metrics_aggregator)
+        Processor::new(tags_provider, config, aws_config, metrics_aggregator)
     }
 
     #[test]
diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs
index 49d0f2f06..32839b8f0 100644
--- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs
+++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs
@@ -358,6 +358,7 @@
 use crate::traces::propagation::text_map_propagator::DatadogHeaderPropagator;
 use serde_json::json;
 use std::collections::HashMap;
+use std::sync::Arc;
 use std::time::Instant;
 
 use super::*;
@@ -478,14 +479,14 @@ mod tests {
             }]
         });
 
-        let aws_config = AwsConfig {
+        let aws_config = Arc::new(AwsConfig {
             region: "us-east-1".to_string(),
             aws_lwa_proxy_lambda_runtime_api: Some("".to_string()),
             runtime_api: "".to_string(),
             function_name: "".to_string(),
             sandbox_init_time: Instant::now(),
             exec_wrapper: None,
-        };
+        });
 
         inferrer.infer_span(&payload, &aws_config);
 
         assert!(
diff --git a/bottlecap/src/logs/flusher.rs b/bottlecap/src/logs/flusher.rs
index 43cc9fabc..a0e597abb 100644
--- a/bottlecap/src/logs/flusher.rs
+++ b/bottlecap/src/logs/flusher.rs
@@ -2,6 +2,7 @@ use crate::config;
 use crate::http::get_client;
 use crate::logs::aggregator::Aggregator;
 use crate::FLUSH_RETRY_COUNT;
+use dogstatsd::api_key::ApiKeyFactory;
 use futures::future::join_all;
 use reqwest::header::HeaderMap;
 use std::error::Error;
@@ -11,6 +12,7 @@ use std::{
     sync::{Arc, Mutex},
 };
 use thiserror::Error as ThisError;
+use tokio::sync::OnceCell;
 use tokio::task::JoinSet;
 use tracing::{debug, error};
 use zstd::stream::write::Encoder;
@@ -28,48 +30,32 @@ pub struct Flusher {
     endpoint: String,
     aggregator: Arc<Mutex<Aggregator>>,
     config: Arc<config::Config>,
-    headers: HeaderMap,
+    api_key_factory: Arc<ApiKeyFactory>,
+    headers: OnceCell<HeaderMap>,
 }
 
 impl Flusher {
     pub fn new(
-        api_key: String,
+        api_key_factory: Arc<ApiKeyFactory>,
         endpoint: String,
         aggregator: Arc<Mutex<Aggregator>>,
         config: Arc<config::Config>,
     ) -> Self {
         let client = get_client(&config);
-        let mut headers = HeaderMap::new();
-        headers.insert(
-            "DD-API-KEY",
-            api_key.clone().parse().expect("failed to parse header"),
-        );
-        headers.insert(
-            "DD-PROTOCOL",
-            "agent-json".parse().expect("failed to parse header"),
-        );
-        headers.insert(
-            "Content-Type",
-            "application/json".parse().expect("failed to parse header"),
-        );
-
-        if config.logs_config_use_compression {
-            headers.insert(
-                "Content-Encoding",
-                "zstd".parse().expect("failed to parse header"),
-            );
-        }
 
         Flusher {
             client,
             endpoint,
             aggregator,
             config,
-            headers,
+            api_key_factory,
+            headers: OnceCell::new(),
         }
     }
 
     pub async fn flush(&self, batches: Option<Arc<Vec<Vec<u8>>>>) -> Vec<Vec<u8>> {
+        let api_key = self.api_key_factory.get_api_key().await;
+
         let mut set = JoinSet::new();
 
         if let Some(logs_batches) = batches {
@@ -77,7 +63,7 @@ impl Flusher {
                 if batch.is_empty() {
                     continue;
                 }
-                let req = self.create_request(batch.clone());
+                let req = self.create_request(batch.clone(), api_key).await;
                 set.spawn(async move { Self::send(req).await });
             }
         }
@@ -109,12 +95,13 @@ impl Flusher {
         failed_requests
     }
 
-    fn create_request(&self, data: Vec<u8>) -> reqwest::RequestBuilder {
+    async fn create_request(&self, data: Vec<u8>, api_key: &str) -> reqwest::RequestBuilder {
         let url = format!("{}/api/v2/logs", self.endpoint);
+        let headers = self.get_headers(api_key).await;
         self.client
             .post(&url)
             .timeout(std::time::Duration::from_secs(self.config.flush_timeout))
-            .headers(self.headers.clone())
+            .headers(headers.clone())
             .body(data)
     }
 
@@ -160,6 +147,34 @@ impl Flusher {
             }
         }
     }
+
+    async fn get_headers(&self, api_key: &str) -> &HeaderMap {
+        self.headers
+            .get_or_init(move || async move {
+                let mut headers = HeaderMap::new();
+                headers.insert(
+                    "DD-API-KEY",
+                    api_key.parse().expect("failed to parse header"),
+                );
+                headers.insert(
+                    "DD-PROTOCOL",
+                    "agent-json".parse().expect("failed to parse header"),
+                );
+                headers.insert(
+                    "Content-Type",
+                    "application/json".parse().expect("failed to parse header"),
+                );
+
+                if self.config.logs_config_use_compression {
+                    headers.insert(
+                        "Content-Encoding",
+                        "zstd".parse().expect("failed to parse header"),
+                    );
+                }
+                headers
+            })
+            .await
+    }
 }
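The `get_or_init` pattern above builds the header map once, on the first flush, using whatever key the factory returned; later calls get the cached `&HeaderMap` back and the `api_key` argument is ignored. A stripped-down sketch of the same pattern (the `Headers` type and names are illustrative, not the bottlecap types):

```rust
use reqwest::header::{HeaderMap, HeaderValue};
use tokio::sync::OnceCell;

struct Headers {
    cell: OnceCell<HeaderMap>,
}

impl Headers {
    // First caller builds the map; everyone else reuses the same &HeaderMap.
    // Note: the api_key seen by the *first* call wins, since OnceCell
    // initializes at most once.
    async fn get(&self, api_key: &str) -> &HeaderMap {
        self.cell
            .get_or_init(|| async {
                let mut h = HeaderMap::new();
                h.insert("DD-API-KEY", HeaderValue::from_str(api_key).expect("invalid key"));
                h
            })
            .await
    }
}

#[tokio::main]
async fn main() {
    let headers = Headers { cell: OnceCell::new() };
    assert!(headers.get("k1").await.contains_key("DD-API-KEY"));
}
```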
 
 #[allow(clippy::module_name_repetitions)]
@@ -171,7 +186,7 @@ pub struct LogsFlusher {
 impl LogsFlusher {
     pub fn new(
-        api_key: String,
+        api_key_factory: Arc<ApiKeyFactory>,
         aggregator: Arc<Mutex<Aggregator>>,
         config: Arc<config::Config>,
     ) -> Self {
@@ -179,7 +194,7 @@ impl LogsFlusher {
 
         // Create primary flusher
         flushers.push(Flusher::new(
-            api_key.clone(),
+            Arc::clone(&api_key_factory),
             config.logs_config_logs_dd_url.clone(),
             aggregator.clone(),
             config.clone(),
@@ -189,7 +204,7 @@ impl LogsFlusher {
         for endpoint in &config.logs_config_additional_endpoints {
             let endpoint_url = format!("https://{}:{}", endpoint.host, endpoint.port);
             flushers.push(Flusher::new(
-                endpoint.api_key.clone(),
+                Arc::clone(&api_key_factory),
                 endpoint_url,
                 aggregator.clone(),
                 config.clone(),
diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs
index 31ddbd169..d86da5600 100644
--- a/bottlecap/src/otlp/agent.rs
+++ b/bottlecap/src/otlp/agent.rs
@@ -165,14 +165,16 @@ impl Agent {
                 .into_response();
             }
 
-        let send_data_builder = trace_processor.process_traces(
-            config,
-            tags_provider,
-            tracer_header_tags,
-            traces,
-            body_size,
-            None,
-        );
+        let send_data_builder = trace_processor
+            .process_traces(
+                config,
+                tags_provider,
+                tracer_header_tags,
+                traces,
+                body_size,
+                None,
+            )
+            .await;
 
         match trace_tx.send(send_data_builder).await {
             Ok(()) => {
diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs
index cff534e61..6feee0d15 100644
--- a/bottlecap/src/proxy/interceptor.rs
+++ b/bottlecap/src/proxy/interceptor.rs
@@ -27,14 +27,14 @@ use tracing::{debug, error};
 const INTERCEPTOR_DEFAULT_PORT: u16 = 9000;
 
 type InterceptorState = (
-    AwsConfig,
+    Arc<AwsConfig>,
     Arc<TokioMutex<InvocationProcessor>>,
     Arc>,
     Arc>>,
 );
 
 pub fn start(
-    aws_config: AwsConfig,
+    aws_config: Arc<AwsConfig>,
     invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
 ) -> Result<CancellationToken, Box<dyn std::error::Error>> {
     let socket = get_proxy_socket_address(&aws_config.aws_lwa_proxy_lambda_runtime_api);
@@ -400,18 +400,18 @@ mod tests {
             MetricsAggregator::new(EMPTY_TAGS, 1024).unwrap(),
         ));
 
-        let aws_config = AwsConfig {
+        let aws_config = Arc::new(AwsConfig {
             region: "us-east-1".to_string(),
             function_name: "arn:some-function".to_string(),
             sandbox_init_time: Instant::now(),
             runtime_api: aws_lambda_runtime_api.to_string(),
             aws_lwa_proxy_lambda_runtime_api: Some(aws_lwa_lambda_runtime_api.to_string()),
             exec_wrapper: None,
-        };
+        });
         let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
             Arc::clone(&tags_provider),
             Arc::clone(&config),
-            &aws_config,
+            Arc::clone(&aws_config),
             metrics_aggregator,
         )));
 
diff --git a/bottlecap/src/proxy/mod.rs b/bottlecap/src/proxy/mod.rs
index 6668813ab..8213eae70 100644
--- a/bottlecap/src/proxy/mod.rs
+++ b/bottlecap/src/proxy/mod.rs
@@ -11,7 +11,7 @@ pub mod interceptor;
 /// - ASM is enabled and the `AWS_LAMBDA_EXEC_WRAPPER` environment variable is set to `/opt/datadog_wrapper`
 #[must_use]
 #[allow(clippy::module_name_repetitions)]
-pub fn should_start_proxy(config: &Arc<Config>, aws_config: &AwsConfig) -> bool {
+pub fn should_start_proxy(config: &Arc<Config>, aws_config: Arc<AwsConfig>) -> bool {
     let lwa_proxy_set = aws_config.aws_lwa_proxy_lambda_runtime_api.is_some();
     let datadog_wrapper_set = aws_config
         .exec_wrapper
@@ -34,20 +34,20 @@ mod tests {
             serverless_appsec_enabled: true,
             ..Default::default()
         });
-        let aws_config = AwsConfig {
+        let aws_config = Arc::new(AwsConfig {
             region: "us-east-1".to_string(),
             aws_lwa_proxy_lambda_runtime_api: Some("127.0.0.1:12345".to_string()),
"".to_string(), runtime_api: "".to_string(), sandbox_init_time: Instant::now(), exec_wrapper: Some("/opt/datadog_wrapper".to_string()), - }; - assert!(should_start_proxy(&config, &aws_config)); + }); + assert!(should_start_proxy(&config, aws_config)); } #[test] fn test_should_start_proxy_lwa_proxy_set() { let config = Arc::new(Config::default()); - let aws_config = AwsConfig { + let aws_config = Arc::new(AwsConfig { region: "us-east-1".to_string(), // LWA proxy is set, so we should start the proxy aws_lwa_proxy_lambda_runtime_api: Some("127.0.0.1:12345".to_string()), @@ -55,8 +55,8 @@ mod tests { runtime_api: "".to_string(), sandbox_init_time: Instant::now(), exec_wrapper: None, - }; - assert!(should_start_proxy(&config, &aws_config)); + }); + assert!(should_start_proxy(&config, aws_config)); } #[test] @@ -66,15 +66,15 @@ mod tests { serverless_appsec_enabled: true, ..Default::default() }); - let aws_config = AwsConfig { + let aws_config = Arc::new(AwsConfig { region: "us-east-1".to_string(), aws_lwa_proxy_lambda_runtime_api: None, function_name: "".to_string(), runtime_api: "".to_string(), sandbox_init_time: Instant::now(), exec_wrapper: Some("/opt/datadog_wrapper".to_string()), - }; - assert!(should_start_proxy(&config, &aws_config)); + }); + assert!(should_start_proxy(&config, aws_config)); } #[test] @@ -84,15 +84,15 @@ mod tests { serverless_appsec_enabled: false, ..Default::default() }); - let aws_config = AwsConfig { + let aws_config = Arc::new(AwsConfig { region: "us-east-1".to_string(), aws_lwa_proxy_lambda_runtime_api: None, function_name: "".to_string(), runtime_api: "".to_string(), sandbox_init_time: Instant::now(), exec_wrapper: Some("/opt/datadog_wrapper".to_string()), - }; - assert!(!should_start_proxy(&config, &aws_config)); + }); + assert!(!should_start_proxy(&config, aws_config)); } #[test] @@ -102,7 +102,7 @@ mod tests { serverless_appsec_enabled: true, ..Default::default() }); - let aws_config = AwsConfig { + let aws_config = Arc::new(AwsConfig { region: "us-east-1".to_string(), aws_lwa_proxy_lambda_runtime_api: None, function_name: "".to_string(), @@ -110,7 +110,7 @@ mod tests { sandbox_init_time: Instant::now(), // Datadog wrapper is not set, so we should not start the proxy exec_wrapper: Some("/opt/not_datadog".to_string()), - }; - assert!(!should_start_proxy(&config, &aws_config)); + }); + assert!(!should_start_proxy(&config, aws_config)); } } diff --git a/bottlecap/src/secrets/decrypt.rs b/bottlecap/src/secrets/decrypt.rs index d1d11de2a..1907076fd 100644 --- a/bottlecap/src/secrets/decrypt.rs +++ b/bottlecap/src/secrets/decrypt.rs @@ -14,13 +14,14 @@ use sha2::{Digest, Sha256}; use std::io::Error; use std::sync::Arc; use std::time::Instant; +use tokio::sync::RwLock; use tracing::debug; use tracing::error; pub async fn resolve_secrets( config: Arc, - aws_config: &AwsConfig, - aws_credentials: &mut AwsCredentials, + aws_config: Arc, + aws_credentials: Arc>, ) -> Option { let api_key_candidate = if !config.api_key_secret_arn.is_empty() || !config.kms_api_key.is_empty() { @@ -42,30 +43,36 @@ pub async fn resolve_secrets( } }; - if aws_credentials.aws_secret_access_key.is_empty() - && aws_credentials.aws_access_key_id.is_empty() - && !aws_credentials + let aws_credentials_read = aws_credentials.read().await; + + if aws_credentials_read.aws_secret_access_key.is_empty() + && aws_credentials_read.aws_access_key_id.is_empty() + && !aws_credentials_read .aws_container_credentials_full_uri .is_empty() - && !aws_credentials.aws_container_authorization_token.is_empty() + && 
+        && !aws_credentials_read
+            .aws_container_authorization_token
+            .is_empty()
     {
         // We're in Snap Start
-        let credentials = match get_snapstart_credentials(aws_credentials, &client).await {
-            Ok(credentials) => credentials,
-            Err(err) => {
-                error!("Error getting Snap Start credentials: {}", err);
-                return None;
-            }
-        };
-        aws_credentials.aws_access_key_id = credentials["AccessKeyId"]
+        let credentials =
+            match get_snapstart_credentials(&aws_credentials_read, &client).await {
+                Ok(credentials) => credentials,
+                Err(err) => {
+                    error!("Error getting Snap Start credentials: {}", err);
+                    return None;
+                }
+            };
+        // Release the read guard before taking the write lock; a writer waits
+        // for all outstanding readers, so holding our own read guard here
+        // would deadlock.
+        drop(aws_credentials_read);
+        let mut aws_credentials_write = aws_credentials.write().await;
+        aws_credentials_write.aws_access_key_id = credentials["AccessKeyId"]
             .as_str()
             .unwrap_or_default()
             .to_string();
-        aws_credentials.aws_secret_access_key = credentials["SecretAccessKey"]
+        aws_credentials_write.aws_secret_access_key = credentials["SecretAccessKey"]
             .as_str()
             .unwrap_or_default()
             .to_string();
-        aws_credentials.aws_session_token = credentials["Token"]
+        aws_credentials_write.aws_session_token = credentials["Token"]
             .as_str()
             .unwrap_or_default()
             .to_string();
+        drop(aws_credentials_write);
+        aws_credentials_read = aws_credentials.read().await;
@@ -76,7 +83,7 @@ pub async fn resolve_secrets(
             &client,
             config.api_key_secret_arn.clone(),
             aws_config,
-            aws_credentials,
+            &aws_credentials_read,
         )
         .await
     } else {
@@ -84,7 +91,7 @@ pub async fn resolve_secrets(
             &client,
             config.kms_api_key.clone(),
             aws_config,
-            aws_credentials,
+            &aws_credentials_read,
         )
         .await
     };
@@ -126,9 +133,9 @@ struct RequestArgs<'a> {
 async fn decrypt_aws_kms(
     client: &Client,
     kms_key: String,
-    aws_config: &AwsConfig,
+    aws_config: Arc<AwsConfig>,
     aws_credentials: &AwsCredentials,
-) -> Result<String, Box<dyn std::error::Error>> {
+) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
     // When the API key is encrypted using the AWS console, the function name is added as an
     // encryption context. When the API key is encrypted using the AWS CLI, no encryption context
     // is added. We need to try decrypting the API key both with and without the encryption context.
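The guard discipline in the hunk above matters: `tokio::sync::RwLock` writers wait for all outstanding readers, so awaiting `write()` while still holding `aws_credentials_read` would deadlock the resolver task. A self-contained sketch of the safe ordering, using a hypothetical `Creds` type for illustration:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Default)]
struct Creds {
    access_key_id: String,
}

// Release the read guard before taking the write lock, or the writer
// will wait forever on our own reader.
async fn refresh(creds: Arc<RwLock<Creds>>) {
    let read = creds.read().await;
    let needs_refresh = read.access_key_id.is_empty();
    drop(read); // without this, write().await below would never resolve

    if needs_refresh {
        let mut write = creds.write().await;
        write.access_key_id = "AKID...".to_string();
    }
}

#[tokio::main]
async fn main() {
    let creds = Arc::new(RwLock::new(Creds::default()));
    refresh(Arc::clone(&creds)).await;
    assert_eq!(creds.read().await.access_key_id, "AKID...");
}
```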
@@ -138,7 +145,7 @@ async fn decrypt_aws_kms(
     });
 
     let headers = build_get_secret_signed_headers(
-        aws_config,
+        Arc::clone(&aws_config),
         aws_credentials,
         aws_config.region.clone(),
         RequestArgs {
@@ -157,11 +164,11 @@ async fn decrypt_aws_kms(
     } else {
         let json_body = &serde_json::json!({
             "CiphertextBlob": kms_key,
-            "encryptionContext": { "LambdaFunctionName": aws_config.function_name }}
-        );
+            "encryptionContext": { "LambdaFunctionName": aws_config.function_name }
+        });
 
         let headers = build_get_secret_signed_headers(
-            aws_config,
+            Arc::clone(&aws_config),
             aws_credentials,
             aws_config.region.clone(),
             RequestArgs {
@@ -186,9 +193,9 @@ async fn decrypt_aws_kms(
 async fn decrypt_aws_sm(
     client: &Client,
     secret_arn: String,
-    aws_config: &AwsConfig,
+    aws_config: Arc<AwsConfig>,
     aws_credentials: &AwsCredentials,
-) -> Result<String, Box<dyn std::error::Error>> {
+) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
     let json_body = &serde_json::json!({ "SecretId": secret_arn});
     // Supports cross-region secrets
     let secret_region = secret_arn
@@ -220,7 +227,7 @@ async fn decrypt_aws_sm(
 async fn get_snapstart_credentials(
     aws_credentials: &AwsCredentials,
     client: &Client,
-) -> Result<Value, Box<dyn std::error::Error>> {
+) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
     let mut headers = HeaderMap::new();
     headers.insert(
         "Authorization",
@@ -239,7 +246,7 @@ async fn get_snapstart_credentials(
 async fn request(
     json_body: &Value,
     headers: HeaderMap,
     client: &Client,
-) -> Result<Value, Box<dyn std::error::Error>> {
+) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
     let host_header = &headers["host"]
         .to_str()
         .map_err(|err| Error::new(std::io::ErrorKind::InvalidInput, err.to_string()))?;
@@ -254,11 +261,11 @@ async fn request(
 }
 
 fn build_get_secret_signed_headers(
-    aws_config: &AwsConfig,
+    aws_config: Arc<AwsConfig>,
     aws_credentials: &AwsCredentials,
     region: String,
     header_values: RequestArgs,
-) -> Result<HeaderMap, Box<dyn std::error::Error>> {
+) -> Result<HeaderMap, Box<dyn std::error::Error + Send + Sync>> {
     let amz_date = header_values.time.format("%Y%m%dT%H%M%SZ").to_string();
     let date_stamp = header_values.time.format("%Y%m%d").to_string();
@@ -331,7 +338,7 @@ fn build_get_secret_signed_headers(
     Ok(headers)
 }
 
-fn sign(key: &[u8], msg: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
+fn sign(key: &[u8], msg: &str) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
     let mut mac = Hmac::<Sha256>::new_from_slice(key).map_err(|err| {
         Error::new(
             std::io::ErrorKind::InvalidInput,
@@ -347,7 +354,7 @@ fn sign(key: &[u8], msg: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
 fn get_aws4_signature_key(
     key: &str,
     date_stamp: &str,
     region_name: &str,
     service_name: &str,
-) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
+) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
     let k_date = sign(format!("AWS4{key}").as_bytes(), date_stamp)?;
     let k_region = sign(&k_date, region_name)?;
     let k_service = sign(&k_region, service_name)?;
@@ -374,14 +381,14 @@ mod tests {
             &NaiveDateTime::parse_from_str("2024-05-30 09:10:11", "%Y-%m-%d %H:%M:%S").unwrap(),
         );
         let headers = build_get_secret_signed_headers(
-            &AwsConfig {
+            Arc::new(AwsConfig {
                 region: "us-east-1".to_string(),
                 aws_lwa_proxy_lambda_runtime_api: Some("***".into()),
                 function_name: "arn:some-function".to_string(),
                 sandbox_init_time: Instant::now(),
                 runtime_api: String::new(),
                 exec_wrapper: None,
-            },
+            }),
             &AwsCredentials{
                 aws_access_key_id: "AKIDEXAMPLE".to_string(),
                 aws_secret_access_key: "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY".to_string(),
@@ -433,14 +440,14 @@ mod tests {
             &NaiveDateTime::parse_from_str("2024-05-30 09:10:11", "%Y-%m-%d %H:%M:%S").unwrap(),
         );
         let headers = build_get_secret_signed_headers(
-            &AwsConfig {
+            Arc::new(AwsConfig {
                 region: "us-east-1".to_string(),
                 aws_lwa_proxy_lambda_runtime_api: Some("***".into()),
                 function_name: "arn:some-function".to_string(),
                 sandbox_init_time: Instant::now(),
                 runtime_api: String::new(),
                 exec_wrapper: None,
-            },
+            }),
             &AwsCredentials{
                 aws_access_key_id: "AKIDEXAMPLE".to_string(),
"wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY".to_string(), diff --git a/bottlecap/src/traces/proxy_flusher.rs b/bottlecap/src/traces/proxy_flusher.rs index 4cf7912d3..e6ce4be24 100644 --- a/bottlecap/src/traces/proxy_flusher.rs +++ b/bottlecap/src/traces/proxy_flusher.rs @@ -1,8 +1,11 @@ +use dogstatsd::api_key::ApiKeyFactory; +use reqwest::header::HeaderMap; use std::{error::Error, sync::Arc}; use thiserror::Error as ThisError; -use tokio::{sync::Mutex, task::JoinSet}; - -use reqwest::header::HeaderMap; +use tokio::{ + sync::{Mutex, OnceCell}, + task::JoinSet, +}; use tracing::{debug, error}; use crate::{ @@ -27,47 +30,61 @@ pub struct Flusher { client: reqwest::Client, aggregator: Arc>, config: Arc, - headers: HeaderMap, + tags_provider: Arc, + api_key_factory: Arc, + headers: OnceCell, } impl Flusher { pub fn new( - api_key: String, + api_key_factory: Arc, aggregator: Arc>, tags_provider: Arc, config: Arc, ) -> Self { let client = get_client(&config); - let mut headers = HeaderMap::new(); - headers.insert( - "DD-API-KEY", - api_key.parse().expect("Failed to parse API key header"), - ); - let additional_tags = format!( - "_dd.origin:lambda;functionname:{}", - tags_provider - .get_canonical_resource_name() - .unwrap_or_default() - ); - headers.insert( - DD_ADDITIONAL_TAGS_HEADER, - additional_tags - .parse() - .expect("Failed to parse additional tags header"), - ); Flusher { client, aggregator, config, - headers, + tags_provider, + api_key_factory, + headers: OnceCell::new(), } } + async fn get_headers(&self, api_key: &str) -> &HeaderMap { + self.headers + .get_or_init(move || async move { + let mut headers = HeaderMap::new(); + headers.insert( + "DD-API-KEY", + api_key.parse().expect("Failed to parse API key header"), + ); + let additional_tags = format!( + "_dd.origin:lambda;functionname:{}", + self.tags_provider + .get_canonical_resource_name() + .unwrap_or_default() + ); + headers.insert( + DD_ADDITIONAL_TAGS_HEADER, + additional_tags + .parse() + .expect("Failed to parse additional tags header"), + ); + headers + }) + .await + } + pub async fn flush( &self, retry_requests: Option>, ) -> Option> { + let api_key = self.api_key_factory.get_api_key().await; + let mut join_set = JoinSet::new(); let mut requests = Vec::::new(); @@ -79,7 +96,7 @@ impl Flusher { } else { let mut aggregator = self.aggregator.lock().await; for pr in aggregator.get_batch() { - requests.push(self.create_request(pr)); + requests.push(self.create_request(pr, api_key).await); } } @@ -92,14 +109,18 @@ impl Flusher { Self::get_failed_requests(send_results) } - fn create_request(&self, request: ProxyRequest) -> reqwest::RequestBuilder { + async fn create_request( + &self, + request: ProxyRequest, + api_key: &str, + ) -> reqwest::RequestBuilder { let mut headers = request.headers.clone(); // Remove headers that are not needed for the proxy request headers.remove("host"); headers.remove("content-length"); - headers.extend(self.headers.clone()); + headers.extend(self.get_headers(api_key).await.clone()); self.client .post(&request.target_url) diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index ce8d978c5..d75844276 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -5,18 +5,20 @@ use async_trait::async_trait; use std::str::FromStr; use std::sync::Arc; use tokio::sync::Mutex; +use tokio::sync::OnceCell; use crate::config; use crate::traces::stats_aggregator::StatsAggregator; use datadog_trace_protobuf::pb; use 
 use datadog_trace_utils::{config_utils::trace_stats_url, stats_utils};
 use ddcommon::Endpoint;
+use dogstatsd::api_key::ApiKeyFactory;
 use tracing::{debug, error};
 
 #[async_trait]
 pub trait StatsFlusher {
     fn new(
-        api_key: String,
+        api_key_factory: Arc<ApiKeyFactory>,
         aggregator: Arc<Mutex<StatsAggregator>>,
         config: Arc<config::Config>,
     ) -> Self
@@ -34,29 +36,22 @@ pub struct ServerlessStatsFlusher {
     // pub buffer: Arc<Mutex<Vec<pb::ClientStatsPayload>>>,
     aggregator: Arc<Mutex<StatsAggregator>>,
     config: Arc<config::Config>,
-    endpoint: Endpoint,
+    api_key_factory: Arc<ApiKeyFactory>,
+    endpoint: OnceCell<Endpoint>,
 }
 
 #[async_trait]
 impl StatsFlusher for ServerlessStatsFlusher {
     fn new(
-        api_key: String,
+        api_key_factory: Arc<ApiKeyFactory>,
         aggregator: Arc<Mutex<StatsAggregator>>,
         config: Arc<config::Config>,
     ) -> Self {
-        let stats_url = trace_stats_url(&config.site);
-
-        let endpoint = Endpoint {
-            url: hyper::Uri::from_str(&stats_url).expect("can't make URI from stats url, exiting"),
-            api_key: Some(api_key.clone().into()),
-            timeout_ms: config.flush_timeout * 1_000,
-            test_token: None,
-        };
-
         ServerlessStatsFlusher {
             aggregator,
             config,
-            endpoint,
+            api_key_factory,
+            endpoint: OnceCell::new(),
         }
     }
@@ -64,6 +59,24 @@ impl StatsFlusher for ServerlessStatsFlusher {
         if stats.is_empty() {
             return;
         }
+
+        let endpoint = self
+            .endpoint
+            .get_or_init({
+                move || async move {
+                    let api_key = self.api_key_factory.get_api_key().await.to_string();
+                    let stats_url = trace_stats_url(&self.config.site);
+                    Endpoint {
+                        url: hyper::Uri::from_str(&stats_url)
+                            .expect("can't make URI from stats url, exiting"),
+                        api_key: Some(api_key.clone().into()),
+                        timeout_ms: self.config.flush_timeout * 1_000,
+                        test_token: None,
+                    }
+                }
+            })
+            .await;
+
         debug!("Flushing {} stats", stats.len());
 
         let stats_payload = stats_utils::construct_stats_payload(stats);
@@ -84,8 +97,8 @@ impl StatsFlusher for ServerlessStatsFlusher {
 
         let resp = stats_utils::send_stats_payload(
             serialized_stats_payload,
-            &self.endpoint,
-            &self.config.api_key,
+            endpoint,
+            self.api_key_factory.get_api_key().await,
         )
         .await;
         let elapsed = start.elapsed();
diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs
index c9d5879e7..3b3863911 100644
--- a/bottlecap/src/traces/trace_agent.rs
+++ b/bottlecap/src/traces/trace_agent.rs
@@ -37,6 +37,7 @@ use crate::{
 use datadog_trace_protobuf::pb;
 use datadog_trace_utils::trace_utils::{self};
 use ddcommon::hyper_migration;
+use dogstatsd::api_key::ApiKeyFactory;
 
 const TRACE_AGENT_PORT: usize = 8126;
 
@@ -85,6 +86,7 @@ pub struct StatsState {
 pub struct ProxyState {
     pub config: Arc<Config>,
     pub proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
+    pub api_key_factory: Arc<ApiKeyFactory>,
 }
 
 pub struct TraceAgent {
@@ -95,8 +97,9 @@ pub struct TraceAgent {
     pub proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
     pub tags_provider: Arc<Provider>,
     invocation_processor: Arc<Mutex<InvocationProcessor>>,
-    tx: Sender<SendDataBuilderInfo>,
     shutdown_token: CancellationToken,
+    tx: Sender<SendDataBuilderInfo>,
+    api_key_factory: Arc<ApiKeyFactory>,
 }
 
 #[derive(Clone, Copy)]
@@ -117,6 +120,7 @@ impl TraceAgent {
         proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
         invocation_processor: Arc<Mutex<InvocationProcessor>>,
         tags_provider: Arc<Provider>,
+        api_key_factory: Arc<ApiKeyFactory>,
     ) -> TraceAgent {
tx is passed through each // endpoint_handler to the trace processor, which uses it to send de-serialized @@ -143,6 +147,7 @@ impl TraceAgent { invocation_processor, tags_provider, tx: trace_tx, + api_key_factory, shutdown_token: CancellationToken::new(), } } @@ -202,6 +207,7 @@ impl TraceAgent { let proxy_state = ProxyState { config: Arc::clone(&self.config), proxy_aggregator: Arc::clone(&self.proxy_aggregator), + api_key_factory: Arc::clone(&self.api_key_factory), }; let trace_router = Router::new() @@ -486,14 +492,16 @@ impl TraceAgent { } } - let send_data = trace_processor.process_traces( - config, - tags_provider, - tracer_header_tags, - traces, - body_size, - None, - ); + let send_data = trace_processor + .process_traces( + config, + tags_provider, + tracer_header_tags, + traces, + body_size, + None, + ) + .await; // send trace payload to our trace flusher match trace_tx.send(send_data).await { diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 58b87e7a4..1322fe967 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -9,6 +9,7 @@ use crate::traces::{ DNS_NON_ROUTABLE_ADDRESS_URL_PREFIX, INVOCATION_SPAN_RESOURCE, LAMBDA_EXTENSION_URL_PREFIX, LAMBDA_RUNTIME_URL_PREFIX, LAMBDA_STATSD_URL_PREFIX, }; +use async_trait::async_trait; use datadog_trace_obfuscation::obfuscate::obfuscate_span; use datadog_trace_obfuscation::obfuscation_config; use datadog_trace_protobuf::pb; @@ -19,6 +20,7 @@ use datadog_trace_utils::trace_utils::{self}; use datadog_trace_utils::tracer_header_tags; use datadog_trace_utils::tracer_payload::{TraceChunkProcessor, TracerPayloadCollection}; use ddcommon::Endpoint; +use dogstatsd::api_key::ApiKeyFactory; use std::str::FromStr; use std::sync::Arc; use tracing::error; @@ -29,7 +31,7 @@ use super::trace_aggregator::SendDataBuilderInfo; #[allow(clippy::module_name_repetitions)] pub struct ServerlessTraceProcessor { pub obfuscation_config: Arc, - pub resolved_api_key: String, + pub api_key_factory: Arc, } struct ChunkProcessor { @@ -117,24 +119,26 @@ fn filter_span_from_lambda_library_or_runtime(span: &Span) -> bool { #[allow(clippy::module_name_repetitions)] #[allow(clippy::too_many_arguments)] +#[async_trait] pub trait TraceProcessor { - fn process_traces( + async fn process_traces( &self, config: Arc, tags_provider: Arc, - header_tags: tracer_header_tags::TracerHeaderTags, + header_tags: tracer_header_tags::TracerHeaderTags<'_>, traces: Vec>, body_size: usize, span_pointers: Option>, ) -> SendDataBuilderInfo; } +#[async_trait] impl TraceProcessor for ServerlessTraceProcessor { - fn process_traces( + async fn process_traces( &self, config: Arc, tags_provider: Arc, - header_tags: tracer_header_tags::TracerHeaderTags, + header_tags: tracer_header_tags::TracerHeaderTags<'_>, traces: Vec>, body_size: usize, span_pointers: Option>, @@ -160,10 +164,11 @@ impl TraceProcessor for ServerlessTraceProcessor { tracer_payload.tags.extend(tags.clone()); } } + let api_key = self.api_key_factory.get_api_key().await.to_string(); let endpoint = Endpoint { url: hyper::Uri::from_str(&config.apm_dd_url) .expect("can't parse trace intake URL, exiting"), - api_key: Some(self.resolved_api_key.clone().into()), + api_key: Some(api_key.into()), timeout_ms: config.flush_timeout * 1_000, test_token: None, }; @@ -189,6 +194,7 @@ mod tests { }; use datadog_trace_obfuscation::obfuscation_config::ObfuscationConfig; + use dogstatsd::api_key::ApiKeyFactory; use crate::{config::Config, 
@@ -189,6 +194,7 @@ mod tests {
     };
 
     use datadog_trace_obfuscation::obfuscation_config::ObfuscationConfig;
+    use dogstatsd::api_key::ApiKeyFactory;
 
     use crate::{config::Config, tags::provider::Provider, LAMBDA_RUNTIME_SLUG};
@@ -293,7 +299,7 @@ mod tests {
         };
 
         let trace_processor = ServerlessTraceProcessor {
-            resolved_api_key: "foo".to_string(),
+            api_key_factory: Arc::new(ApiKeyFactory::new("test-api-key")),
            obfuscation_config: Arc::new(ObfuscationConfig::new().unwrap()),
         };
         let config = create_test_config();
@@ -327,7 +333,7 @@ mod tests {
         };
 
         let received_payload = if let TracerPayloadCollection::V07(payload) =
-            tracer_payload.builder.build().get_payloads()
+            tracer_payload.await.builder.build().get_payloads()
         {
             Some(payload[0].clone())
         } else {
diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs
index 48b29b1de..d7c278f80 100644
--- a/bottlecap/tests/logs_integration_test.rs
+++ b/bottlecap/tests/logs_integration_test.rs
@@ -4,6 +4,7 @@ use bottlecap::logs::{agent::LogsAgent, flusher::LogsFlusher};
 use bottlecap::tags::provider::Provider;
 use bottlecap::telemetry::events::TelemetryEvent;
 use bottlecap::LAMBDA_RUNTIME_SLUG;
+use dogstatsd::api_key::ApiKeyFactory;
 use httpmock::prelude::*;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -56,8 +57,9 @@ async fn test_logs() {
     let bus = EventBus::run();
     let mut logs_agent = LogsAgent::new(tags_provider, Arc::clone(&arc_conf), bus.get_sender_copy());
+    let api_key_factory = Arc::new(ApiKeyFactory::new(dd_api_key));
     let logs_flusher = LogsFlusher::new(
-        dd_api_key.to_string(),
+        api_key_factory,
         Arc::clone(&logs_agent.aggregator),
         arc_conf.clone(),
     );
diff --git a/bottlecap/tests/metrics_integration_test.rs b/bottlecap/tests/metrics_integration_test.rs
index 19db1982c..83935a877 100644
--- a/bottlecap/tests/metrics_integration_test.rs
+++ b/bottlecap/tests/metrics_integration_test.rs
@@ -1,6 +1,7 @@
 use bottlecap::config::Config;
 use bottlecap::metrics::enhanced::lambda::Lambda as enhanced_metrics;
 use dogstatsd::aggregator::Aggregator as MetricsAggregator;
+use dogstatsd::api_key::ApiKeyFactory;
 use dogstatsd::datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride};
 use dogstatsd::flusher::Flusher as MetricsFlusher;
 use dogstatsd::flusher::FlusherConfig as MetricsFlusherConfig;
@@ -44,7 +45,7 @@ async fn test_enhanced_metrics() {
     )
     .expect("failed to create metrics override");
     let flusher_config = MetricsFlusherConfig {
-        api_key: dd_api_key.to_string(),
+        api_key_factory: Arc::new(ApiKeyFactory::new(dd_api_key)),
         aggregator: metrics_aggr.clone(),
         metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(None, Some(metrics_site_override))
             .expect("can't parse metrics intake URL from site"),