From 276e120e2dcfbc1394122578aba59c83ba0b28d8 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Thu, 26 Jun 2025 13:10:30 -0400 Subject: [PATCH 01/12] read apm additional endpoints config --- bottlecap/src/config/env.rs | 7 +++++++ bottlecap/src/config/mod.rs | 2 ++ bottlecap/src/config/yaml.rs | 8 ++++++++ 3 files changed, 17 insertions(+) diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index adbb5c745..24e2e24fa 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -166,6 +166,12 @@ pub struct EnvConfig { /// @env `DD_APM_FEATURES` #[serde(deserialize_with = "deserialize_array_from_comma_separated_string")] pub apm_features: Vec, + /// @env `DD_APM_ADDITIONAL_ENDPOINTS` + /// + /// Additional endpoints to send traces to. + /// + #[serde(deserialize_with = "deserialize_additional_endpoints")] + pub apm_additional_endpoints: HashMap>, // // Trace Propagation /// @env `DD_TRACE_PROPAGATION_STYLE` @@ -348,6 +354,7 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) { apm_config_obfuscation_http_remove_paths_with_digits ); merge_vec!(config, env_config, apm_features); + merge_hashmap!(config, env_config, apm_additional_endpoints); // Trace Propagation merge_vec!(config, env_config, trace_propagation_style); diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 8b7909030..89cd3c796 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -274,6 +274,7 @@ pub struct Config { pub apm_config_obfuscation_http_remove_query_string: bool, pub apm_config_obfuscation_http_remove_paths_with_digits: bool, pub apm_features: Vec, + pub apm_additional_endpoints: HashMap>, // // Trace Propagation pub trace_propagation_style: Vec, @@ -366,6 +367,7 @@ impl Default for Config { apm_config_obfuscation_http_remove_query_string: false, apm_config_obfuscation_http_remove_paths_with_digits: false, apm_features: vec![], + apm_additional_endpoints: HashMap::new(), trace_propagation_style: vec![ TracePropagationStyle::Datadog, TracePropagationStyle::TraceContext, diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs index f35a54bfe..485cb58c7 100644 --- a/bottlecap/src/config/yaml.rs +++ b/bottlecap/src/config/yaml.rs @@ -133,6 +133,8 @@ pub struct ApmConfig { pub replace_tags: Option>, pub obfuscation: Option, pub features: Vec, + #[serde(deserialize_with = "deserialize_additional_endpoints")] + pub additional_endpoints: HashMap>, } impl ApmConfig { @@ -410,6 +412,12 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) { yaml_config.apm_config, replace_tags ); + merge_hashmap!( + config, + apm_additional_endpoints, + yaml_config.apm_config, + additional_endpoints + ); // Not using the macro here because we need to call a method on the struct if let Some(remove_query_string) = yaml_config From 005a230988fa78da0248ff67141d1074958c6ddc Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Wed, 2 Jul 2025 10:11:05 -0400 Subject: [PATCH 02/12] consider additional endpoints in trace flusher --- bottlecap/src/bin/bottlecap/main.rs | 10 ++-- bottlecap/src/traces/trace_flusher.rs | 72 +++++++++++++++++++++------ 2 files changed, 62 insertions(+), 20 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 2aa46a66a..0fdc255d5 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -1057,11 +1057,11 @@ fn start_trace_agent( let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {}); // Traces - let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher { - aggregator: trace_aggregator.clone(), - config: Arc::clone(config), - api_key_factory: Arc::clone(api_key_factory), - }); + let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new( + trace_aggregator.clone(), + config.clone(), + api_key_factory.clone(), + )); let obfuscation_config = obfuscation_config::ObfuscationConfig { tag_replace_rules: config.apm_replace_tags.clone(), diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 8fb6a2b47..a4a1bfd0c 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -2,6 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use async_trait::async_trait; +use ddcommon::Endpoint; +use futures::future::join_all; +use std::str::FromStr; use std::sync::Arc; use tokio::sync::Mutex; use tracing::{debug, error}; @@ -26,7 +29,11 @@ pub trait TraceFlusher { Self: Sized; /// Given a `Vec`, a tracer payload, send it to the Datadog intake endpoint. /// Returns the traces back if there was an error sending them. - async fn send(&self, traces: Vec) -> Option>; + async fn send( + &self, + traces: Vec, + endpoint: Option, + ) -> Option>; /// Flushes traces by getting every available batch on the aggregator. /// If `failed_traces` is provided, it will attempt to send those instead of fetching new traces. @@ -40,19 +47,34 @@ pub struct ServerlessTraceFlusher { pub aggregator: Arc>, pub config: Arc, pub api_key_factory: Arc, + pub additional_endpoints: Vec, } #[async_trait] impl TraceFlusher for ServerlessTraceFlusher { - fn new( - aggregator: Arc>, - config: Arc, - api_key_factory: Arc, - ) -> Self { + fn new(aggregator: Arc>, config: Arc, api_key_factory: Arc) -> Self { + let mut additional_endpoints: Vec = Vec::new(); + let config_clone = Arc::clone(&config); + + for (endpoint_url, api_keys) in config.apm_additional_endpoints.clone() { + for api_key in api_keys { + let endpoint = Endpoint { + url: hyper::Uri::from_str(&endpoint_url) + .expect("can't parse additional trace intake URL, exiting"), + api_key: Some(api_key.clone().into()), + timeout_ms: config.flush_timeout * 1_000, + test_token: None, + }; + + additional_endpoints.push(endpoint); + } + } + ServerlessTraceFlusher { aggregator, - config, + config: config_clone, api_key_factory, + additional_endpoints, } } @@ -62,13 +84,13 @@ impl TraceFlusher for ServerlessTraceFlusher { return None; }; - let mut failed_batch: Option> = None; + let mut failed_batch: Vec = Vec::new(); if let Some(traces) = failed_traces { // If we have traces from a previous failed attempt, try to send those first if !traces.is_empty() { debug!("Retrying to send {} previously failed traces", traces.len()); - let retry_result = self.send(traces).await; + let retry_result = self.send(traces, None).await; if retry_result.is_some() { // Still failed, return to retry later return retry_result; @@ -87,9 +109,22 @@ impl TraceFlusher for ServerlessTraceFlusher { .map(|builder| builder.with_api_key(api_key)) .map(SendDataBuilder::build) .collect(); - if let Some(failed) = self.send(traces).await { + if let Some(mut failed) = self.send(traces.clone(), None).await { // Keep track of the failed batch - failed_batch = Some(failed); + failed_batch.append(&mut failed); + } + + let tasks = self.additional_endpoints.iter().map(|endpoint| { + let traces = traces.clone(); + let endpoint = endpoint.clone(); + async move { self.send(traces, Some(endpoint)).await } + }); + + for mut failed in join_all(tasks).await.into_iter().flatten() { + failed_batch.append(&mut failed); + } + + if !failed_batch.is_empty() { // Stop processing more batches if we have a failure break; } @@ -97,10 +132,14 @@ impl TraceFlusher for ServerlessTraceFlusher { trace_builders = guard.get_batch(); } - failed_batch + Some(failed_batch) } - async fn send(&self, traces: Vec) -> Option> { + async fn send( + &self, + traces: Vec, + endpoint: Option, + ) -> Option> { if traces.is_empty() { return None; } @@ -114,9 +153,12 @@ impl TraceFlusher for ServerlessTraceFlusher { let mut tasks = Vec::with_capacity(coalesced_traces.len()); for traces in coalesced_traces { - let proxy_https = self.config.proxy_https.clone(); + // TODO: update the SendData object's endpoint + // + // if there is an endpoint specified, update the SendData object to use that endpoint + let proxy = self.config.proxy_https.clone(); tasks.push(tokio::spawn(async move { - traces.send_proxy(proxy_https.as_deref()).await.last_result + traces.send_proxy(proxy.as_deref()).await.last_result })); } From 5e9afcd21b6af21288398e7dd69fa9c3bc869801 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 8 Jul 2025 13:27:24 -0400 Subject: [PATCH 03/12] set SendData target to additional endpoints --- bottlecap/Cargo.lock | 98 ++++++++++++++++++++------- bottlecap/src/config/env.rs | 11 +++ bottlecap/src/config/yaml.rs | 20 +++++- bottlecap/src/traces/trace_flusher.rs | 11 +-- 4 files changed, 108 insertions(+), 32 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index d2b0c04ba..9ae0d35d4 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -492,7 +492,7 @@ dependencies = [ "base64 0.22.1", "bytes 1.10.1", "chrono", - "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78)", + "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "datadog-trace-agent", "datadog-trace-normalization 19.1.0", @@ -772,7 +772,7 @@ dependencies = [ [[package]] name = "datadog-trace-agent" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78#c3d8ed4f90591c6958921145d485463860307f78" +source = "git+https://github.com/DataDog/serverless-components?rev=dc14a4cbd8c1b1020afed8cb02eb2dd698884af4#dc14a4cbd8c1b1020afed8cb02eb2dd698884af4" dependencies = [ "anyhow", "async-trait", @@ -793,8 +793,8 @@ dependencies = [ [[package]] name = "datadog-trace-normalization" -version = "17.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae#d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" +version = "19.0.1" +source = "git+https://github.com/DataDog/libdatadog?rev=57fbbcfc1eb7f6a2ebc21748508ef17327461180#57fbbcfc1eb7f6a2ebc21748508ef17327461180" dependencies = [ "anyhow", "datadog-trace-protobuf 17.0.0", @@ -811,8 +811,8 @@ dependencies = [ [[package]] name = "datadog-trace-obfuscation" -version = "17.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae#d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" +version = "19.0.1" +source = "git+https://github.com/DataDog/libdatadog?rev=57fbbcfc1eb7f6a2ebc21748508ef17327461180#57fbbcfc1eb7f6a2ebc21748508ef17327461180" dependencies = [ "anyhow", "datadog-trace-protobuf 17.0.0", @@ -865,8 +865,8 @@ dependencies = [ [[package]] name = "datadog-trace-utils" -version = "17.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae#d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" +version = "19.0.1" +source = "git+https://github.com/DataDog/libdatadog?rev=57fbbcfc1eb7f6a2ebc21748508ef17327461180#57fbbcfc1eb7f6a2ebc21748508ef17327461180" dependencies = [ "anyhow", "bytes 1.10.1", @@ -878,7 +878,6 @@ dependencies = [ "http-body-util", "hyper 1.6.0", "hyper-http-proxy", - "log", "prost", "rand 0.8.5", "rmp", @@ -888,6 +887,35 @@ dependencies = [ "serde_json", "tinybytes 17.0.0", "tokio", + "tracing", + "zstd", +] + +[[package]] +name = "datadog-trace-utils" +version = "19.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" +dependencies = [ + "anyhow", + "bytes 1.10.1", + "datadog-trace-normalization 19.1.0", + "datadog-trace-protobuf 19.1.0", + "ddcommon 19.1.0", + "flate2", + "futures 0.3.31", + "http-body-util", + "hyper 1.6.0", + "hyper-http-proxy", + "prost", + "rand 0.8.5", + "rmp", + "rmp-serde", + "rmpv", + "serde", + "serde_json", + "tinybytes 19.1.0", + "tokio", + "tracing", "zstd", ] @@ -921,8 +949,41 @@ dependencies = [ [[package]] name = "ddcommon" -version = "17.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae#d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" +version = "19.0.1" +source = "git+https://github.com/DataDog/libdatadog?rev=57fbbcfc1eb7f6a2ebc21748508ef17327461180#57fbbcfc1eb7f6a2ebc21748508ef17327461180" +dependencies = [ + "anyhow", + "cc", + "const_format", + "futures 0.3.31", + "futures-core", + "futures-util", + "hex", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-rustls", + "hyper-util", + "libc", + "nix 0.29.0", + "pin-project", + "regex", + "rustls", + "rustls-native-certs", + "serde", + "static_assertions", + "thiserror 1.0.69", + "tokio", + "tokio-rustls", + "tower-service", + "windows-sys 0.52.0", +] + +[[package]] +name = "ddcommon" +version = "19.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" dependencies = [ "anyhow", "cc", @@ -938,18 +999,14 @@ dependencies = [ "hyper-rustls", "hyper-util", "libc", - "log", - "memfd", "nix 0.29.0", "pin-project", - "rand 0.8.5", "regex", - "rmp", - "rmp-serde", "rustls", "rustls-native-certs", "serde", "static_assertions", + "thiserror 1.0.69", "tokio", "tokio-rustls", "tower-service", @@ -2089,15 +2146,6 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" -[[package]] -name = "memfd" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2cffa4ad52c6f791f4f8b15f0c05f9824b2ced1160e88cc393d64fff9a8ac64" -dependencies = [ - "rustix 0.38.44", -] - [[package]] name = "mime" version = "0.3.17" diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index 24e2e24fa..22b9b8fb4 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -544,6 +544,7 @@ mod tests { "DD_APM_FEATURES", "enable_otlp_compute_top_level_by_span_kind,enable_stats_by_span_kind", ); + jail.set_env("DD_APM_ADDITIONAL_ENDPOINTS", "{\"https://trace.agent.datadoghq.com\": [\"apikey2\", \"apikey3\"], \"https://trace.agent.datadoghq.eu\": [\"apikey4\"]}"); // Trace Propagation jail.set_env("DD_TRACE_PROPAGATION_STYLE", "datadog"); @@ -684,6 +685,16 @@ mod tests { "enable_otlp_compute_top_level_by_span_kind".to_string(), "enable_stats_by_span_kind".to_string(), ], + apm_additional_endpoints: HashMap::from([ + ( + "https://trace.agent.datadoghq.com".to_string(), + vec!["apikey2".to_string(), "apikey3".to_string()], + ), + ( + "https://trace.agent.datadoghq.eu".to_string(), + vec!["apikey4".to_string()], + ), + ]), trace_propagation_style: vec![TracePropagationStyle::Datadog], trace_propagation_style_extract: vec![TracePropagationStyle::B3], trace_propagation_extract_first: true, diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs index 485cb58c7..aa84037b1 100644 --- a/bottlecap/src/config/yaml.rs +++ b/bottlecap/src/config/yaml.rs @@ -649,8 +649,8 @@ http_protocol: "http1" # Endpoints additional_endpoints: "https://app.datadoghq.com": - - "apikey2" - - "apikey3" + - apikey2 + - apikey3 "https://app.datadoghq.eu": - apikey4 @@ -688,6 +688,12 @@ apm_config: features: - "enable_otlp_compute_top_level_by_span_kind" - "enable_stats_by_span_kind" + additional_endpoints: + "https://trace.agent.datadoghq.com": + - apikey2 + - apikey3 + "https://trace.agent.datadoghq.eu": + - apikey4 service_mapping: old-service:new-service @@ -809,6 +815,16 @@ extension_version: "compatibility" "enable_otlp_compute_top_level_by_span_kind".to_string(), "enable_stats_by_span_kind".to_string(), ], + apm_additional_endpoints: HashMap::from([ + ( + "https://trace.agent.datadoghq.com".to_string(), + vec!["apikey2".to_string(), "apikey3".to_string()], + ), + ( + "https://trace.agent.datadoghq.eu".to_string(), + vec!["apikey4".to_string()], + ), + ]), trace_propagation_style: vec![TracePropagationStyle::Datadog], trace_propagation_style_extract: vec![TracePropagationStyle::B3], trace_propagation_extract_first: true, diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index a4a1bfd0c..f05432085 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -58,8 +58,9 @@ impl TraceFlusher for ServerlessTraceFlusher { for (endpoint_url, api_keys) in config.apm_additional_endpoints.clone() { for api_key in api_keys { + let trace_intake_url = trace_intake_url_prefixed(&endpoint_url); let endpoint = Endpoint { - url: hyper::Uri::from_str(&endpoint_url) + url: hyper::Uri::from_str(&trace_intake_url) .expect("can't parse additional trace intake URL, exiting"), api_key: Some(api_key.clone().into()), timeout_ms: config.flush_timeout * 1_000, @@ -152,10 +153,10 @@ impl TraceFlusher for ServerlessTraceFlusher { let coalesced_traces = trace_utils::coalesce_send_data(traces); let mut tasks = Vec::with_capacity(coalesced_traces.len()); - for traces in coalesced_traces { - // TODO: update the SendData object's endpoint - // - // if there is an endpoint specified, update the SendData object to use that endpoint + for mut traces in coalesced_traces { + if let Some(additional_endpoint) = endpoint.clone() { + traces.set_target(additional_endpoint); + } let proxy = self.config.proxy_https.clone(); tasks.push(tokio::spawn(async move { traces.send_proxy(proxy.as_deref()).await.last_result From 147eb1fbb5ffbb9553938543ed3fbbc130a51919 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 8 Jul 2025 14:36:44 -0400 Subject: [PATCH 04/12] clippy --- bottlecap/src/otlp/agent.rs | 15 +++++++++------ bottlecap/src/proxy/interceptor.rs | 3 +-- bottlecap/src/traces/stats_aggregator.rs | 6 +++--- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index d86da5600..dcc0eb0ab 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -49,7 +49,7 @@ impl Agent { trace_tx: Sender, ) -> Self { let port = Self::parse_port( - &config.otlp_config_receiver_protocols_http_endpoint, + config.otlp_config_receiver_protocols_http_endpoint.as_ref(), OTLP_AGENT_HTTP_PORT, ); let cancel_token = CancellationToken::new(); @@ -204,7 +204,10 @@ mod tests { fn test_parse_port_with_valid_endpoint() { // Test with a valid endpoint containing a port let endpoint = Some("localhost:8080".to_string()); - assert_eq!(Agent::parse_port(&endpoint, OTLP_AGENT_HTTP_PORT), 8080); + assert_eq!( + Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), + 8080 + ); } #[test] @@ -212,7 +215,7 @@ mod tests { // Test with an endpoint containing an invalid port format let endpoint = Some("localhost:invalid".to_string()); assert_eq!( - Agent::parse_port(&endpoint, OTLP_AGENT_HTTP_PORT), + Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), OTLP_AGENT_HTTP_PORT ); } @@ -222,7 +225,7 @@ mod tests { // Test with an endpoint missing a port let endpoint = Some("localhost".to_string()); assert_eq!( - Agent::parse_port(&endpoint, OTLP_AGENT_HTTP_PORT), + Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), OTLP_AGENT_HTTP_PORT ); } @@ -232,7 +235,7 @@ mod tests { // Test with None endpoint let endpoint: Option = None; assert_eq!( - Agent::parse_port(&endpoint, OTLP_AGENT_HTTP_PORT), + Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), OTLP_AGENT_HTTP_PORT ); } @@ -242,7 +245,7 @@ mod tests { // Test with an empty endpoint let endpoint = Some("".to_string()); assert_eq!( - Agent::parse_port(&endpoint, OTLP_AGENT_HTTP_PORT), + Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), OTLP_AGENT_HTTP_PORT ); } diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index d7efeadf3..6e2a24596 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -37,7 +37,7 @@ pub fn start( aws_config: Arc, invocation_processor: Arc>, ) -> Result> { - let socket = get_proxy_socket_address(&aws_config.aws_lwa_proxy_lambda_runtime_api); + let socket = get_proxy_socket_address(aws_config.aws_lwa_proxy_lambda_runtime_api.as_ref()); let shutdown_token = CancellationToken::new(); let mut connector = HttpConnector::new(); @@ -114,7 +114,6 @@ async fn graceful_shutdown(tasks: Arc>>, shutdown_token: Cance #[allow(clippy::ref_option)] fn get_proxy_socket_address(aws_lwa_proxy_lambda_runtime_api: &Option) -> SocketAddr { if let Some(socket_addr) = aws_lwa_proxy_lambda_runtime_api - .as_ref() .and_then(|uri_str| lwa::get_lwa_proxy_socket_address(uri_str).ok()) { debug!("PROXY | get_proxy_socket_address | LWA proxy detected"); diff --git a/bottlecap/src/traces/stats_aggregator.rs b/bottlecap/src/traces/stats_aggregator.rs index 9693f722d..3ae98ff23 100644 --- a/bottlecap/src/traces/stats_aggregator.rs +++ b/bottlecap/src/traces/stats_aggregator.rs @@ -7,9 +7,9 @@ use std::collections::VecDeque; /// // const MAX_BATCH_ENTRIES_SIZE: usize = 4000; -/// Aproximate size an entry in a stat payload occupies -/// -/// +// Aproximate size an entry in a stat payload occupies +// +// // const MAX_ENTRY_SIZE_BYTES: usize = 375; /// Maximum content size per payload in compressed bytes, From d51b6bbda69c7935e37de5299b770377b8ddc2c7 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 8 Jul 2025 15:15:29 -0400 Subject: [PATCH 05/12] add trace compression level config --- bottlecap/src/config/env.rs | 9 +++++++++ bottlecap/src/config/mod.rs | 2 ++ bottlecap/src/config/yaml.rs | 9 +++++++++ bottlecap/src/traces/trace_flusher.rs | 4 ++-- 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index 22b9b8fb4..7509a1da8 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -163,6 +163,12 @@ pub struct EnvConfig { /// @env `DD_APM_CONFIG_OBFUSCATION_HTTP_REMOVE_PATHS_WITH_DIGITS` #[serde(deserialize_with = "deserialize_optional_bool_from_anything")] pub apm_config_obfuscation_http_remove_paths_with_digits: Option, + /// @env `DD_APM_CONFIG_COMPRESSION_LEVEL` + /// + /// The Agent compresses traces before sending them. The `compression_level` parameter + /// accepts values from 0 (no compression) to 9 (maximum compression but + /// higher resource usage). + pub apm_config_compression_level: Option, /// @env `DD_APM_FEATURES` #[serde(deserialize_with = "deserialize_array_from_comma_separated_string")] pub apm_features: Vec, @@ -353,6 +359,7 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) { env_config, apm_config_obfuscation_http_remove_paths_with_digits ); + merge_option_to_value!(config, env_config, apm_config_compression_level); merge_vec!(config, env_config, apm_features); merge_hashmap!(config, env_config, apm_additional_endpoints); @@ -540,6 +547,7 @@ mod tests { "DD_APM_CONFIG_OBFUSCATION_HTTP_REMOVE_PATHS_WITH_DIGITS", "true", ); + jail.set_env("DD_APM_CONFIG_COMPRESSION_LEVEL", "3"); jail.set_env( "DD_APM_FEATURES", "enable_otlp_compute_top_level_by_span_kind,enable_stats_by_span_kind", @@ -681,6 +689,7 @@ mod tests { ), apm_config_obfuscation_http_remove_query_string: true, apm_config_obfuscation_http_remove_paths_with_digits: true, + apm_config_compression_level: 3, apm_features: vec![ "enable_otlp_compute_top_level_by_span_kind".to_string(), "enable_stats_by_span_kind".to_string(), diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 89cd3c796..e1e2f7f1e 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -273,6 +273,7 @@ pub struct Config { pub apm_replace_tags: Option>, pub apm_config_obfuscation_http_remove_query_string: bool, pub apm_config_obfuscation_http_remove_paths_with_digits: bool, + pub apm_config_compression_level: i32, pub apm_features: Vec, pub apm_additional_endpoints: HashMap>, // @@ -366,6 +367,7 @@ impl Default for Config { apm_replace_tags: None, apm_config_obfuscation_http_remove_query_string: false, apm_config_obfuscation_http_remove_paths_with_digits: false, + apm_config_compression_level: 6, apm_features: vec![], apm_additional_endpoints: HashMap::new(), trace_propagation_style: vec![ diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs index aa84037b1..7b944ea42 100644 --- a/bottlecap/src/config/yaml.rs +++ b/bottlecap/src/config/yaml.rs @@ -132,6 +132,7 @@ pub struct ApmConfig { #[serde(deserialize_with = "deserialize_apm_replace_rules")] pub replace_tags: Option>, pub obfuscation: Option, + pub compression_level: Option, pub features: Vec, #[serde(deserialize_with = "deserialize_additional_endpoints")] pub additional_endpoints: HashMap>, @@ -412,6 +413,12 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) { yaml_config.apm_config, replace_tags ); + merge_option_to_value!( + config, + apm_config_compression_level, + yaml_config.apm_config, + compression_level + ); merge_hashmap!( config, apm_additional_endpoints, @@ -685,6 +692,7 @@ apm_config: http: remove_query_string: true remove_paths_with_digits: true + compression_level: 3 features: - "enable_otlp_compute_top_level_by_span_kind" - "enable_stats_by_span_kind" @@ -811,6 +819,7 @@ extension_version: "compatibility" apm_replace_tags: Some(vec![]), apm_config_obfuscation_http_remove_query_string: true, apm_config_obfuscation_http_remove_paths_with_digits: true, + apm_config_compression_level: 3, apm_features: vec![ "enable_otlp_compute_top_level_by_span_kind".to_string(), "enable_stats_by_span_kind".to_string(), diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index f05432085..638dfba59 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -115,18 +115,18 @@ impl TraceFlusher for ServerlessTraceFlusher { failed_batch.append(&mut failed); } + // Send to additional endpoints let tasks = self.additional_endpoints.iter().map(|endpoint| { let traces = traces.clone(); let endpoint = endpoint.clone(); async move { self.send(traces, Some(endpoint)).await } }); - for mut failed in join_all(tasks).await.into_iter().flatten() { failed_batch.append(&mut failed); } + // Stop processing more batches if we have a failure if !failed_batch.is_empty() { - // Stop processing more batches if we have a failure break; } From 451476f324ea50c30543575ce22387c210afa45d Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 8 Jul 2025 15:48:19 -0400 Subject: [PATCH 06/12] fix flush return --- bottlecap/src/traces/trace_flusher.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 638dfba59..38f18704f 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -133,7 +133,11 @@ impl TraceFlusher for ServerlessTraceFlusher { trace_builders = guard.get_batch(); } - Some(failed_batch) + if failed_batch.is_empty() { + None + } else { + Some(failed_batch) + } } async fn send( From 3c092092c5e645ee00aa8cc6913097e060ec1887 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Wed, 9 Jul 2025 16:30:29 -0400 Subject: [PATCH 07/12] update dependencies --- bottlecap/Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 9ae0d35d4..3efeb568a 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -772,7 +772,7 @@ dependencies = [ [[package]] name = "datadog-trace-agent" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=dc14a4cbd8c1b1020afed8cb02eb2dd698884af4#dc14a4cbd8c1b1020afed8cb02eb2dd698884af4" +source = "git+https://github.com/DataDog/serverless-components?rev=2643867ebc240bd356a5f974c3aab9e973fba721#2643867ebc240bd356a5f974c3aab9e973fba721" dependencies = [ "anyhow", "async-trait", @@ -794,7 +794,7 @@ dependencies = [ [[package]] name = "datadog-trace-normalization" version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=57fbbcfc1eb7f6a2ebc21748508ef17327461180#57fbbcfc1eb7f6a2ebc21748508ef17327461180" +source = "git+https://github.com/DataDog/libdatadog?rev=b708db75c1f21addcd43cf607d0ebf2ff4cfb17f#b708db75c1f21addcd43cf607d0ebf2ff4cfb17f" dependencies = [ "anyhow", "datadog-trace-protobuf 17.0.0", @@ -812,7 +812,7 @@ dependencies = [ [[package]] name = "datadog-trace-obfuscation" version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=57fbbcfc1eb7f6a2ebc21748508ef17327461180#57fbbcfc1eb7f6a2ebc21748508ef17327461180" +source = "git+https://github.com/DataDog/libdatadog?rev=b708db75c1f21addcd43cf607d0ebf2ff4cfb17f#b708db75c1f21addcd43cf607d0ebf2ff4cfb17f" dependencies = [ "anyhow", "datadog-trace-protobuf 17.0.0", @@ -866,7 +866,7 @@ dependencies = [ [[package]] name = "datadog-trace-utils" version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=57fbbcfc1eb7f6a2ebc21748508ef17327461180#57fbbcfc1eb7f6a2ebc21748508ef17327461180" +source = "git+https://github.com/DataDog/libdatadog?rev=b708db75c1f21addcd43cf607d0ebf2ff4cfb17f#b708db75c1f21addcd43cf607d0ebf2ff4cfb17f" dependencies = [ "anyhow", "bytes 1.10.1", From e958d0fe9b0baca7b82b3ad7fc689f39c60cbf4d Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Mon, 14 Jul 2025 12:47:09 -0400 Subject: [PATCH 08/12] use SendData with_endpoint --- bottlecap/Cargo.lock | 8 +++---- bottlecap/src/traces/trace_flusher.rs | 30 ++++++++++++--------------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 3efeb568a..c2b9aca5e 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -772,7 +772,7 @@ dependencies = [ [[package]] name = "datadog-trace-agent" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=2643867ebc240bd356a5f974c3aab9e973fba721#2643867ebc240bd356a5f974c3aab9e973fba721" +source = "git+https://github.com/DataDog/serverless-components?rev=8078e5f6b016a8ff2ca952165f982e68fad97a62#8078e5f6b016a8ff2ca952165f982e68fad97a62" dependencies = [ "anyhow", "async-trait", @@ -794,7 +794,7 @@ dependencies = [ [[package]] name = "datadog-trace-normalization" version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=b708db75c1f21addcd43cf607d0ebf2ff4cfb17f#b708db75c1f21addcd43cf607d0ebf2ff4cfb17f" +source = "git+https://github.com/DataDog/libdatadog?rev=5610c65786cb7c7e24eab40b214251424a9ae3fb#5610c65786cb7c7e24eab40b214251424a9ae3fb" dependencies = [ "anyhow", "datadog-trace-protobuf 17.0.0", @@ -812,7 +812,7 @@ dependencies = [ [[package]] name = "datadog-trace-obfuscation" version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=b708db75c1f21addcd43cf607d0ebf2ff4cfb17f#b708db75c1f21addcd43cf607d0ebf2ff4cfb17f" +source = "git+https://github.com/DataDog/libdatadog?rev=5610c65786cb7c7e24eab40b214251424a9ae3fb#5610c65786cb7c7e24eab40b214251424a9ae3fb" dependencies = [ "anyhow", "datadog-trace-protobuf 17.0.0", @@ -866,7 +866,7 @@ dependencies = [ [[package]] name = "datadog-trace-utils" version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=b708db75c1f21addcd43cf607d0ebf2ff4cfb17f#b708db75c1f21addcd43cf607d0ebf2ff4cfb17f" +source = "git+https://github.com/DataDog/libdatadog?rev=5610c65786cb7c7e24eab40b214251424a9ae3fb#5610c65786cb7c7e24eab40b214251424a9ae3fb" dependencies = [ "anyhow", "bytes 1.10.1", diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 38f18704f..6bfc60e43 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -32,7 +32,7 @@ pub trait TraceFlusher { async fn send( &self, traces: Vec, - endpoint: Option, + endpoint: Option<&Endpoint>, ) -> Option>; /// Flushes traces by getting every available batch on the aggregator. @@ -111,15 +111,13 @@ impl TraceFlusher for ServerlessTraceFlusher { .map(SendDataBuilder::build) .collect(); if let Some(mut failed) = self.send(traces.clone(), None).await { - // Keep track of the failed batch failed_batch.append(&mut failed); } // Send to additional endpoints let tasks = self.additional_endpoints.iter().map(|endpoint| { - let traces = traces.clone(); - let endpoint = endpoint.clone(); - async move { self.send(traces, Some(endpoint)).await } + let traces_clone = traces.clone(); + async move { self.send(traces_clone, Some(endpoint)).await } }); for mut failed in join_all(tasks).await.into_iter().flatten() { failed_batch.append(&mut failed); @@ -143,27 +141,25 @@ impl TraceFlusher for ServerlessTraceFlusher { async fn send( &self, traces: Vec, - endpoint: Option, + endpoint: Option<&Endpoint>, ) -> Option> { if traces.is_empty() { return None; } let start = std::time::Instant::now(); - debug!("Flushing {} traces", traces.len()); - - // Since we return the original traces on error, we need to clone them before coalescing - let traces_clone = traces.clone(); let coalesced_traces = trace_utils::coalesce_send_data(traces); let mut tasks = Vec::with_capacity(coalesced_traces.len()); + debug!("Flushing {} traces", coalesced_traces.len()); - for mut traces in coalesced_traces { - if let Some(additional_endpoint) = endpoint.clone() { - traces.set_target(additional_endpoint); - } + for trace in coalesced_traces.iter() { + let trace_with_endpoint = match endpoint.clone() { + Some(additional_endpoint) => trace.with_endpoint(additional_endpoint), + None => trace.clone(), + }; let proxy = self.config.proxy_https.clone(); tasks.push(tokio::spawn(async move { - traces.send_proxy(proxy.as_deref()).await.last_result + trace_with_endpoint.send_proxy(proxy.as_deref()).await.last_result })); } @@ -173,13 +169,13 @@ impl TraceFlusher for ServerlessTraceFlusher { if let Err(e) = result { error!("Error sending trace: {e:?}"); // Return the original traces for retry - return Some(traces_clone); + return Some(coalesced_traces.clone()); } } Err(e) => { error!("Task join error: {e:?}"); // Return the original traces for retry if a task panics - return Some(traces_clone); + return Some(coalesced_traces.clone()); } } } From 365dd918581a30fe5788af3bbe4797238bfe9fc8 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Fri, 18 Jul 2025 13:21:58 -0400 Subject: [PATCH 09/12] rebase fixes --- bottlecap/Cargo.lock | 221 +++----------------------- bottlecap/Cargo.toml | 6 +- bottlecap/src/otlp/agent.rs | 4 +- bottlecap/src/proxy/interceptor.rs | 4 +- bottlecap/src/traces/trace_flusher.rs | 22 ++- 5 files changed, 42 insertions(+), 215 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index c2b9aca5e..a703562df 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -495,11 +495,11 @@ dependencies = [ "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "datadog-trace-agent", - "datadog-trace-normalization 19.1.0", - "datadog-trace-obfuscation 19.1.0", - "datadog-trace-protobuf 19.1.0", - "datadog-trace-utils 19.1.0", - "ddcommon 19.1.0", + "datadog-trace-normalization", + "datadog-trace-obfuscation", + "datadog-trace-protobuf", + "datadog-trace-utils", + "ddcommon", "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/)", "dogstatsd", "figment", @@ -724,18 +724,7 @@ dependencies = [ [[package]] name = "datadog-fips" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=985120329d0ba96c1ec8d719cc38e1f7ce11a092#985120329d0ba96c1ec8d719cc38e1f7ce11a092" -dependencies = [ - "reqwest", - "rustls", - "rustls-native-certs", - "tracing", -] - -[[package]] -name = "datadog-fips" -version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=c3d8ed4f90591c6958921145d485463860307f78#c3d8ed4f90591c6958921145d485463860307f78" +source = "git+https://github.com/DataDog/serverless-components?rev=5b8d3d5837a563a23264862df2222aac3e60c583#5b8d3d5837a563a23264862df2222aac3e60c583" dependencies = [ "reqwest", "rustls", @@ -772,15 +761,15 @@ dependencies = [ [[package]] name = "datadog-trace-agent" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=8078e5f6b016a8ff2ca952165f982e68fad97a62#8078e5f6b016a8ff2ca952165f982e68fad97a62" +source = "git+https://github.com/DataDog/serverless-components?rev=5b8d3d5837a563a23264862df2222aac3e60c583#5b8d3d5837a563a23264862df2222aac3e60c583" dependencies = [ "anyhow", "async-trait", - "datadog-trace-normalization 17.0.0", - "datadog-trace-obfuscation 17.0.0", - "datadog-trace-protobuf 17.0.0", - "datadog-trace-utils 17.0.0", - "ddcommon 17.0.0", + "datadog-trace-normalization", + "datadog-trace-obfuscation", + "datadog-trace-protobuf", + "datadog-trace-utils", + "ddcommon", "http-body-util", "hyper 1.6.0", "hyper-util", @@ -791,39 +780,13 @@ dependencies = [ "tracing", ] -[[package]] -name = "datadog-trace-normalization" -version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=5610c65786cb7c7e24eab40b214251424a9ae3fb#5610c65786cb7c7e24eab40b214251424a9ae3fb" -dependencies = [ - "anyhow", - "datadog-trace-protobuf 17.0.0", -] - [[package]] name = "datadog-trace-normalization" version = "19.1.0" source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" dependencies = [ "anyhow", - "datadog-trace-protobuf 19.1.0", -] - -[[package]] -name = "datadog-trace-obfuscation" -version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=5610c65786cb7c7e24eab40b214251424a9ae3fb#5610c65786cb7c7e24eab40b214251424a9ae3fb" -dependencies = [ - "anyhow", - "datadog-trace-protobuf 17.0.0", - "datadog-trace-utils 17.0.0", - "ddcommon 17.0.0", - "log", - "percent-encoding", - "regex", - "serde", - "serde_json", - "url", + "datadog-trace-protobuf", ] [[package]] @@ -832,9 +795,9 @@ version = "19.1.0" source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" dependencies = [ "anyhow", - "datadog-trace-protobuf 19.1.0", - "datadog-trace-utils 19.1.0", - "ddcommon 19.1.0", + "datadog-trace-protobuf", + "datadog-trace-utils", + "ddcommon", "log", "percent-encoding", "regex", @@ -843,16 +806,6 @@ dependencies = [ "url", ] -[[package]] -name = "datadog-trace-protobuf" -version = "17.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae#d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" -dependencies = [ - "prost", - "serde", - "serde_bytes", -] - [[package]] name = "datadog-trace-protobuf" version = "19.1.0" @@ -863,34 +816,6 @@ dependencies = [ "serde_bytes", ] -[[package]] -name = "datadog-trace-utils" -version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=5610c65786cb7c7e24eab40b214251424a9ae3fb#5610c65786cb7c7e24eab40b214251424a9ae3fb" -dependencies = [ - "anyhow", - "bytes 1.10.1", - "datadog-trace-normalization 17.0.0", - "datadog-trace-protobuf 17.0.0", - "ddcommon 17.0.0", - "flate2", - "futures 0.3.31", - "http-body-util", - "hyper 1.6.0", - "hyper-http-proxy", - "prost", - "rand 0.8.5", - "rmp", - "rmp-serde", - "rmpv", - "serde", - "serde_json", - "tinybytes 17.0.0", - "tokio", - "tracing", - "zstd", -] - [[package]] name = "datadog-trace-utils" version = "19.1.0" @@ -898,9 +823,9 @@ source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b dependencies = [ "anyhow", "bytes 1.10.1", - "datadog-trace-normalization 19.1.0", - "datadog-trace-protobuf 19.1.0", - "ddcommon 19.1.0", + "datadog-trace-normalization", + "datadog-trace-protobuf", + "ddcommon", "flate2", "futures 0.3.31", "http-body-util", @@ -913,106 +838,12 @@ dependencies = [ "rmpv", "serde", "serde_json", - "tinybytes 19.1.0", + "tinybytes", "tokio", "tracing", "zstd", ] -[[package]] -name = "datadog-trace-utils" -version = "19.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" -dependencies = [ - "anyhow", - "bytes 1.10.1", - "datadog-trace-normalization 19.1.0", - "datadog-trace-protobuf 19.1.0", - "ddcommon 19.1.0", - "flate2", - "futures 0.3.31", - "http-body-util", - "hyper 1.6.0", - "hyper-http-proxy", - "prost", - "rand 0.8.5", - "rmp", - "rmp-serde", - "rmpv", - "serde", - "serde_json", - "tinybytes 19.1.0", - "tokio", - "tracing", - "zstd", -] - -[[package]] -name = "ddcommon" -version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=57fbbcfc1eb7f6a2ebc21748508ef17327461180#57fbbcfc1eb7f6a2ebc21748508ef17327461180" -dependencies = [ - "anyhow", - "cc", - "const_format", - "futures 0.3.31", - "futures-core", - "futures-util", - "hex", - "http 1.3.1", - "http-body 1.0.1", - "http-body-util", - "hyper 1.6.0", - "hyper-rustls", - "hyper-util", - "libc", - "nix 0.29.0", - "pin-project", - "regex", - "rustls", - "rustls-native-certs", - "serde", - "static_assertions", - "thiserror 1.0.69", - "tokio", - "tokio-rustls", - "tower-service", - "windows-sys 0.52.0", -] - -[[package]] -name = "ddcommon" -version = "19.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" -dependencies = [ - "anyhow", - "cc", - "const_format", - "futures 0.3.31", - "futures-core", - "futures-util", - "hex", - "http 1.3.1", - "http-body 1.0.1", - "http-body-util", - "hyper 1.6.0", - "hyper-rustls", - "hyper-util", - "libc", - "nix 0.29.0", - "pin-project", - "regex", - "rustls", - "rustls-native-certs", - "serde", - "static_assertions", - "thiserror 1.0.69", - "tokio", - "tokio-rustls", - "tower-service", - "windows-sys 0.52.0", -] - [[package]] name = "ddcommon" version = "19.1.0" @@ -1135,9 +966,9 @@ dependencies = [ [[package]] name = "dogstatsd" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=985120329d0ba96c1ec8d719cc38e1f7ce11a092#985120329d0ba96c1ec8d719cc38e1f7ce11a092" +source = "git+https://github.com/DataDog/serverless-components?rev=5b8d3d5837a563a23264862df2222aac3e60c583#5b8d3d5837a563a23264862df2222aac3e60c583" dependencies = [ - "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=985120329d0ba96c1ec8d719cc38e1f7ce11a092)", + "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "derive_more", @@ -3523,14 +3354,6 @@ dependencies = [ "crunchy", ] -[[package]] -name = "tinybytes" -version = "17.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae#d6a2da32c6b92d6865a7e7987c8a1df2203fb1ae" -dependencies = [ - "serde", -] - [[package]] name = "tinybytes" version = "19.1.0" diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index 262057313..f26940d7c 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -57,9 +57,9 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" , features = ["mini_agent"] } datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } -dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "985120329d0ba96c1ec8d719cc38e1f7ce11a092", default-features = false } -datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78" } -datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "5b8d3d5837a563a23264862df2222aac3e60c583", default-features = false } +datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "5b8d3d5837a563a23264862df2222aac3e60c583" } +datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "5b8d3d5837a563a23264862df2222aac3e60c583", default-features = false } axum = { version = "0.8.4", default-features = false, features = ["default"] } [dev-dependencies] diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index dcc0eb0ab..349557ed3 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -70,9 +70,7 @@ impl Agent { self.cancel_token.clone() } - // TODO (Yiming): Fix this lint - #[allow(clippy::ref_option)] - fn parse_port(endpoint: &Option, default_port: u16) -> u16 { + fn parse_port(endpoint: Option<&String>, default_port: u16) -> u16 { if let Some(endpoint) = endpoint { let port = endpoint.split(':').nth(1); if let Some(port) = port { diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index 6e2a24596..2b0d0a68e 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -110,9 +110,7 @@ async fn graceful_shutdown(tasks: Arc>>, shutdown_token: Cance /// If the LWA proxy lambda runtime API is not provided, the default Extension /// host and port will be used. /// -// TODO (Yiming): Fix this lint -#[allow(clippy::ref_option)] -fn get_proxy_socket_address(aws_lwa_proxy_lambda_runtime_api: &Option) -> SocketAddr { +fn get_proxy_socket_address(aws_lwa_proxy_lambda_runtime_api: Option<&String>) -> SocketAddr { if let Some(socket_addr) = aws_lwa_proxy_lambda_runtime_api .and_then(|uri_str| lwa::get_lwa_proxy_socket_address(uri_str).ok()) { diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 6bfc60e43..57251e8aa 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -10,6 +10,7 @@ use tokio::sync::Mutex; use tracing::{debug, error}; use datadog_trace_utils::{ + config_utils::trace_intake_url_prefixed, send_data::SendDataBuilder, trace_utils::{self, SendData}, }; @@ -52,10 +53,14 @@ pub struct ServerlessTraceFlusher { #[async_trait] impl TraceFlusher for ServerlessTraceFlusher { - fn new(aggregator: Arc>, config: Arc, api_key_factory: Arc) -> Self { + fn new( + aggregator: Arc>, + config: Arc, + api_key_factory: Arc, + ) -> Self { let mut additional_endpoints: Vec = Vec::new(); - let config_clone = Arc::clone(&config); + //let config_apm_additional_endpoints = config.apm_additional_endpoints.clone(); for (endpoint_url, api_keys) in config.apm_additional_endpoints.clone() { for api_key in api_keys { let trace_intake_url = trace_intake_url_prefixed(&endpoint_url); @@ -73,7 +78,7 @@ impl TraceFlusher for ServerlessTraceFlusher { ServerlessTraceFlusher { aggregator, - config: config_clone, + config, api_key_factory, additional_endpoints, } @@ -152,14 +157,17 @@ impl TraceFlusher for ServerlessTraceFlusher { let mut tasks = Vec::with_capacity(coalesced_traces.len()); debug!("Flushing {} traces", coalesced_traces.len()); - for trace in coalesced_traces.iter() { - let trace_with_endpoint = match endpoint.clone() { - Some(additional_endpoint) => trace.with_endpoint(additional_endpoint), + for trace in &coalesced_traces { + let trace_with_endpoint = match endpoint { + Some(additional_endpoint) => trace.with_endpoint(additional_endpoint.clone()), None => trace.clone(), }; let proxy = self.config.proxy_https.clone(); tasks.push(tokio::spawn(async move { - trace_with_endpoint.send_proxy(proxy.as_deref()).await.last_result + trace_with_endpoint + .send_proxy(proxy.as_deref()) + .await + .last_result })); } From 0ae15852fbf0dc90991a3dbf5cfcf5fa26532185 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Fri, 18 Jul 2025 14:52:10 -0400 Subject: [PATCH 10/12] update license 3rd party --- bottlecap/LICENSE-3rdparty.csv | 1 - 1 file changed, 1 deletion(-) diff --git a/bottlecap/LICENSE-3rdparty.csv b/bottlecap/LICENSE-3rdparty.csv index b42a0f3b2..b8205f160 100644 --- a/bottlecap/LICENSE-3rdparty.csv +++ b/bottlecap/LICENSE-3rdparty.csv @@ -108,7 +108,6 @@ log,https://github.com/rust-lang/log,MIT OR Apache-2.0,The Rust Project Develope matchers,https://github.com/hawkw/matchers,MIT,Eliza Weisman matchit,https://github.com/ibraheemdev/matchit,MIT AND BSD-3-Clause,Ibraheem Ahmed memchr,https://github.com/BurntSushi/memchr,Unlicense OR MIT,"Andrew Gallant , bluss" -memfd,https://github.com/lucab/memfd-rs,MIT OR Apache-2.0,"Luca Bruno , Simonas Kazlauskas " mime,https://github.com/hyperium/mime,MIT OR Apache-2.0,Sean McArthur miniz_oxide,https://github.com/Frommi/miniz_oxide/tree/master/miniz_oxide,MIT OR Zlib OR Apache-2.0,"Frommi , oyvindln , Rich Geldreich richgel99@gmail.com" mio,https://github.com/tokio-rs/mio,MIT,"Carl Lerche , Thomas de Zeeuw , Tokio Contributors " From d2ce4c01c394114fca57184afe250d67e1554377 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Fri, 18 Jul 2025 17:41:18 -0400 Subject: [PATCH 11/12] fixes --- bottlecap/src/lifecycle/invocation/processor.rs | 1 + bottlecap/src/traces/stats_flusher.rs | 3 ++- bottlecap/src/traces/trace_flusher.rs | 12 ++++++------ bottlecap/src/traces/trace_processor.rs | 3 ++- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 1f8c246b0..822d1f9cb 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -41,6 +41,7 @@ use crate::{ }; pub const MS_TO_NS: f64 = 1_000_000.0; +pub const S_TO_MS: u64 = 1_000; pub const S_TO_NS: f64 = 1_000_000_000.0; pub const PROACTIVE_INITIALIZATION_THRESHOLD_MS: u64 = 10_000; diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index 266d9d465..71a86fe9d 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -8,6 +8,7 @@ use tokio::sync::Mutex; use tokio::sync::OnceCell; use crate::config; +use crate::lifecycle::invocation::processor::S_TO_MS; use crate::traces::stats_aggregator::StatsAggregator; use datadog_trace_protobuf::pb; use datadog_trace_utils::{config_utils::trace_stats_url, stats_utils}; @@ -75,7 +76,7 @@ impl StatsFlusher for ServerlessStatsFlusher { url: hyper::Uri::from_str(&stats_url) .expect("can't make URI from stats url, exiting"), api_key: Some(api_key_clone.into()), - timeout_ms: self.config.flush_timeout * 1_000, + timeout_ms: self.config.flush_timeout * S_TO_MS, test_token: None, } } diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 57251e8aa..74a4f487d 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -17,6 +17,7 @@ use datadog_trace_utils::{ use dogstatsd::api_key::ApiKeyFactory; use crate::config::Config; +use crate::lifecycle::invocation::processor::S_TO_MS; use crate::traces::trace_aggregator::TraceAggregator; #[async_trait] @@ -60,7 +61,6 @@ impl TraceFlusher for ServerlessTraceFlusher { ) -> Self { let mut additional_endpoints: Vec = Vec::new(); - //let config_apm_additional_endpoints = config.apm_additional_endpoints.clone(); for (endpoint_url, api_keys) in config.apm_additional_endpoints.clone() { for api_key in api_keys { let trace_intake_url = trace_intake_url_prefixed(&endpoint_url); @@ -68,7 +68,7 @@ impl TraceFlusher for ServerlessTraceFlusher { url: hyper::Uri::from_str(&trace_intake_url) .expect("can't parse additional trace intake URL, exiting"), api_key: Some(api_key.clone().into()), - timeout_ms: config.flush_timeout * 1_000, + timeout_ms: config.flush_timeout * S_TO_MS, test_token: None, }; @@ -136,11 +136,11 @@ impl TraceFlusher for ServerlessTraceFlusher { trace_builders = guard.get_batch(); } - if failed_batch.is_empty() { - None - } else { - Some(failed_batch) + if !failed_batch.is_empty() { + return Some(failed_batch); } + + None } async fn send( diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index ca65bb405..fba5d4781 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::config; +use crate::lifecycle::invocation::processor::S_TO_MS; use crate::tags::provider; use crate::traces::span_pointers::{attach_span_pointers_to_meta, SpanPointer}; use crate::traces::{ @@ -167,7 +168,7 @@ impl TraceProcessor for ServerlessTraceProcessor { .expect("can't parse trace intake URL, exiting"), // Will be set at flush time api_key: None, - timeout_ms: config.flush_timeout * 1_000, + timeout_ms: config.flush_timeout * S_TO_MS, test_token: None, }; From d37f43427df891dd4307f899ab128d0e66e66b5e Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Fri, 18 Jul 2025 17:54:50 -0400 Subject: [PATCH 12/12] update serverless-components dependency --- bottlecap/Cargo.lock | 6 +++--- bottlecap/Cargo.toml | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index a703562df..1ff2d4d81 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -724,7 +724,7 @@ dependencies = [ [[package]] name = "datadog-fips" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=5b8d3d5837a563a23264862df2222aac3e60c583#5b8d3d5837a563a23264862df2222aac3e60c583" +source = "git+https://github.com/DataDog/serverless-components?rev=d131de8419c191ce21c91bb30b5915c4d8a2cc5a#d131de8419c191ce21c91bb30b5915c4d8a2cc5a" dependencies = [ "reqwest", "rustls", @@ -761,7 +761,7 @@ dependencies = [ [[package]] name = "datadog-trace-agent" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=5b8d3d5837a563a23264862df2222aac3e60c583#5b8d3d5837a563a23264862df2222aac3e60c583" +source = "git+https://github.com/DataDog/serverless-components?rev=d131de8419c191ce21c91bb30b5915c4d8a2cc5a#d131de8419c191ce21c91bb30b5915c4d8a2cc5a" dependencies = [ "anyhow", "async-trait", @@ -966,7 +966,7 @@ dependencies = [ [[package]] name = "dogstatsd" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=5b8d3d5837a563a23264862df2222aac3e60c583#5b8d3d5837a563a23264862df2222aac3e60c583" +source = "git+https://github.com/DataDog/serverless-components?rev=d131de8419c191ce21c91bb30b5915c4d8a2cc5a#d131de8419c191ce21c91bb30b5915c4d8a2cc5a" dependencies = [ "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index f26940d7c..0281b64d8 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -57,9 +57,9 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" , features = ["mini_agent"] } datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } -dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "5b8d3d5837a563a23264862df2222aac3e60c583", default-features = false } -datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "5b8d3d5837a563a23264862df2222aac3e60c583" } -datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "5b8d3d5837a563a23264862df2222aac3e60c583", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "d131de8419c191ce21c91bb30b5915c4d8a2cc5a", default-features = false } +datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "d131de8419c191ce21c91bb30b5915c4d8a2cc5a" } +datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "d131de8419c191ce21c91bb30b5915c4d8a2cc5a", default-features = false } axum = { version = "0.8.4", default-features = false, features = ["default"] } [dev-dependencies]