From 377144997199f8a1111b7e0bcb7177e39bd1ddcc Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Thu, 29 May 2025 16:54:05 -0400 Subject: [PATCH 1/9] dual shipping metrics --- bottlecap/Cargo.lock | 21 ++----- bottlecap/Cargo.toml | 6 +- bottlecap/src/bin/bottlecap/main.rs | 90 ++++++++++++++++++++++------- bottlecap/src/config/env.rs | 20 +++++++ 4 files changed, 98 insertions(+), 39 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index c330817f9..f366fc23b 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -492,7 +492,7 @@ dependencies = [ "base64 0.22.1", "bytes 1.10.1", "chrono", - "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=3f3347790d4c7f3393589ef7efb7ccf22405a89a)", + "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "datadog-trace-agent", "datadog-trace-normalization", @@ -723,18 +723,7 @@ dependencies = [ [[package]] name = "datadog-fips" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=3f3347790d4c7f3393589ef7efb7ccf22405a89a#3f3347790d4c7f3393589ef7efb7ccf22405a89a" -dependencies = [ - "reqwest", - "rustls", - "rustls-native-certs", - "tracing", -] - -[[package]] -name = "datadog-fips" -version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=b1583da00e202985057cf6d48f05e1cac77ab910#b1583da00e202985057cf6d48f05e1cac77ab910" +source = "git+https://github.com/DataDog/serverless-components?rev=ba26f297f575163930ef8009864dc15872ac64a6#ba26f297f575163930ef8009864dc15872ac64a6" dependencies = [ "reqwest", "rustls", @@ -771,7 +760,7 @@ dependencies = [ [[package]] name = "datadog-trace-agent" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=3f3347790d4c7f3393589ef7efb7ccf22405a89a#3f3347790d4c7f3393589ef7efb7ccf22405a89a" +source = "git+https://github.com/DataDog/serverless-components?rev=ba26f297f575163930ef8009864dc15872ac64a6#ba26f297f575163930ef8009864dc15872ac64a6" dependencies = [ "anyhow", "async-trait", @@ -980,9 +969,9 @@ dependencies = [ [[package]] name = "dogstatsd" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=b1583da00e202985057cf6d48f05e1cac77ab910#b1583da00e202985057cf6d48f05e1cac77ab910" +source = "git+https://github.com/DataDog/serverless-components?rev=ba26f297f575163930ef8009864dc15872ac64a6#ba26f297f575163930ef8009864dc15872ac64a6" dependencies = [ - "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=b1583da00e202985057cf6d48f05e1cac77ab910)", + "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "derive_more", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index 58201859f..68f3fab85 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -57,9 +57,9 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" , features = ["compression"] } datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" } datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" } -dogstatsd = { git = 
"https://github.com/DataDog/serverless-components", rev = "b1583da00e202985057cf6d48f05e1cac77ab910", default-features = false } -datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "3f3347790d4c7f3393589ef7efb7ccf22405a89a" } -datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "3f3347790d4c7f3393589ef7efb7ccf22405a89a", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "ba26f297f575163930ef8009864dc15872ac64a6", default-features = false } +datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "ba26f297f575163930ef8009864dc15872ac64a6" } +datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "ba26f297f575163930ef8009864dc15872ac64a6", default-features = false } axum = { version = "0.8.4", default-features = false, features = ["default"] } [dev-dependencies] diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 054cb92fb..9e15619dc 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -103,7 +103,7 @@ impl PendingFlushHandles { &mut self, logs_flusher: &LogsFlusher, trace_flusher: &ServerlessTraceFlusher, - metrics_flusher: &Arc>, + metrics_flusher: &Arc>>, ) -> bool { let mut joinset = tokio::task::JoinSet::new(); let mut flush_error = false; @@ -164,10 +164,15 @@ impl PendingFlushHandles { let series_clone = series.clone(); let sketches_clone = sketches.clone(); joinset.spawn(async move { - let mut locked_flusher = mf.lock().await; - locked_flusher - .flush_with_retries(Some(series_clone), Some(sketches_clone)) - .await; + let mut locked_flushers = mf.lock().await; + let mut futures = Vec::new(); + for flusher in locked_flushers.iter_mut() { + futures.push(flusher.flush_metrics( + series_clone.clone(), + sketches_clone.clone() + )); + } + futures::future::join_all(futures).await; }); } } @@ -527,6 +532,7 @@ async fn extension_loop_active( &*trace_flusher, &*stats_flusher, &mut race_flush_interval, + &metrics_aggr, ) .await; } @@ -540,6 +546,7 @@ async fn extension_loop_active( &*trace_flusher, &*stats_flusher, &mut race_flush_interval, + &metrics_aggr, ) .await; let next_response = next_event(client, &r.extension_id).await; @@ -564,17 +571,24 @@ async fn extension_loop_active( .push_back(tokio::spawn(async move { traces_val.flush(None).await.unwrap_or_default() })); - let cloned_metrics_flusher = metrics_flusher.clone(); - pending_flush_handles - .metric_flush_handles - .push_back(tokio::spawn(async move { - cloned_metrics_flusher - .lock() - .await - .flush() - .await - .unwrap_or_default() - })); + let (metrics_flushers, series, sketches) = { + let locked_metrics_flusher = metrics_flusher.lock().await; + let mut aggregator = metrics_aggr.lock().expect("lock poisoned"); + let series = aggregator.consume_metrics(); + let sketches = aggregator.consume_distributions(); + (locked_metrics_flusher.clone(), series, sketches) + }; + for mut flusher in metrics_flushers { + let series_clone = series.clone(); + let sketches_clone = sketches.clone(); + let handle = tokio::spawn(async move { + flusher + .flush_metrics(series_clone.clone(), sketches_clone.clone()) + .await + .unwrap_or_default() + }); + pending_flush_handles.metric_flush_handles.push_back(handle); + } race_flush_interval.reset(); } else if current_flush_decision == FlushDecision::Periodic { // TODO(astuyve): still await the shutdown flush handles @@ -585,6 +599,7 @@ async fn 
                &*trace_flusher,
                &*stats_flusher,
                &mut race_flush_interval,
+                &metrics_aggr,
            )
            .await;
            last_continuous_flush_error = false;
@@ -623,6 +638,7 @@ async fn extension_loop_active(
                &*trace_flusher,
                &*stats_flusher,
                &mut race_flush_interval,
+                &metrics_aggr,
            )
            .await;
        }
@@ -670,6 +686,7 @@ async fn extension_loop_active(
        &*trace_flusher,
        &*stats_flusher,
        &mut race_flush_interval,
+        &metrics_aggr,
    )
    .await;
    return Ok(());
@@ -679,14 +696,24 @@ async fn extension_loop_active(
 
 async fn blocking_flush_all(
     logs_flusher: &LogsFlusher,
-    metrics_flusher: &mut MetricsFlusher,
+    metrics_flusher: &mut Vec<MetricsFlusher>,
     trace_flusher: &impl TraceFlusher,
     stats_flusher: &impl StatsFlusher,
     race_flush_interval: &mut tokio::time::Interval,
+    metrics_aggr: &Arc<Mutex<MetricsAggregator>>,
 ) {
+    let (series, sketches) = {
+        let mut aggregator = metrics_aggr.lock().expect("lock poisoned");
+        (aggregator.consume_metrics(), aggregator.consume_distributions())
+    };
+    let metrics_futures: Vec<_> = metrics_flusher
+        .iter_mut()
+        .map(|f| f.flush_metrics(series.clone(), sketches.clone()))
+        .collect();
+
     tokio::join!(
         logs_flusher.flush(None),
-        metrics_flusher.flush(),
+        futures::future::join_all(metrics_futures),
         trace_flusher.flush(None),
         stats_flusher.flush()
     );
@@ -846,7 +873,9 @@ fn start_metrics_flusher(
     resolved_api_key: String,
     metrics_aggr: &Arc<Mutex<MetricsAggregator>>,
     config: &Arc<Config>,
-) -> MetricsFlusher {
+) -> Vec<MetricsFlusher> {
+    let mut flushers = Vec::new();
+
     let metrics_intake_url = if !config.dd_url.is_empty() {
         let dd_dd_url = DdDdUrl::new(config.dd_url.clone()).expect("can't parse DD_DD_URL");
 
@@ -871,7 +900,28 @@ fn start_metrics_flusher(
         timeout: Duration::from_secs(config.flush_timeout),
         retry_strategy: DsdRetryStrategy::Immediate(3),
     };
-    MetricsFlusher::new(flusher_config)
+    flushers.push(MetricsFlusher::new(flusher_config));
+
+    for (endpoint_url, api_keys) in &config.additional_endpoints {
+        let dd_url = DdUrl::new(endpoint_url.clone()).expect("can't parse additional endpoint URL");
+        let prefix_override = MetricsIntakeUrlPrefixOverride::maybe_new(Some(dd_url), None);
+        let metrics_intake_url = MetricsIntakeUrlPrefix::new(None, prefix_override)
+            .expect("can't parse additional endpoint URL");
+
+        // Create a flusher for each endpoint URL and API key pair
+        for api_key in api_keys {
+            let additional_flusher_config = MetricsFlusherConfig {
+                api_key: api_key.clone(),
+                aggregator: metrics_aggr.clone(),
+                metrics_intake_url_prefix: metrics_intake_url.clone(),
+                https_proxy: config.https_proxy.clone(),
+                timeout: Duration::from_secs(config.flush_timeout),
+                retry_strategy: DsdRetryStrategy::Immediate(3),
+            };
+            flushers.push(MetricsFlusher::new(additional_flusher_config));
+        }
+    }
+    flushers
 }
 
 fn start_trace_agent(
diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs
index edd702ed1..6fb48c5b8 100644
--- a/bottlecap/src/config/env.rs
+++ b/bottlecap/src/config/env.rs
@@ -1,4 +1,5 @@
 use serde::{Deserialize, Deserializer};
+use tracing::error;
 use std::collections::HashMap;
 use std::vec;
 
@@ -75,6 +76,8 @@ pub struct Config {
     // Metrics overrides
     pub dd_url: String,
     pub url: String,
+    #[serde(deserialize_with = "deserialize_additional_endpoints")]
+    pub additional_endpoints: HashMap<String, Vec<String>>,
     // OTLP
     //
     // - Traces
@@ -172,6 +175,7 @@ impl Default for Config {
             apm_features: vec![],
             dd_url: String::default(),
             url: String::default(),
+            additional_endpoints: HashMap::new(),
             // OTLP
             //
             // - Receiver
@@ -230,3 +234,19 @@ where
         _ => Err(serde::de::Error::custom("expected a string or an integer")),
     }
 }
+
+fn deserialize_additional_endpoints<'de, D>(
+    deserializer: D,
+) -> Result<HashMap<String, Vec<String>>, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let s: String = Deserialize::deserialize(deserializer)?;
+    match serde_json::from_str(&s) {
+        Ok(map) => Ok(map),
+        Err(_) => {
+            error!("Failed to deserialize DD_ADDITIONAL_ENDPOINTS");
+            Ok(HashMap::new())
+        }
+    }
+}
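For reference, the deserializer introduced in this patch expects DD_ADDITIONAL_ENDPOINTS to hold a JSON object mapping each additional intake URL to the API keys to ship with. A minimal standalone sketch of that contract (assuming only the serde_json crate; the URLs and keys are placeholders, not values from the patch):

    use std::collections::HashMap;

    fn main() {
        // Hypothetical DD_ADDITIONAL_ENDPOINTS value, in the JSON string form
        // that the deserializer above hands to serde_json::from_str.
        let raw = r#"{"https://app.datadoghq.com":["apikey2","apikey3"],"https://app.datadoghq.eu":["apikey4"]}"#;
        let endpoints: HashMap<String, Vec<String>> =
            serde_json::from_str(raw).unwrap_or_default();
        // start_metrics_flusher builds one extra flusher per (URL, API key)
        // pair, so this value yields three additional flushers.
        let extra: usize = endpoints.values().map(Vec::len).sum();
        assert_eq!(extra, 3);
    }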
"https://github.com/DataDog/serverless-components", rev = "8613f3df6f584d0f7d95d7f693bce2455693bc3f" } +datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "8613f3df6f584d0f7d95d7f693bce2455693bc3f", default-features = false } axum = { version = "0.8.4", default-features = false, features = ["default"] } [dev-dependencies] diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 9e15619dc..9510047f0 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -167,10 +167,12 @@ impl PendingFlushHandles { let mut locked_flushers = mf.lock().await; let mut futures = Vec::new(); for flusher in locked_flushers.iter_mut() { - futures.push(flusher.flush_metrics( - series_clone.clone(), - sketches_clone.clone() - )); + futures.push( + flusher.flush_metrics( + series_clone.clone(), + sketches_clone.clone(), + ), + ); } futures::future::join_all(futures).await; }); @@ -572,20 +574,22 @@ async fn extension_loop_active( traces_val.flush(None).await.unwrap_or_default() })); let (metrics_flushers, series, sketches) = { - let locked_metrics_flusher = metrics_flusher.lock().await; + let locked_metrics = metrics_flusher.lock().await; let mut aggregator = metrics_aggr.lock().expect("lock poisoned"); - let series = aggregator.consume_metrics(); - let sketches = aggregator.consume_distributions(); - (locked_metrics_flusher.clone(), series, sketches) + ( + locked_metrics.clone(), + aggregator.consume_metrics(), + aggregator.consume_distributions(), + ) }; for mut flusher in metrics_flushers { let series_clone = series.clone(); let sketches_clone = sketches.clone(); let handle = tokio::spawn(async move { flusher - .flush_metrics(series_clone.clone(), sketches_clone.clone()) - .await - .unwrap_or_default() + .flush_metrics(series_clone.clone(), sketches_clone.clone()) + .await + .unwrap_or_default() }); pending_flush_handles.metric_flush_handles.push_back(handle); } @@ -696,7 +700,7 @@ async fn extension_loop_active( async fn blocking_flush_all( logs_flusher: &LogsFlusher, - metrics_flusher: &mut Vec, + metrics_flusher: &mut [MetricsFlusher], trace_flusher: &impl TraceFlusher, stats_flusher: &impl StatsFlusher, race_flush_interval: &mut tokio::time::Interval, @@ -704,7 +708,10 @@ async fn blocking_flush_all( ) { let (series, sketches) = { let mut aggregator = metrics_aggr.lock().expect("lock poisoned"); - (aggregator.consume_metrics(), aggregator.consume_distributions()) + ( + aggregator.consume_metrics(), + aggregator.consume_distributions(), + ) }; let metrics_futures: Vec<_> = metrics_flusher .iter_mut() @@ -901,7 +908,7 @@ fn start_metrics_flusher( retry_strategy: DsdRetryStrategy::Immediate(3), }; flushers.push(MetricsFlusher::new(flusher_config)); - + for (endpoint_url, api_keys) in &config.additional_endpoints { let dd_url = DdUrl::new(endpoint_url.clone()).expect("can't parse additional endpoint URL"); let prefix_override = MetricsIntakeUrlPrefixOverride::maybe_new(Some(dd_url), None); diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index 6fb48c5b8..adee3e485 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Deserializer}; -use tracing::error; use std::collections::HashMap; use std::vec; +use tracing::error; use datadog_trace_obfuscation::replacer::ReplaceRule; use serde_aux::field_attributes::deserialize_bool_from_anything; @@ -242,11 +242,10 @@ where D: Deserializer<'de>, { let s: String = 
-    match serde_json::from_str(&s) {
-        Ok(map) => Ok(map),
-        Err(_) => {
-            error!("Failed to deserialize DD_ADDITIONAL_ENDPOINTS");
-            Ok(HashMap::new())
-        }
+    if let Ok(map) = serde_json::from_str::<HashMap<String, Vec<String>>>(&s) {
+        Ok(map)
+    } else {
+        error!("Failed to deserialize DD_ADDITIONAL_ENDPOINTS");
+        Ok(HashMap::new())
     }
 }
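The redrive path reworked above fans one consumed batch out to every flusher: the series and sketches are cloned per flusher and all sends are awaited together through futures::future::join_all. A stripped-down sketch of that fan-out pattern (assuming the futures and tokio crates; flush_metrics is a stand-in for the MetricsFlusher method, not the real dogstatsd API):

    use futures::future::join_all;

    // Stand-in for MetricsFlusher::flush_metrics: each endpoint receives its
    // own clone of the batch and reports how many payloads it handled.
    async fn flush_metrics(endpoint: &str, series: Vec<String>) -> usize {
        let _ = endpoint;
        series.len()
    }

    #[tokio::main]
    async fn main() {
        let series = vec!["cpu".to_string(), "mem".to_string()];
        let endpoints = ["https://app.datadoghq.com", "https://app.datadoghq.eu"];
        // One future per flusher, all driven concurrently, mirroring the loop
        // over locked_flushers.iter_mut() in the patch above.
        let futures: Vec<_> = endpoints
            .iter()
            .map(|e| flush_metrics(e, series.clone()))
            .collect();
        let sent = join_all(futures).await;
        assert_eq!(sent, vec![2, 2]);
    }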

From 0013c1511a4e8f91b6b13893b888b7495d506031 Mon Sep 17 00:00:00 2001
From: shreyamalpani
Date: Wed, 4 Jun 2025 13:50:03 -0400
Subject: [PATCH 3/9] support yaml config

---
 bottlecap/src/config/env.rs  | 43 ++++++++++++++++++++++++++++--------
 bottlecap/src/config/mod.rs  |  7 ++++++
 bottlecap/src/config/yaml.rs | 29 ++++++++++++++++++++++++
 3 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs
index adee3e485..5d0448ffd 100644
--- a/bottlecap/src/config/env.rs
+++ b/bottlecap/src/config/env.rs
@@ -235,17 +235,42 @@ where
     }
 }
 
-fn deserialize_additional_endpoints<'de, D>(
-    deserializer: D,
-) -> Result<HashMap<String, Vec<String>>, D::Error>
+pub (crate) fn deserialize_additional_endpoints<'de, D>(deserializer: D) -> Result<HashMap<String, Vec<String>>, D::Error>
 where
     D: Deserializer<'de>,
 {
-    let s: String = Deserialize::deserialize(deserializer)?;
-    if let Ok(map) = serde_json::from_str::<HashMap<String, Vec<String>>>(&s) {
-        Ok(map)
-    } else {
-        error!("Failed to deserialize DD_ADDITIONAL_ENDPOINTS");
-        Ok(HashMap::new())
+    let value = Value::deserialize(deserializer)?;
+
+    match value {
+        Value::Object(map) => {
+            // For YAML format (object) in datadog.yaml
+            let mut result = HashMap::new();
+            for (key, value) in map {
+                match value {
+                    Value::Array(arr) => {
+                        let urls: Vec<String> = arr
+                            .into_iter()
+                            .filter_map(|v| v.as_str().map(String::from))
+                            .collect();
+                        result.insert(key, urls);
+                    }
+                    _ => {
+                        error!("Failed to deserialize additional endpoints - Invalid YAML format: expected array for key {}", key);
+                    }
+                }
+            }
+            Ok(result)
+        }
+        Value::String(s) if !s.is_empty() => {
+            // For JSON format (string) in DD_ADDITIONAL_ENDPOINTS
+            match serde_json::from_str(&s) {
+                Ok(map) => Ok(map),
+                _ => {
+                    error!("Failed to deserialize additional endpoints - Invalid JSON format");
+                    Ok(HashMap::new())
+                }
+            }
+        }
+        _ => Ok(HashMap::new()),
     }
 }
diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs
index 541fa83ab..50fa58898 100644
--- a/bottlecap/src/config/mod.rs
+++ b/bottlecap/src/config/mod.rs
@@ -242,6 +242,13 @@ fn merge_config(config: &mut EnvConfig, yaml_config: &YamlConfig) {
             .otlp_config_traces_span_name_remappings
             .clone_from(&yaml_otlp_config_traces_span_name_remappings);
     }
+
+    // Dual Shipping
+    //
+    // - Metrics
+    if config.additional_endpoints.is_empty() {
+        config.additional_endpoints = yaml_config.additional_endpoints.clone();
+    }
 }
 
 #[inline]
diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs
index 22e3aa5fe..1d00ddf7a 100644
--- a/bottlecap/src/config/yaml.rs
+++ b/bottlecap/src/config/yaml.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 
 use crate::config::{deserialize_apm_replace_rules, deserialize_processing_rules, ProcessingRule};
+use crate::config::env::deserialize_additional_endpoints;
 use datadog_trace_obfuscation::replacer::ReplaceRule;
 use serde::Deserialize;
 use serde_aux::field_attributes::deserialize_bool_from_anything;
@@ -17,6 +18,8 @@ pub struct Config {
     pub apm_config: ApmConfig,
     pub proxy: ProxyConfig,
     pub otlp_config: Option<OtlpConfig>,
+    #[serde(deserialize_with = "deserialize_additional_endpoints")]
+    pub additional_endpoints: HashMap<String, Vec<String>>,
 }
 
 impl Config {
@@ -222,6 +225,7 @@ pub struct OtlpTracesConfig {
 
 #[cfg(test)]
 mod tests {
     use std::path::Path;
+    use std::collections::HashMap;
 
     use crate::config::get_config;
 
@@ -250,4 +254,29 @@ mod tests {
             Ok(())
         });
     }
+
+    #[test]
+    fn test_parse_additional_endpoints_from_yaml() {
+        figment::Jail::expect_with(|jail| {
+            jail.clear_env();
+            jail.create_file(
+                "datadog.yaml",
+                r#"
+additional_endpoints:
+  "https://app.datadoghq.com":
+    - apikey2
+    - apikey3
+  "https://app.datadoghq.eu":
+    - apikey4
+"#,
+            )?;
+
+            let config = get_config(Path::new("")).expect("should parse config");
+            let mut expected = HashMap::new();
+            expected.insert("https://app.datadoghq.com".to_string(), vec!["apikey2".to_string(), "apikey3".to_string()]);
+            expected.insert("https://app.datadoghq.eu".to_string(), vec!["apikey4".to_string()]);
+            assert_eq!(config.additional_endpoints, expected);
+            Ok(())
+        });
+    }
+}

From 4dc5d4bbf10daf1404980d3adcaa5d576fa2dad2 Mon Sep 17 00:00:00 2001
From: shreyamalpani
Date: Thu, 5 Jun 2025 11:01:20 -0400
Subject: [PATCH 4/9] fmt fixes

---
 bottlecap/src/config/env.rs  | 17 +++++++++--------
 bottlecap/src/config/mod.rs  |  4 +++-
 bottlecap/src/config/yaml.rs | 16 +++++++++++-----
 3 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs
index 5d0448ffd..f607bdfd9 100644
--- a/bottlecap/src/config/env.rs
+++ b/bottlecap/src/config/env.rs
@@ -235,12 +235,14 @@ where
     }
 }
 
-pub (crate) fn deserialize_additional_endpoints<'de, D>(deserializer: D) -> Result<HashMap<String, Vec<String>>, D::Error>
+pub(crate) fn deserialize_additional_endpoints<'de, D>(
+    deserializer: D,
+) -> Result<HashMap<String, Vec<String>>, D::Error>
 where
     D: Deserializer<'de>,
 {
     let value = Value::deserialize(deserializer)?;
-
+
     match value {
         Value::Object(map) => {
             // For YAML format (object) in datadog.yaml
@@ -263,12 +265,11 @@ where
         }
         Value::String(s) if !s.is_empty() => {
             // For JSON format (string) in DD_ADDITIONAL_ENDPOINTS
-            match serde_json::from_str(&s) {
-                Ok(map) => Ok(map),
-                _ => {
-                    error!("Failed to deserialize additional endpoints - Invalid JSON format");
-                    Ok(HashMap::new())
-                }
+            if let Ok(map) = serde_json::from_str(&s) {
+                Ok(map)
+            } else {
+                error!("Failed to deserialize additional endpoints - Invalid JSON format");
+                Ok(HashMap::new())
             }
         }
diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs
index 50fa58898..9f2a19173 100644
--- a/bottlecap/src/config/mod.rs
+++ b/bottlecap/src/config/mod.rs
@@ -247,7 +247,9 @@ fn merge_config(config: &mut EnvConfig, yaml_config: &YamlConfig) {
     //
     // - Metrics
     if config.additional_endpoints.is_empty() {
-        config.additional_endpoints = yaml_config.additional_endpoints.clone();
+        config
+            .additional_endpoints
+            .clone_from(&yaml_config.additional_endpoints);
     }
 }
 
diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs
index 1d00ddf7a..d612ef44b 100644
--- a/bottlecap/src/config/yaml.rs
+++ b/bottlecap/src/config/yaml.rs
@@ -1,7 +1,7 @@
 use std::collections::HashMap;
 
-use crate::config::{deserialize_apm_replace_rules, deserialize_processing_rules, ProcessingRule};
 use crate::config::env::deserialize_additional_endpoints;
+use crate::config::{deserialize_apm_replace_rules, deserialize_processing_rules, ProcessingRule};
 use datadog_trace_obfuscation::replacer::ReplaceRule;
 use serde::Deserialize;
 use serde_aux::field_attributes::deserialize_bool_from_anything;
@@ -224,8 +224,8 @@ pub struct OtlpTracesConfig {
 
 #[cfg(test)]
 mod tests {
-    use std::path::Path;
     use std::collections::HashMap;
+    use std::path::Path;
 
     use crate::config::get_config;
 
@@ -250,7 +254,7 @@ mod 
tests { Ok(()) }); } - + #[test] fn test_parse_additional_endpoints_from_yaml() { figment::Jail::expect_with(|jail| { @@ -273,8 +273,14 @@ additional_endpoints: let config = get_config(Path::new("")).expect("should parse config"); let mut expected = HashMap::new(); - expected.insert("https://app.datadoghq.com".to_string(), vec!["apikey2".to_string(), "apikey3".to_string()]); - expected.insert("https://app.datadoghq.eu".to_string(), vec!["apikey4".to_string()]); + expected.insert( + "https://app.datadoghq.com".to_string(), + vec!["apikey2".to_string(), "apikey3".to_string()], + ); + expected.insert( + "https://app.datadoghq.eu".to_string(), + vec!["apikey4".to_string()], + ); assert_eq!(config.additional_endpoints, expected); Ok(()) }); From 09e0648baa05338213554f027fdcae180fdd7ace Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Mon, 9 Jun 2025 08:42:12 -0400 Subject: [PATCH 5/9] deserialize tests --- bottlecap/src/config/env.rs | 72 +++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index f607bdfd9..ae8afbbbf 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -275,3 +275,75 @@ where _ => Ok(HashMap::new()), } } + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_deserialize_additional_endpoints_yaml() { + // Test YAML format (object) + let input = json!({ + "https://app.datadoghq.com": ["key1", "key2"], + "https://app.datadoghq.eu": ["key3"] + }); + + let result = deserialize_additional_endpoints(input).unwrap(); + + let mut expected = HashMap::new(); + expected.insert( + "https://app.datadoghq.com".to_string(), + vec!["key1".to_string(), "key2".to_string()], + ); + expected.insert( + "https://app.datadoghq.eu".to_string(), + vec!["key3".to_string()], + ); + + assert_eq!(result, expected); + } + + #[test] + fn test_deserialize_additional_endpoints_json() { + // Test JSON string format + let input = json!("{\"https://app.datadoghq.com\":[\"key1\",\"key2\"],\"https://app.datadoghq.eu\":[\"key3\"]}"); + + let result = deserialize_additional_endpoints(input).unwrap(); + + let mut expected = HashMap::new(); + expected.insert( + "https://app.datadoghq.com".to_string(), + vec!["key1".to_string(), "key2".to_string()], + ); + expected.insert( + "https://app.datadoghq.eu".to_string(), + vec!["key3".to_string()], + ); + + assert_eq!(result, expected); + } + + #[test] + fn test_deserialize_additional_endpoints_invalid_or_empty() { + // Test empty YAML + let input = json!({}); + let result = deserialize_additional_endpoints(input).unwrap(); + assert!(result.is_empty()); + + // Test empty JSON + let input = json!(""); + let result = deserialize_additional_endpoints(input).unwrap(); + assert!(result.is_empty()); + + let input = json!({ + "https://app.datadoghq.com": "invalid-yaml" + }); + let result = deserialize_additional_endpoints(input).unwrap(); + assert!(result.is_empty()); + + let input = json!("invalid-json"); + let result = deserialize_additional_endpoints(input).unwrap(); + assert!(result.is_empty()); + } +} From e09063e3ac292bf6f0fe6c726afa64f988e7e1aa Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Mon, 9 Jun 2025 13:04:47 -0400 Subject: [PATCH 6/9] retry metrics for specific endpoint --- bottlecap/src/bin/bottlecap/main.rs | 42 ++++++++++++++++------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 9510047f0..fd34629d6 100644 --- 
a/bottlecap/src/bin/bottlecap/main.rs
+++ b/bottlecap/src/bin/bottlecap/main.rs
@@ -87,7 +87,13 @@ use tracing_subscriber::EnvFilter;
 struct PendingFlushHandles {
     trace_flush_handles: FuturesOrdered<JoinHandle<Vec<SendData>>>,
     log_flush_handles: FuturesOrdered<JoinHandle<Vec<reqwest::RequestBuilder>>>,
-    metric_flush_handles: FuturesOrdered<JoinHandle<(Vec<Series>, Vec<SketchPayload>)>>,
+    metric_flush_handles: FuturesOrdered<JoinHandle<MetricsRetryBatch>>,
+}
+
+struct MetricsRetryBatch {
+    flusher_id: usize,
+    series: Vec<Series>,
+    sketches: Vec<SketchPayload>,
 }
 
 impl PendingFlushHandles {
@@ -154,27 +160,20 @@ impl PendingFlushHandles {
         while let Some(retries) = self.metric_flush_handles.next().await {
             let mf = metrics_flusher.clone();
             match retries {
-                Ok((series, sketches)) => {
-                    if !series.is_empty() || !sketches.is_empty() {
+                Ok(retry_batch) => {
+                    if !retry_batch.series.is_empty() || !retry_batch.sketches.is_empty() {
                         debug!(
                             "redriving {:?} series and {:?} sketch payloads",
-                            series.len(),
-                            sketches.len()
+                            retry_batch.series.len(),
+                            retry_batch.sketches.len()
                         );
-                        let series_clone = series.clone();
-                        let sketches_clone = sketches.clone();
                         joinset.spawn(async move {
                             let mut locked_flushers = mf.lock().await;
-                            let mut futures = Vec::new();
-                            for flusher in locked_flushers.iter_mut() {
-                                futures.push(
-                                    flusher.flush_metrics(
-                                        series_clone.clone(),
-                                        sketches_clone.clone(),
-                                    ),
-                                );
+                            if let Some(flusher) = locked_flushers.get_mut(retry_batch.flusher_id) {
+                                flusher
+                                    .flush_metrics(retry_batch.series, retry_batch.sketches)
+                                    .await;
                             }
-                            futures::future::join_all(futures).await;
                         });
                     }
                 }
@@ -582,14 +581,19 @@ async fn extension_loop_active(
                     aggregator.consume_distributions(),
                 )
             };
-            for mut flusher in metrics_flushers {
+            for (idx, mut flusher) in metrics_flushers.into_iter().enumerate() {
                 let series_clone = series.clone();
                 let sketches_clone = sketches.clone();
                 let handle = tokio::spawn(async move {
-                    flusher
+                    let (retry_series, retry_sketches) = flusher
                         .flush_metrics(series_clone.clone(), sketches_clone.clone())
                         .await
-                        .unwrap_or_default()
+                        .unwrap_or_default();
+                    MetricsRetryBatch {
+                        flusher_id: idx,
+                        series: retry_series,
+                        sketches: retry_sketches,
+                    }
                 });
             pending_flush_handles.metric_flush_handles.push_back(handle);
             }

From c323973207e24668032f7e2938bac936e05c1bac Mon Sep 17 00:00:00 2001
From: shreyamalpani
Date: Mon, 9 Jun 2025 17:13:49 -0400
Subject: [PATCH 7/9] fmt

---
 bottlecap/src/bin/bottlecap/main.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs
index fd34629d6..db679e95d 100644
--- a/bottlecap/src/bin/bottlecap/main.rs
+++ b/bottlecap/src/bin/bottlecap/main.rs
@@ -595,7 +595,7 @@ async fn extension_loop_active(
                         sketches: retry_sketches,
                     }
                 });
-            pending_flush_handles.metric_flush_handles.push_back(handle);
+                pending_flush_handles.metric_flush_handles.push_back(handle);
             }

From e6993533d8488b7bd2709ca96947e57a7bf68d62 Mon Sep 17 00:00:00 2001
From: shreyamalpani
Date: Tue, 10 Jun 2025 09:36:14 -0400
Subject: [PATCH 8/9] fix LICENSE-3rdparty.yml

---
 bottlecap/LICENSE-3rdparty.yml | 210 +--------------------------------
 1 file changed, 1 insertion(+), 209 deletions(-)

diff --git a/bottlecap/LICENSE-3rdparty.yml b/bottlecap/LICENSE-3rdparty.yml
index b54a2092e..14d2792c5 100644
--- a/bottlecap/LICENSE-3rdparty.yml
+++ b/bottlecap/LICENSE-3rdparty.yml
@@ -4239,214 +4239,6 @@ third_party_libraries:
       Copyright 2025 Datadog, Inc.
 
- Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -- package_name: datadog-fips - package_version: 0.1.0 - repository: https://github.com/DataDog/serverless-components - license: Apache-2.0 - licenses: - - license: Apache-2.0 - text: |2 - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. 
For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. 
The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. 
- - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -5071,7 +4863,7 @@ third_party_libraries: same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright 2025 Datadog, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From fd8634a85be91242868808a9ca1174b747246cab Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 10 Jun 2025 15:56:29 -0400 Subject: [PATCH 9/9] fixes --- bottlecap/Cargo.lock | 6 +- bottlecap/Cargo.toml | 6 +- bottlecap/src/bin/bottlecap/main.rs | 42 ++++--- bottlecap/src/config/additional_endpoints.rs | 118 +++++++++++++++++++ bottlecap/src/config/env.rs | 115 +----------------- bottlecap/src/config/mod.rs | 1 + bottlecap/src/config/yaml.rs | 2 +- 7 files changed, 151 insertions(+), 139 deletions(-) create mode 100644 bottlecap/src/config/additional_endpoints.rs diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 96be058ec..61cbaf178 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -723,7 +723,7 @@ dependencies = [ [[package]] name = "datadog-fips" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=8613f3df6f584d0f7d95d7f693bce2455693bc3f#8613f3df6f584d0f7d95d7f693bce2455693bc3f" +source = "git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78#c3d8ed4f90591c6958921145d485463860307f78" dependencies = [ "reqwest", "rustls", @@ -760,7 +760,7 @@ dependencies = [ [[package]] name = "datadog-trace-agent" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=8613f3df6f584d0f7d95d7f693bce2455693bc3f#8613f3df6f584d0f7d95d7f693bce2455693bc3f" +source = "git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78#c3d8ed4f90591c6958921145d485463860307f78" dependencies = [ "anyhow", "async-trait", @@ -969,7 +969,7 @@ dependencies = [ [[package]] name = "dogstatsd" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=8613f3df6f584d0f7d95d7f693bce2455693bc3f#8613f3df6f584d0f7d95d7f693bce2455693bc3f" +source = "git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78#c3d8ed4f90591c6958921145d485463860307f78" dependencies = [ "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index 1b5a5ea06..dc5314059 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -57,9 +57,9 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = datadog-trace-utils = { git = 
"https://github.com/DataDog/libdatadog", rev = "d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" , features = ["compression"] } datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" } datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" } -dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "8613f3df6f584d0f7d95d7f693bce2455693bc3f", default-features = false } -datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "8613f3df6f584d0f7d95d7f693bce2455693bc3f" } -datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "8613f3df6f584d0f7d95d7f693bce2455693bc3f", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false } +datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78" } +datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false } axum = { version = "0.8.4", default-features = false, features = ["default"] } [dev-dependencies] diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index db679e95d..da8c8c456 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -109,7 +109,7 @@ impl PendingFlushHandles { &mut self, logs_flusher: &LogsFlusher, trace_flusher: &ServerlessTraceFlusher, - metrics_flusher: &Arc>>, + metrics_flushers: &Arc>>, ) -> bool { let mut joinset = tokio::task::JoinSet::new(); let mut flush_error = false; @@ -158,7 +158,7 @@ impl PendingFlushHandles { } while let Some(retries) = self.metric_flush_handles.next().await { - let mf = metrics_flusher.clone(); + let mf = metrics_flushers.clone(); match retries { Ok(retry_batch) => { if !retry_batch.series.is_empty() || !retry_batch.sketches.is_empty() { @@ -443,7 +443,7 @@ async fn extension_loop_active( .expect("failed to create aggregator"), )); - let metrics_flusher = Arc::new(TokioMutex::new(start_metrics_flusher( + let metrics_flushers = Arc::new(TokioMutex::new(start_metrics_flushers( resolved_api_key.clone(), &metrics_aggr, config, @@ -526,7 +526,7 @@ async fn extension_loop_active( } } _ = race_flush_interval.tick() => { - let mut locked_metrics = metrics_flusher.lock().await; + let mut locked_metrics = metrics_flushers.lock().await; blocking_flush_all( &logs_flusher, &mut locked_metrics, @@ -540,7 +540,7 @@ async fn extension_loop_active( } } // flush - let mut locked_metrics = metrics_flusher.lock().await; + let mut locked_metrics = metrics_flushers.lock().await; blocking_flush_all( &logs_flusher, &mut locked_metrics, @@ -559,7 +559,7 @@ async fn extension_loop_active( let tf = trace_flusher.clone(); // Await any previous flush handles. 
            last_continuous_flush_error = pending_flush_handles
-                .await_flush_handles(&logs_flusher.clone(), &tf, &metrics_flusher)
+                .await_flush_handles(&logs_flusher.clone(), &tf, &metrics_flushers)
                 .await;
 
             let val = logs_flusher.clone();
@@ -572,8 +572,8 @@ async fn extension_loop_active(
                 .push_back(tokio::spawn(async move {
                     traces_val.flush(None).await.unwrap_or_default()
                 }));
-            let (metrics_flushers, series, sketches) = {
-                let locked_metrics = metrics_flusher.lock().await;
+            let (metrics_flushers_copy, series, sketches) = {
+                let locked_metrics = metrics_flushers.lock().await;
                 let mut aggregator = metrics_aggr.lock().expect("lock poisoned");
                 (
                     locked_metrics.clone(),
                     aggregator.consume_metrics(),
                     aggregator.consume_distributions(),
                 )
             };
-            for (idx, mut flusher) in metrics_flushers.into_iter().enumerate() {
+            for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() {
                 let series_clone = series.clone();
                 let sketches_clone = sketches.clone();
                 let handle = tokio::spawn(async move {
@@ -599,8 +599,7 @@ async fn extension_loop_active(
             }
             race_flush_interval.reset();
         } else if current_flush_decision == FlushDecision::Periodic {
-            // TODO(astuyve): still await the shutdown flush handles
-            let mut locked_metrics = metrics_flusher.lock().await;
+            let mut locked_metrics = metrics_flushers.lock().await;
             blocking_flush_all(
                 &logs_flusher,
                 &mut locked_metrics,
@@ -639,7 +638,7 @@ async fn extension_loop_active(
             handle_event_bus_event(event, invocation_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
         }
         _ = race_flush_interval.tick() => {
-            let mut locked_metrics = metrics_flusher.lock().await;
+            let mut locked_metrics = metrics_flushers.lock().await;
             blocking_flush_all(
                 &logs_flusher,
                 &mut locked_metrics,
@@ -661,7 +660,7 @@ async fn extension_loop_active(
         // Redrive/block on any failed payloads
         let tf = trace_flusher.clone();
         pending_flush_handles
-            .await_flush_handles(&logs_flusher.clone(), &tf, &metrics_flusher)
+            .await_flush_handles(&logs_flusher.clone(), &tf, &metrics_flushers)
             .await;
         // The Shutdown event we get during a timeout will
         // never include a report log
@@ -687,7 +686,7 @@ async fn extension_loop_active(
     telemetry_listener_cancel_token.cancel();
 
     // gotta lock here
-    let mut locked_metrics = metrics_flusher.lock().await;
+    let mut locked_metrics = metrics_flushers.lock().await;
     blocking_flush_all(
         &logs_flusher,
         &mut locked_metrics,
@@ -704,7 +703,7 @@ async fn extension_loop_active(
 
 async fn blocking_flush_all(
     logs_flusher: &LogsFlusher,
-    metrics_flusher: &mut [MetricsFlusher],
+    metrics_flushers: &mut [MetricsFlusher],
     trace_flusher: &impl TraceFlusher,
     stats_flusher: &impl StatsFlusher,
     race_flush_interval: &mut tokio::time::Interval,
@@ -717,7 +716,7 @@ async fn blocking_flush_all(
             aggregator.consume_distributions(),
         )
     };
-    let metrics_futures: Vec<_> = metrics_flusher
+    let metrics_futures: Vec<_> = metrics_flushers
         .iter_mut()
         .map(|f| f.flush_metrics(series.clone(), sketches.clone()))
         .collect();
@@ -880,7 +879,7 @@ fn start_logs_agent(
     (logs_agent_channel, logs_flusher)
 }
 
-fn start_metrics_flusher(
+fn start_metrics_flushers(
     resolved_api_key: String,
     metrics_aggr: &Arc<Mutex<MetricsAggregator>>,
     config: &Arc<Config>,
@@ -914,7 +913,14 @@ fn start_metrics_flushers(
     flushers.push(MetricsFlusher::new(flusher_config));
 
     for (endpoint_url, api_keys) in &config.additional_endpoints {
-        let dd_url = DdUrl::new(endpoint_url.clone()).expect("can't parse additional endpoint URL");
+        let dd_url = match DdUrl::new(endpoint_url.clone()) {
+            Ok(url) => url,
+            Err(err) => {
+                error!("Invalid additional endpoint: {err}. Falling back to 'https://app.datadoghq.com'");
+                DdUrl::new("https://app.datadoghq.com".to_string())
+                    .expect("additional endpoint fallback URL is invalid")
+            }
+        };
         let prefix_override = MetricsIntakeUrlPrefixOverride::maybe_new(Some(dd_url), None);
         let metrics_intake_url = MetricsIntakeUrlPrefix::new(None, prefix_override)
             .expect("can't parse additional endpoint URL");
diff --git a/bottlecap/src/config/additional_endpoints.rs b/bottlecap/src/config/additional_endpoints.rs
new file mode 100644
index 000000000..281229d9c
--- /dev/null
+++ b/bottlecap/src/config/additional_endpoints.rs
@@ -0,0 +1,118 @@
+use serde::{Deserialize, Deserializer};
+use serde_json::Value;
+use std::collections::HashMap;
+use tracing::error;
+
+#[allow(clippy::module_name_repetitions)]
+pub fn deserialize_additional_endpoints<'de, D>(
+    deserializer: D,
+) -> Result<HashMap<String, Vec<String>>, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let value = Value::deserialize(deserializer)?;
+
+    match value {
+        Value::Object(map) => {
+            // For YAML format (object) in datadog.yaml
+            let mut result = HashMap::new();
+            for (key, value) in map {
+                match value {
+                    Value::Array(arr) => {
+                        let urls: Vec<String> = arr
+                            .into_iter()
+                            .filter_map(|v| v.as_str().map(String::from))
+                            .collect();
+                        result.insert(key, urls);
+                    }
+                    _ => {
+                        error!("Failed to deserialize additional endpoints - Invalid YAML format: expected array for key {}", key);
+                    }
+                }
+            }
+            Ok(result)
+        }
+        Value::String(s) if !s.is_empty() => {
+            // For JSON format (string) in DD_ADDITIONAL_ENDPOINTS
+            if let Ok(map) = serde_json::from_str(&s) {
+                Ok(map)
+            } else {
+                error!("Failed to deserialize additional endpoints - Invalid JSON format");
+                Ok(HashMap::new())
+            }
+        }
+        _ => Ok(HashMap::new()),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use serde_json::json;
+
+    #[test]
+    fn test_deserialize_additional_endpoints_yaml() {
+        // Test YAML format (object)
+        let input = json!({
+            "https://app.datadoghq.com": ["key1", "key2"],
+            "https://app.datadoghq.eu": ["key3"]
+        });
+
+        let result = deserialize_additional_endpoints(input).unwrap();
+
+        let mut expected = HashMap::new();
+        expected.insert(
+            "https://app.datadoghq.com".to_string(),
+            vec!["key1".to_string(), "key2".to_string()],
+        );
+        expected.insert(
+            "https://app.datadoghq.eu".to_string(),
+            vec!["key3".to_string()],
+        );
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_deserialize_additional_endpoints_json() {
+        // Test JSON string format
+        let input = json!("{\"https://app.datadoghq.com\":[\"key1\",\"key2\"],\"https://app.datadoghq.eu\":[\"key3\"]}");
+
+        let result = deserialize_additional_endpoints(input).unwrap();
+
+        let mut expected = HashMap::new();
+        expected.insert(
+            "https://app.datadoghq.com".to_string(),
+            vec!["key1".to_string(), "key2".to_string()],
+        );
+        expected.insert(
+            "https://app.datadoghq.eu".to_string(),
+            vec!["key3".to_string()],
+        );
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_deserialize_additional_endpoints_invalid_or_empty() {
+        // Test empty YAML
+        let input = json!({});
+        let result = deserialize_additional_endpoints(input).unwrap();
+        assert!(result.is_empty());
+
+        // Test empty JSON
+        let input = json!("");
+        let result = deserialize_additional_endpoints(input).unwrap();
+        assert!(result.is_empty());
+
+        let input = json!({
+            "https://app.datadoghq.com": "invalid-yaml"
+        });
+        let result = deserialize_additional_endpoints(input).unwrap();
+        assert!(result.is_empty());
+
+        let input = json!("invalid-json");
+        let result = deserialize_additional_endpoints(input).unwrap();
+        assert!(result.is_empty());
+    }
+}
diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs
index ae8afbbbf..671cddc90 100644
--- a/bottlecap/src/config/env.rs
+++ b/bottlecap/src/config/env.rs
@@ -1,7 +1,7 @@
+use crate::config::additional_endpoints::deserialize_additional_endpoints;
 use serde::{Deserialize, Deserializer};
 use std::collections::HashMap;
 use std::vec;
-use tracing::error;
 
 use datadog_trace_obfuscation::replacer::ReplaceRule;
 use serde_aux::field_attributes::deserialize_bool_from_anything;
@@ -234,116 +234,3 @@ where
         _ => Err(serde::de::Error::custom("expected a string or an integer")),
     }
 }
-
-pub(crate) fn deserialize_additional_endpoints<'de, D>(
-    deserializer: D,
-) -> Result<HashMap<String, Vec<String>>, D::Error>
-where
-    D: Deserializer<'de>,
-{
-    let value = Value::deserialize(deserializer)?;
-
-    match value {
-        Value::Object(map) => {
-            // For YAML format (object) in datadog.yaml
-            let mut result = HashMap::new();
-            for (key, value) in map {
-                match value {
-                    Value::Array(arr) => {
-                        let urls: Vec<String> = arr
-                            .into_iter()
-                            .filter_map(|v| v.as_str().map(String::from))
-                            .collect();
-                        result.insert(key, urls);
-                    }
-                    _ => {
-                        error!("Failed to deserialize additional endpoints - Invalid YAML format: expected array for key {}", key);
-                    }
-                }
-            }
-            Ok(result)
-        }
-        Value::String(s) if !s.is_empty() => {
-            // For JSON format (string) in DD_ADDITIONAL_ENDPOINTS
-            if let Ok(map) = serde_json::from_str(&s) {
-                Ok(map)
-            } else {
-                error!("Failed to deserialize additional endpoints - Invalid JSON format");
-                Ok(HashMap::new())
-            }
-        }
-        _ => Ok(HashMap::new()),
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use serde_json::json;
-
-    #[test]
-    fn test_deserialize_additional_endpoints_yaml() {
-        // Test YAML format (object)
-        let input = json!({
-            "https://app.datadoghq.com": ["key1", "key2"],
-            "https://app.datadoghq.eu": ["key3"]
-        });
-
-        let result = deserialize_additional_endpoints(input).unwrap();
-
-        let mut expected = HashMap::new();
-        expected.insert(
-            "https://app.datadoghq.com".to_string(),
-            vec!["key1".to_string(), "key2".to_string()],
-        );
-        expected.insert(
-            "https://app.datadoghq.eu".to_string(),
-            vec!["key3".to_string()],
-        );
-
-        assert_eq!(result, expected);
-    }
-
-    #[test]
-    fn test_deserialize_additional_endpoints_json() {
-        // Test JSON string format
-        let input = json!("{\"https://app.datadoghq.com\":[\"key1\",\"key2\"],\"https://app.datadoghq.eu\":[\"key3\"]}");
-
-        let result = deserialize_additional_endpoints(input).unwrap();
-
-        let mut expected = HashMap::new();
-        expected.insert(
-            "https://app.datadoghq.com".to_string(),
-            vec!["key1".to_string(), "key2".to_string()],
-        );
-        expected.insert(
-            "https://app.datadoghq.eu".to_string(),
-            vec!["key3".to_string()],
-        );
-
-        assert_eq!(result, expected);
-    }
-
-    #[test]
-    fn test_deserialize_additional_endpoints_invalid_or_empty() {
-        // Test empty YAML
-        let input = json!({});
-        let result = deserialize_additional_endpoints(input).unwrap();
-        assert!(result.is_empty());
-
-        // Test empty JSON
-        let input = json!("");
-        let result = deserialize_additional_endpoints(input).unwrap();
-        assert!(result.is_empty());
-
-        let input = json!({
-            "https://app.datadoghq.com": "invalid-yaml"
-        });
-        let result = deserialize_additional_endpoints(input).unwrap();
-        assert!(result.is_empty());
-
-        let input = json!("invalid-json");
-        let result = 
deserialize_additional_endpoints(input).unwrap(); - assert!(result.is_empty()); - } -} diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 9f2a19173..6e784ea20 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -1,3 +1,4 @@ +pub mod additional_endpoints; pub mod apm_replace_rule; pub mod aws; pub mod env; diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs index d612ef44b..e82b322cb 100644 --- a/bottlecap/src/config/yaml.rs +++ b/bottlecap/src/config/yaml.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use crate::config::env::deserialize_additional_endpoints; +use crate::config::additional_endpoints::deserialize_additional_endpoints; use crate::config::{deserialize_apm_replace_rules, deserialize_processing_rules, ProcessingRule}; use datadog_trace_obfuscation::replacer::ReplaceRule; use serde::Deserialize;
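
Taken together, the series leaves two equivalent ways to configure dual shipping for metrics, both exercised by the tests added above (the API keys are placeholders):

    # datadog.yaml
    additional_endpoints:
      "https://app.datadoghq.com":
        - apikey2
        - apikey3
      "https://app.datadoghq.eu":
        - apikey4

    # or the JSON string form read from the environment
    DD_ADDITIONAL_ENDPOINTS='{"https://app.datadoghq.com":["apikey2","apikey3"],"https://app.datadoghq.eu":["apikey4"]}'

When both are present, the environment wins: merge_config copies additional_endpoints from datadog.yaml only when the env-derived map is empty. At startup, start_metrics_flushers then creates one MetricsFlusher per endpoint/API-key pair on top of the primary flusher, so the example above produces four flushers in total.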