From 9f4177d0560479af3e5e4e0b908e8afa9fd72213 Mon Sep 17 00:00:00 2001 From: Lewis Date: Wed, 24 Dec 2025 14:52:02 -0500 Subject: [PATCH 01/13] Add named pipe to tracer /info --- crates/datadog-trace-agent/src/config.rs | 24 +++++++++++++++++++ crates/datadog-trace-agent/src/mini_agent.rs | 22 +++++++++++++---- .../src/trace_processor.rs | 1 + 3 files changed, 42 insertions(+), 5 deletions(-) diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index 92793b6d..0f04821b 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -79,7 +79,10 @@ impl Tags { #[derive(Debug)] pub struct Config { pub dd_site: String, + pub dd_apm_receiver_port: u16, + pub dd_apm_windows_pipe_name: Option, pub dd_dogstatsd_port: u16, + pub dd_dogstatsd_windows_pipe_name: Option, pub env_type: trace_utils::EnvironmentType, pub app_name: Option, pub max_request_content_length: usize, @@ -119,6 +122,8 @@ impl Config { .ok() .and_then(|port| port.parse::().ok()) .unwrap_or(DEFAULT_DOGSTATSD_PORT); + let dd_dogstatsd_windows_pipe_name: Option = + env::var("DD_DOGSTATSD_WINDOWS_PIPE_NAME").ok(); let dd_site = env::var("DD_SITE").unwrap_or_else(|_| "datadoghq.com".to_string()); // construct the trace & trace stats intake urls based on DD_SITE env var (to flush traces & @@ -166,6 +171,7 @@ impl Config { proxy_request_retry_backoff_base_ms: 100, verify_env_timeout_ms: 100, dd_dogstatsd_port, + dd_dogstatsd_windows_pipe_name, dd_site, trace_intake: Endpoint { url: hyper::Uri::from_str(&trace_intake_url).unwrap(), @@ -345,6 +351,24 @@ mod tests { env::remove_var("DD_DOGSTATSD_PORT"); } + #[test] + #[serial] + fn test_dogstatsd_windows_pipe_name() { + env::set_var("DD_API_KEY", "_not_a_real_key_"); + env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); + env::set_var("DD_DOGSTATSD_WINDOWS_PIPE_NAME", r"\\.\pipe\dogstatsd"); + let config_res = config::Config::new(); + assert!(config_res.is_ok()); + let config = config_res.unwrap(); + assert_eq!( + config.dd_dogstatsd_windows_pipe_name, + Some(r"\\.\pipe\dogstatsd".to_string()) + ); + env::remove_var("DD_API_KEY"); + env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME"); + env::remove_var("DD_DOGSTATSD_WINDOWS_PIPE_NAME"); + } + fn test_config_with_dd_tags(dd_tags: &str) -> config::Config { env::set_var("DD_API_KEY", "_not_a_real_key_"); env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index 280d6a7c..f84bc6c7 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -219,7 +219,10 @@ impl MiniAgent { ), } } - (_, INFO_ENDPOINT_PATH) => match Self::info_handler(config.dd_dogstatsd_port) { + (_, INFO_ENDPOINT_PATH) => match Self::info_handler( + config.dd_dogstatsd_port, + config.dd_dogstatsd_windows_pipe_name.as_deref(), + ) { Ok(res) => Ok(res), Err(err) => log_and_create_http_response( &format!("Info endpoint error: {err}"), @@ -287,7 +290,18 @@ impl MiniAgent { } } - fn info_handler(dd_dogstatsd_port: u16) -> http::Result { + fn info_handler( + dd_dogstatsd_port: u16, + dd_dogstatsd_windows_pipe_name: Option<&str>, + ) -> http::Result { + let mut config_json = serde_json::json!({ + "statsd_port": dd_dogstatsd_port + }); + + if let Some(pipe_name) = dd_dogstatsd_windows_pipe_name { + config_json["statsd_windows_pipe_name"] = serde_json::json!(pipe_name); + } + let response_json = json!( { "endpoints": [ @@ -297,9 +311,7 @@ impl MiniAgent { PROFILING_ENDPOINT_PATH ], "client_drop_p0s": true, - "config": { - "statsd_port": dd_dogstatsd_port - } + "config": config_json } ); Response::builder() diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index 0e75b06f..f4ae6787 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -204,6 +204,7 @@ mod tests { }, dd_site: "datadoghq.com".to_string(), dd_dogstatsd_port: 8125, + dd_dogstatsd_windows_pipe_name: None, env_type: trace_utils::EnvironmentType::CloudFunction, os: "linux".to_string(), obfuscation_config: ObfuscationConfig::new().unwrap(), From 4ca18a8811e9a55a9e6a3ba93ee6d94eca4feaca Mon Sep 17 00:00:00 2001 From: Lewis Date: Wed, 24 Dec 2025 15:23:03 -0500 Subject: [PATCH 02/13] Add named pipe to tracer listener --- crates/datadog-trace-agent/src/config.rs | 63 ++++++++ crates/datadog-trace-agent/src/mini_agent.rs | 137 +++++++++++++++++- .../src/trace_processor.rs | 2 + 3 files changed, 196 insertions(+), 6 deletions(-) diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index 0f04821b..46f5fbb2 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -15,6 +15,7 @@ use libdd_trace_utils::config_utils::{ }; use libdd_trace_utils::trace_utils; +const DEFAULT_APM_RECEIVER_PORT: u16 = 8126; const DEFAULT_DOGSTATSD_PORT: u16 = 8125; #[derive(Debug)] @@ -118,6 +119,17 @@ impl Config { anyhow::anyhow!("Unable to identify environment. Shutting down Mini Agent.") })?; + let dd_apm_windows_pipe_name: Option = + env::var("DD_APM_WINDOWS_PIPE_NAME").ok(); + let dd_apm_receiver_port: u16 = if dd_apm_windows_pipe_name.is_some() { + 0 // Override to 0 when using Windows named pipe + } else { + env::var("DD_APM_RECEIVER_PORT") + .ok() + .and_then(|port| port.parse::().ok()) + .unwrap_or(DEFAULT_APM_RECEIVER_PORT) + }; + let dd_dogstatsd_port: u16 = env::var("DD_DOGSTATSD_PORT") .ok() .and_then(|port| port.parse::().ok()) @@ -170,6 +182,8 @@ impl Config { proxy_request_max_retries: 3, proxy_request_retry_backoff_base_ms: 100, verify_env_timeout_ms: 100, + dd_apm_receiver_port, + dd_apm_windows_pipe_name, dd_dogstatsd_port, dd_dogstatsd_windows_pipe_name, dd_site, @@ -369,6 +383,55 @@ mod tests { env::remove_var("DD_DOGSTATSD_WINDOWS_PIPE_NAME"); } + #[test] + #[serial] + fn test_apm_windows_pipe_name() { + env::set_var("DD_API_KEY", "_not_a_real_key_"); + env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); + env::set_var("DD_APM_WINDOWS_PIPE_NAME", r"\\.\pipe\trace-agent"); + let config_res = config::Config::new(); + assert!(config_res.is_ok()); + let config = config_res.unwrap(); + assert_eq!( + config.dd_apm_windows_pipe_name, + Some(r"\\.\pipe\trace-agent".to_string()) + ); + // Port should be overridden to 0 when pipe is set + assert_eq!(config.dd_apm_receiver_port, 0); + env::remove_var("DD_API_KEY"); + env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME"); + env::remove_var("DD_APM_WINDOWS_PIPE_NAME"); + } + + #[test] + #[serial] + fn test_default_apm_receiver_port() { + env::set_var("DD_API_KEY", "_not_a_real_key_"); + env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); + let config_res = config::Config::new(); + assert!(config_res.is_ok()); + let config = config_res.unwrap(); + assert_eq!(config.dd_apm_receiver_port, 8126); + assert_eq!(config.dd_apm_windows_pipe_name, None); + env::remove_var("DD_API_KEY"); + env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME"); + } + + #[test] + #[serial] + fn test_custom_apm_receiver_port() { + env::set_var("DD_API_KEY", "_not_a_real_key_"); + env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); + env::set_var("DD_APM_RECEIVER_PORT", "18126"); + let config_res = config::Config::new(); + assert!(config_res.is_ok()); + let config = config_res.unwrap(); + assert_eq!(config.dd_apm_receiver_port, 18126); + env::remove_var("DD_API_KEY"); + env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME"); + env::remove_var("DD_APM_RECEIVER_PORT"); + } + fn test_config_with_dd_tags(dd_tags: &str) -> config::Config { env::set_var("DD_API_KEY", "_not_a_real_key_"); env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index f84bc6c7..73964833 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -20,7 +20,6 @@ use libdd_trace_protobuf::pb; use libdd_trace_utils::trace_utils; use libdd_trace_utils::trace_utils::SendData; -const MINI_AGENT_PORT: usize = 8126; const TRACE_ENDPOINT_PATH: &str = "/v0.4/traces"; const STATS_ENDPOINT_PATH: &str = "/v0.6/stats"; const INFO_ENDPOINT_PATH: &str = "/info"; @@ -125,14 +124,54 @@ impl MiniAgent { ) }); - let addr = SocketAddr::from(([127, 0, 0, 1], MINI_AGENT_PORT as u16)); - let listener = tokio::net::TcpListener::bind(&addr).await?; - - debug!("Mini Agent started: listening on port {MINI_AGENT_PORT}"); + // Determine which transport to use based on configuration + if let Some(ref pipe_name) = self.config.dd_apm_windows_pipe_name { + debug!("Mini Agent started: listening on named pipe {}", pipe_name); + } else { + debug!("Mini Agent started: listening on port {}", self.config.dd_apm_receiver_port); + } debug!( - "Time taken start the Mini Agent: {} ms", + "Time taken to start the Mini Agent: {} ms", now.elapsed().as_millis() ); + + if let Some(ref pipe_name) = self.config.dd_apm_windows_pipe_name { + // Windows named pipe transport + #[cfg(windows)] + { + Self::serve_named_pipe(pipe_name, service).await?; + } + + #[cfg(not(windows))] + { + error!("Named pipes are only supported on Windows, cannot use pipe: {}", pipe_name); + return Err("Named pipes are only supported on Windows".into()); + } + } else { + // TCP transport + let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port)); + let listener = tokio::net::TcpListener::bind(&addr).await?; + + Self::serve_tcp(listener, service).await?; + } + + Ok(()) + } + + async fn serve_tcp( + listener: tokio::net::TcpListener, + service: S, + ) -> Result<(), Box> + where + S: hyper::service::Service< + hyper::Request, + Response = hyper::Response, + > + Clone + + Send + + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + { let server = hyper::server::conn::http1::Builder::new(); let mut joinset = tokio::task::JoinSet::new(); loop { @@ -177,6 +216,81 @@ impl MiniAgent { } } + #[cfg(windows)] + async fn serve_named_pipe( + pipe_name: &str, + service: S, + ) -> Result<(), Box> + where + S: hyper::service::Service< + hyper::Request, + Response = hyper::Response, + > + Clone + + Send + + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + { + use tokio::net::windows::named_pipe::ServerOptions; + + let server = hyper::server::conn::http1::Builder::new(); + let mut joinset = tokio::task::JoinSet::new(); + + loop { + // Create a new named pipe server instance for this connection + let pipe_server = ServerOptions::new() + .first_pipe_instance(false) + .create(pipe_name)?; + + let conn_result = tokio::select! { + connect_res = pipe_server.connect() => { + match connect_res { + Ok(()) => Ok(pipe_server), + Err(e) => Err(e), + } + }, + finished = async { + match joinset.join_next().await { + Some(finished) => finished, + None => std::future::pending().await, + } + } => match finished { + Err(e) if e.is_panic() => { + std::panic::resume_unwind(e.into_panic()); + }, + Ok(()) | Err(_) => continue, + }, + }; + + let conn = match conn_result { + Ok(pipe) => pipe, + Err(e) + if matches!( + e.kind(), + io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionReset + | io::ErrorKind::ConnectionRefused + ) => + { + continue; + } + Err(e) => { + error!("Named pipe server error: {e}"); + return Err(e.into()); + } + }; + + let conn = hyper_util::rt::TokioIo::new(conn); + let server = server.clone(); + let service = service.clone(); + joinset.spawn(async move { + if let Err(e) = server.serve_connection(conn, service).await { + error!("Connection error: {e}"); + } + }); + } + } + #[allow(clippy::too_many_arguments)] async fn trace_endpoint_handler( config: Arc, @@ -220,6 +334,8 @@ impl MiniAgent { } } (_, INFO_ENDPOINT_PATH) => match Self::info_handler( + config.dd_apm_receiver_port, + config.dd_apm_windows_pipe_name.as_deref(), config.dd_dogstatsd_port, config.dd_dogstatsd_windows_pipe_name.as_deref(), ) { @@ -291,13 +407,22 @@ impl MiniAgent { } fn info_handler( + dd_apm_receiver_port: u16, + dd_apm_windows_pipe_name: Option<&str>, dd_dogstatsd_port: u16, dd_dogstatsd_windows_pipe_name: Option<&str>, ) -> http::Result { let mut config_json = serde_json::json!({ + "apm_config": { + "receiver_port": dd_apm_receiver_port + }, "statsd_port": dd_dogstatsd_port }); + if let Some(pipe_name) = dd_apm_windows_pipe_name { + config_json["apm_config"]["receiver_windows_pipe_name"] = serde_json::json!(pipe_name); + } + if let Some(pipe_name) = dd_dogstatsd_windows_pipe_name { config_json["statsd_windows_pipe_name"] = serde_json::json!(pipe_name); } diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index f4ae6787..f9d796cc 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -203,6 +203,8 @@ mod tests { ..Default::default() }, dd_site: "datadoghq.com".to_string(), + dd_apm_receiver_port: 8126, + dd_apm_windows_pipe_name: None, dd_dogstatsd_port: 8125, dd_dogstatsd_windows_pipe_name: None, env_type: trace_utils::EnvironmentType::CloudFunction, From 4daf3586b9b869332a40fd257201939214b77008 Mon Sep 17 00:00:00 2001 From: Lewis Date: Wed, 24 Dec 2025 15:50:40 -0500 Subject: [PATCH 03/13] Add named pipes to the tracer-agent --- crates/datadog-trace-agent/src/config.rs | 22 -- crates/datadog-trace-agent/src/mini_agent.rs | 201 ++++++++-- .../src/trace_processor.rs | 1 - .../tests/integration_test.rs | 361 ++++++++++++++++++ 4 files changed, 537 insertions(+), 48 deletions(-) create mode 100644 crates/datadog-trace-agent/tests/integration_test.rs diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index 46f5fbb2..5b610f51 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -83,7 +83,6 @@ pub struct Config { pub dd_apm_receiver_port: u16, pub dd_apm_windows_pipe_name: Option, pub dd_dogstatsd_port: u16, - pub dd_dogstatsd_windows_pipe_name: Option, pub env_type: trace_utils::EnvironmentType, pub app_name: Option, pub max_request_content_length: usize, @@ -134,8 +133,6 @@ impl Config { .ok() .and_then(|port| port.parse::().ok()) .unwrap_or(DEFAULT_DOGSTATSD_PORT); - let dd_dogstatsd_windows_pipe_name: Option = - env::var("DD_DOGSTATSD_WINDOWS_PIPE_NAME").ok(); let dd_site = env::var("DD_SITE").unwrap_or_else(|_| "datadoghq.com".to_string()); // construct the trace & trace stats intake urls based on DD_SITE env var (to flush traces & @@ -185,7 +182,6 @@ impl Config { dd_apm_receiver_port, dd_apm_windows_pipe_name, dd_dogstatsd_port, - dd_dogstatsd_windows_pipe_name, dd_site, trace_intake: Endpoint { url: hyper::Uri::from_str(&trace_intake_url).unwrap(), @@ -365,24 +361,6 @@ mod tests { env::remove_var("DD_DOGSTATSD_PORT"); } - #[test] - #[serial] - fn test_dogstatsd_windows_pipe_name() { - env::set_var("DD_API_KEY", "_not_a_real_key_"); - env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); - env::set_var("DD_DOGSTATSD_WINDOWS_PIPE_NAME", r"\\.\pipe\dogstatsd"); - let config_res = config::Config::new(); - assert!(config_res.is_ok()); - let config = config_res.unwrap(); - assert_eq!( - config.dd_dogstatsd_windows_pipe_name, - Some(r"\\.\pipe\dogstatsd".to_string()) - ); - env::remove_var("DD_API_KEY"); - env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME"); - env::remove_var("DD_DOGSTATSD_WINDOWS_PIPE_NAME"); - } - #[test] #[serial] fn test_apm_windows_pipe_name() { diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index 73964833..39297db1 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -67,7 +67,7 @@ impl MiniAgent { // start our trace flusher. receives trace payloads and handles buffering + deciding when to // flush to backend. let trace_flusher = self.trace_flusher.clone(); - tokio::spawn(async move { + let trace_flusher_handle = tokio::spawn(async move { trace_flusher.start_trace_flusher(trace_rx).await; }); @@ -80,7 +80,7 @@ impl MiniAgent { // start our stats flusher. let stats_flusher = self.stats_flusher.clone(); let stats_config = self.config.clone(); - tokio::spawn(async move { + let stats_flusher_handle = tokio::spawn(async move { stats_flusher .start_stats_flusher(stats_config, stats_rx) .await; @@ -139,7 +139,13 @@ impl MiniAgent { // Windows named pipe transport #[cfg(windows)] { - Self::serve_named_pipe(pipe_name, service).await?; + Self::serve_named_pipe( + pipe_name, + service, + trace_flusher_handle, + stats_flusher_handle, + ) + .await?; } #[cfg(not(windows))] @@ -152,7 +158,8 @@ impl MiniAgent { let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port)); let listener = tokio::net::TcpListener::bind(&addr).await?; - Self::serve_tcp(listener, service).await?; + Self::serve_tcp(listener, service, trace_flusher_handle, stats_flusher_handle) + .await?; } Ok(()) @@ -161,6 +168,8 @@ impl MiniAgent { async fn serve_tcp( listener: tokio::net::TcpListener, service: S, + mut trace_flusher_handle: tokio::task::JoinHandle<()>, + mut stats_flusher_handle: tokio::task::JoinHandle<()>, ) -> Result<(), Box> where S: hyper::service::Service< @@ -172,8 +181,13 @@ impl MiniAgent { S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, { + use tokio::time::{sleep, Duration}; + let server = hyper::server::conn::http1::Builder::new(); let mut joinset = tokio::task::JoinSet::new(); + let mut consecutive_errors = 0u32; + const MAX_CONSECUTIVE_ERRORS: u32 = 10; + loop { let conn = tokio::select! { con_res = listener.accept() => match con_res { @@ -185,13 +199,29 @@ impl MiniAgent { | io::ErrorKind::ConnectionRefused ) => { + // Transient connection errors - just continue + consecutive_errors = 0; // Reset on transient errors continue; } Err(e) => { - error!("Server error: {e}"); - return Err(e.into()); + // Log non-transient errors but don't immediately kill server + error!("TCP listener error: {e}"); + consecutive_errors += 1; + + if consecutive_errors >= MAX_CONSECUTIVE_ERRORS { + error!("Too many consecutive listener errors, shutting down"); + return Err(e.into()); + } + + // Exponential backoff before retry + let backoff_ms = 10u64 * (1 << consecutive_errors.min(6)); + sleep(Duration::from_millis(backoff_ms)).await; + continue; + } + Ok((conn, _)) => { + consecutive_errors = 0; // Reset on success + conn } - Ok((conn, _)) => conn, }, finished = async { match joinset.join_next().await { @@ -200,10 +230,20 @@ impl MiniAgent { } } => match finished { Err(e) if e.is_panic() => { - std::panic::resume_unwind(e.into_panic()); + // Don't kill server on panic - log and continue + error!("Connection handler panicked: {:?}", e); + continue; }, Ok(()) | Err(_) => continue, }, + result = &mut trace_flusher_handle => { + error!("Trace flusher task died: {:?}", result); + return Err("Trace flusher task terminated unexpectedly".into()); + }, + result = &mut stats_flusher_handle => { + error!("Stats flusher task died: {:?}", result); + return Err("Stats flusher task terminated unexpectedly".into()); + }, }; let conn = hyper_util::rt::TokioIo::new(conn); let server = server.clone(); @@ -220,6 +260,8 @@ impl MiniAgent { async fn serve_named_pipe( pipe_name: &str, service: S, + mut trace_flusher_handle: tokio::task::JoinHandle<()>, + mut stats_flusher_handle: tokio::task::JoinHandle<()>, ) -> Result<(), Box> where S: hyper::service::Service< @@ -232,20 +274,47 @@ impl MiniAgent { S::Error: std::error::Error + Send + Sync + 'static, { use tokio::net::windows::named_pipe::ServerOptions; + use tokio::time::{sleep, Duration}; let server = hyper::server::conn::http1::Builder::new(); let mut joinset = tokio::task::JoinSet::new(); + let mut consecutive_errors = 0u32; + const MAX_CONSECUTIVE_ERRORS: u32 = 10; - loop { - // Create a new named pipe server instance for this connection - let pipe_server = ServerOptions::new() + // Pre-create first pipe instance to minimize connection gap + let mut current_pipe = loop { + match ServerOptions::new() .first_pipe_instance(false) - .create(pipe_name)?; + .create(pipe_name) + { + Ok(pipe) => break pipe, + Err(e) => { + consecutive_errors += 1; + error!( + "Failed to create initial named pipe (attempt {}/{}): {}", + consecutive_errors, MAX_CONSECUTIVE_ERRORS, e + ); + + if consecutive_errors >= MAX_CONSECUTIVE_ERRORS { + error!("Too many consecutive pipe creation failures during startup, shutting down"); + return Err(e.into()); + } + + // Exponential backoff: 10ms, 20ms, 40ms, 80ms, ... + let backoff_ms = 10u64 * (1 << consecutive_errors.min(6)); + sleep(Duration::from_millis(backoff_ms)).await; + } + } + }; + + // Reset error counter after successful creation + consecutive_errors = 0; + loop { let conn_result = tokio::select! { - connect_res = pipe_server.connect() => { + connect_res = current_pipe.connect() => { match connect_res { - Ok(()) => Ok(pipe_server), + Ok(()) => Ok(current_pipe), Err(e) => Err(e), } }, @@ -256,13 +325,23 @@ impl MiniAgent { } } => match finished { Err(e) if e.is_panic() => { - std::panic::resume_unwind(e.into_panic()); + // Don't kill server on panic - log and continue + error!("Connection handler panicked: {:?}", e); + continue; }, Ok(()) | Err(_) => continue, }, + result = &mut trace_flusher_handle => { + error!("Trace flusher task died: {:?}", result); + return Err("Trace flusher task terminated unexpectedly".into()); + }, + result = &mut stats_flusher_handle => { + error!("Stats flusher task died: {:?}", result); + return Err("Stats flusher task terminated unexpectedly".into()); + }, }; - let conn = match conn_result { + let connected_pipe = match conn_result { Ok(pipe) => pipe, Err(e) if matches!( @@ -272,15 +351,90 @@ impl MiniAgent { | io::ErrorKind::ConnectionRefused ) => { + // Transient connection errors - recreate pipe and continue + current_pipe = match ServerOptions::new() + .first_pipe_instance(false) + .create(pipe_name) + { + Ok(pipe) => { + consecutive_errors = 0; + pipe + } + Err(e) => { + consecutive_errors += 1; + error!( + "Failed to recreate pipe after transient error (attempt {}/{}): {}", + consecutive_errors, MAX_CONSECUTIVE_ERRORS, e + ); + + if consecutive_errors >= MAX_CONSECUTIVE_ERRORS { + return Err(e.into()); + } + + let backoff_ms = 10u64 * (1 << consecutive_errors.min(6)); + sleep(Duration::from_millis(backoff_ms)).await; + continue; + } + }; continue; } Err(e) => { - error!("Named pipe server error: {e}"); - return Err(e.into()); + // Log non-transient errors but don't immediately kill server + error!("Named pipe connection error: {e}"); + consecutive_errors += 1; + + if consecutive_errors >= MAX_CONSECUTIVE_ERRORS { + error!("Too many consecutive connection errors, shutting down"); + return Err(e.into()); + } + + // Recreate pipe with backoff + let backoff_ms = 10u64 * (1 << consecutive_errors.min(6)); + sleep(Duration::from_millis(backoff_ms)).await; + + current_pipe = match ServerOptions::new() + .first_pipe_instance(false) + .create(pipe_name) + { + Ok(pipe) => pipe, + Err(e) => { + error!("Failed to recreate pipe after error: {}", e); + continue; + } + }; + continue; } }; - let conn = hyper_util::rt::TokioIo::new(conn); + // Connection successful! Immediately create next pipe instance + // to minimize gap where no pipe is available + let next_pipe = match ServerOptions::new() + .first_pipe_instance(false) + .create(pipe_name) + { + Ok(pipe) => { + consecutive_errors = 0; // Reset on success + pipe + } + Err(e) => { + consecutive_errors += 1; + error!( + "Failed to create next pipe instance (attempt {}/{}): {}", + consecutive_errors, MAX_CONSECUTIVE_ERRORS, e + ); + + if consecutive_errors >= MAX_CONSECUTIVE_ERRORS { + return Err(e.into()); + } + + // Brief pause before retry + sleep(Duration::from_millis(100)).await; + continue; + } + }; + + // Spawn handler with connected_pipe (ownership moved to task) + let conn = hyper_util::rt::TokioIo::new(connected_pipe); let server = server.clone(); let service = service.clone(); joinset.spawn(async move { @@ -288,6 +442,9 @@ impl MiniAgent { error!("Connection error: {e}"); } }); + + // Use next_pipe for the next iteration + current_pipe = next_pipe; } } @@ -337,7 +494,6 @@ impl MiniAgent { config.dd_apm_receiver_port, config.dd_apm_windows_pipe_name.as_deref(), config.dd_dogstatsd_port, - config.dd_dogstatsd_windows_pipe_name.as_deref(), ) { Ok(res) => Ok(res), Err(err) => log_and_create_http_response( @@ -410,7 +566,6 @@ impl MiniAgent { dd_apm_receiver_port: u16, dd_apm_windows_pipe_name: Option<&str>, dd_dogstatsd_port: u16, - dd_dogstatsd_windows_pipe_name: Option<&str>, ) -> http::Result { let mut config_json = serde_json::json!({ "apm_config": { @@ -423,10 +578,6 @@ impl MiniAgent { config_json["apm_config"]["receiver_windows_pipe_name"] = serde_json::json!(pipe_name); } - if let Some(pipe_name) = dd_dogstatsd_windows_pipe_name { - config_json["statsd_windows_pipe_name"] = serde_json::json!(pipe_name); - } - let response_json = json!( { "endpoints": [ diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index f9d796cc..87550fc4 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -206,7 +206,6 @@ mod tests { dd_apm_receiver_port: 8126, dd_apm_windows_pipe_name: None, dd_dogstatsd_port: 8125, - dd_dogstatsd_windows_pipe_name: None, env_type: trace_utils::EnvironmentType::CloudFunction, os: "linux".to_string(), obfuscation_config: ObfuscationConfig::new().unwrap(), diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs new file mode 100644 index 00000000..29a57eae --- /dev/null +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -0,0 +1,361 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use datadog_trace_agent::{ + config::Config, + env_verifier::EnvVerifier, + mini_agent::MiniAgent, + stats_flusher::StatsFlusher, + stats_processor::StatsProcessor, + trace_flusher::TraceFlusher, + trace_processor::TraceProcessor, +}; +use datadog_trace_protobuf::pb; +use datadog_trace_utils::trace_utils::{MiniAgentMetadata, SendData}; +use ddcommon::hyper_migration; +use std::sync::Arc; +use tokio::sync::mpsc::{Receiver, Sender}; + +/// Mock trace processor for testing +struct MockTraceProcessor; + +#[async_trait::async_trait] +impl TraceProcessor for MockTraceProcessor { + async fn process_traces( + &self, + _config: Arc, + _req: hyper_migration::HttpRequest, + _trace_tx: Sender, + _mini_agent_metadata: Arc, + ) -> Result> { + // Return a simple 200 OK response + Ok(hyper::Response::builder() + .status(200) + .body(hyper_migration::Body::from("OK"))?) + } +} + +/// Mock trace flusher for testing +struct MockTraceFlusher; + +#[async_trait::async_trait] +impl TraceFlusher for MockTraceFlusher { + async fn start_trace_flusher(&self, mut _trace_rx: Receiver) { + // Do nothing - just consume messages + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + } +} + +/// Mock stats processor for testing +struct MockStatsProcessor; + +#[async_trait::async_trait] +impl StatsProcessor for MockStatsProcessor { + async fn process_stats( + &self, + _config: Arc, + _req: hyper_migration::HttpRequest, + _stats_tx: Sender, + ) -> Result> { + Ok(hyper::Response::builder() + .status(200) + .body(hyper_migration::Body::from("OK"))?) + } +} + +/// Mock stats flusher for testing +struct MockStatsFlusher; + +#[async_trait::async_trait] +impl StatsFlusher for MockStatsFlusher { + async fn start_stats_flusher( + &self, + _config: Arc, + mut _stats_rx: Receiver, + ) { + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + } +} + +/// Mock env verifier for testing +struct MockEnvVerifier; + +#[async_trait::async_trait] +impl EnvVerifier for MockEnvVerifier { + async fn verify_environment( + &self, + _timeout_ms: u64, + _env_type: &datadog_trace_utils::trace_utils::EnvironmentType, + _os: &str, + ) -> MiniAgentMetadata { + MiniAgentMetadata { + function_name: Some("test-function".to_string()), + ..Default::default() + } + } +} + +/// Create a test config with TCP transport +fn create_tcp_test_config(port: u16) -> Config { + Config { + dd_site: "datadoghq.com".to_string(), + dd_apm_receiver_port: port, + dd_apm_windows_pipe_name: None, + dd_dogstatsd_port: 8125, + env_type: datadog_trace_utils::trace_utils::EnvironmentType::Local, + app_name: Some("test-app".to_string()), + max_request_content_length: 10_000_000, + obfuscation_config: Default::default(), + os: std::env::consts::OS.to_string(), + tags: datadog_trace_agent::config::Tags::new(), + stats_flush_interval: 10, + trace_flush_interval: 5, + trace_intake: libdd_common::Endpoint::default(), + trace_stats_intake: libdd_common::Endpoint::default(), + verify_env_timeout: 1000, + proxy_url: None, + } +} + +#[cfg(test)] +#[tokio::test] +async fn test_trace_agent_tcp_accepts_connection() { + use tokio::time::{timeout, Duration}; + + let test_port = 18126; // Use a non-standard port for testing + let config = Arc::new(create_tcp_test_config(test_port)); + + let mini_agent = MiniAgent { + config: config.clone(), + trace_processor: Arc::new(MockTraceProcessor), + trace_flusher: Arc::new(MockTraceFlusher), + stats_processor: Arc::new(MockStatsProcessor), + stats_flusher: Arc::new(MockStatsFlusher), + env_verifier: Arc::new(MockEnvVerifier), + }; + + // Start the mini agent in the background + let agent_handle = tokio::spawn(async move { + mini_agent.start_mini_agent().await + }); + + // Give the server time to start + tokio::time::sleep(Duration::from_millis(100)).await; + + // Try to connect via TCP + let connect_result = timeout( + Duration::from_secs(2), + tokio::net::TcpStream::connect(format!("127.0.0.1:{}", test_port)), + ) + .await; + + // Verify connection succeeds + assert!( + connect_result.is_ok(), + "Failed to connect to TCP server within timeout" + ); + assert!( + connect_result.unwrap().is_ok(), + "TCP connection failed" + ); + + // Clean up: the agent is running in the background and will be dropped + agent_handle.abort(); +} + +#[cfg(test)] +#[tokio::test] +async fn test_trace_agent_tcp_handles_http_request() { + use hyper::body::Body; + use hyper::Request; + use tokio::time::{timeout, Duration}; + + let test_port = 18127; + let config = Arc::new(create_tcp_test_config(test_port)); + + let mini_agent = MiniAgent { + config: config.clone(), + trace_processor: Arc::new(MockTraceProcessor), + trace_flusher: Arc::new(MockTraceFlusher), + stats_processor: Arc::new(MockStatsProcessor), + stats_flusher: Arc::new(MockStatsFlusher), + env_verifier: Arc::new(MockEnvVerifier), + }; + + // Start the mini agent in the background + let agent_handle = tokio::spawn(async move { + mini_agent.start_mini_agent().await + }); + + // Give the server time to start + tokio::time::sleep(Duration::from_millis(100)).await; + + // Send an HTTP request to the /info endpoint + let client = hyper::Client::new(); + let uri = format!("http://127.0.0.1:{}/info", test_port); + + let request_result = timeout( + Duration::from_secs(2), + client.request(Request::builder() + .uri(uri) + .method("GET") + .body(Body::empty()) + .unwrap()), + ) + .await; + + // Verify request succeeds + assert!( + request_result.is_ok(), + "HTTP request timed out" + ); + + let response = request_result.unwrap(); + assert!( + response.is_ok(), + "HTTP request failed: {:?}", + response.err() + ); + + let response = response.unwrap(); + assert_eq!(response.status(), 200, "Expected 200 OK response"); + + // Clean up + agent_handle.abort(); +} + +#[cfg(all(test, windows))] +#[tokio::test] +async fn test_trace_agent_named_pipe_accepts_connection() { + use tokio::net::windows::named_pipe::ClientOptions; + use tokio::time::{timeout, Duration}; + + let pipe_name = r"\\.\pipe\dd_trace_test_pipe"; + let mut config = create_tcp_test_config(0); // Port 0 when using named pipe + config.dd_apm_windows_pipe_name = Some(pipe_name.to_string()); + let config = Arc::new(config); + + let mini_agent = MiniAgent { + config: config.clone(), + trace_processor: Arc::new(MockTraceProcessor), + trace_flusher: Arc::new(MockTraceFlusher), + stats_processor: Arc::new(MockStatsProcessor), + stats_flusher: Arc::new(MockStatsFlusher), + env_verifier: Arc::new(MockEnvVerifier), + }; + + // Start the mini agent in the background + let agent_handle = tokio::spawn(async move { + mini_agent.start_mini_agent().await + }); + + // Give the server time to create the pipe + tokio::time::sleep(Duration::from_millis(100)).await; + + // Try to connect via named pipe + let connect_result = timeout( + Duration::from_secs(2), + ClientOptions::new().open(pipe_name), + ) + .await; + + // Verify connection succeeds + assert!( + connect_result.is_ok(), + "Failed to connect to named pipe within timeout" + ); + assert!( + connect_result.unwrap().is_ok(), + "Named pipe connection failed" + ); + + // Clean up + agent_handle.abort(); +} + +#[cfg(all(test, windows))] +#[tokio::test] +async fn test_trace_agent_named_pipe_handles_http_request() { + use hyper::body::Body; + use hyper::Request; + use hyper_util::rt::TokioIo; + use tokio::net::windows::named_pipe::ClientOptions; + use tokio::time::{timeout, Duration}; + + let pipe_name = r"\\.\pipe\dd_trace_test_pipe_http"; + let mut config = create_tcp_test_config(0); + config.dd_apm_windows_pipe_name = Some(pipe_name.to_string()); + let config = Arc::new(config); + + let mini_agent = MiniAgent { + config: config.clone(), + trace_processor: Arc::new(MockTraceProcessor), + trace_flusher: Arc::new(MockTraceFlusher), + stats_processor: Arc::new(MockStatsProcessor), + stats_flusher: Arc::new(MockStatsFlusher), + env_verifier: Arc::new(MockEnvVerifier), + }; + + // Start the mini agent in the background + let agent_handle = tokio::spawn(async move { + mini_agent.start_mini_agent().await + }); + + // Give the server time to create the pipe + tokio::time::sleep(Duration::from_millis(100)).await; + + // Connect to the named pipe + let client = ClientOptions::new() + .open(pipe_name) + .expect("Failed to connect to named pipe"); + + // Wrap the pipe in TokioIo for hyper + let io = TokioIo::new(client); + + // Send an HTTP request over the pipe + let (mut sender, conn) = hyper::client::conn::http1::handshake(io) + .await + .expect("Failed to perform HTTP handshake"); + + // Spawn connection task + tokio::spawn(async move { + if let Err(e) = conn.await { + eprintln!("Connection error: {}", e); + } + }); + + let request = Request::builder() + .uri("/info") + .method("GET") + .body(Body::empty()) + .unwrap(); + + let response_result = timeout(Duration::from_secs(2), sender.send_request(request)).await; + + // Verify request succeeds + assert!( + response_result.is_ok(), + "HTTP request over named pipe timed out" + ); + + let response = response_result.unwrap(); + assert!( + response.is_ok(), + "HTTP request over named pipe failed: {:?}", + response.err() + ); + + let response = response.unwrap(); + assert_eq!( + response.status(), + 200, + "Expected 200 OK response from named pipe" + ); + + // Clean up + agent_handle.abort(); +} \ No newline at end of file From 42a61509ce970f92449f5050250db8d6ecf25da2 Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 9 Jan 2026 10:17:29 -0500 Subject: [PATCH 04/13] Simplify trace agent named pipe and use of /info --- crates/datadog-trace-agent/src/config.rs | 6 --- crates/datadog-trace-agent/src/mini_agent.rs | 44 +++++-------------- .../src/trace_processor.rs | 2 +- 3 files changed, 11 insertions(+), 41 deletions(-) diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index 5b610f51..e101c76f 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -24,12 +24,6 @@ pub struct Tags { function_tags_string: OnceLock, } -impl Default for Tags { - fn default() -> Self { - Self::new() - } -} - impl Tags { pub fn from_env_string(env_tags: &str) -> Self { let mut tags = HashMap::new(); diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index 39297db1..74908266 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -15,6 +15,10 @@ use tracing::{debug, error}; use crate::http_utils::{log_and_create_http_response, verify_request_content_length}; use crate::proxy_flusher::{ProxyFlusher, ProxyRequest}; + +#[cfg(windows)] +use tokio::{net::windows::named_pipe::ServerOptions, time::{sleep, Duration}}; + use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor}; use libdd_trace_protobuf::pb; use libdd_trace_utils::trace_utils; @@ -181,12 +185,8 @@ impl MiniAgent { S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, { - use tokio::time::{sleep, Duration}; - let server = hyper::server::conn::http1::Builder::new(); let mut joinset = tokio::task::JoinSet::new(); - let mut consecutive_errors = 0u32; - const MAX_CONSECUTIVE_ERRORS: u32 = 10; loop { let conn = tokio::select! { @@ -199,29 +199,13 @@ impl MiniAgent { | io::ErrorKind::ConnectionRefused ) => { - // Transient connection errors - just continue - consecutive_errors = 0; // Reset on transient errors continue; } Err(e) => { - // Log non-transient errors but don't immediately kill server - error!("TCP listener error: {e}"); - consecutive_errors += 1; - - if consecutive_errors >= MAX_CONSECUTIVE_ERRORS { - error!("Too many consecutive listener errors, shutting down"); - return Err(e.into()); - } - - // Exponential backoff before retry - let backoff_ms = 10u64 * (1 << consecutive_errors.min(6)); - sleep(Duration::from_millis(backoff_ms)).await; - continue; - } - Ok((conn, _)) => { - consecutive_errors = 0; // Reset on success - conn + error!("Server error: {e}"); + return Err(e.into()); } + Ok((conn, _)) => conn, }, finished = async { match joinset.join_next().await { @@ -273,9 +257,6 @@ impl MiniAgent { S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, { - use tokio::net::windows::named_pipe::ServerOptions; - use tokio::time::{sleep, Duration}; - let server = hyper::server::conn::http1::Builder::new(); let mut joinset = tokio::task::JoinSet::new(); let mut consecutive_errors = 0u32; @@ -568,16 +549,11 @@ impl MiniAgent { dd_dogstatsd_port: u16, ) -> http::Result { let mut config_json = serde_json::json!({ - "apm_config": { - "receiver_port": dd_apm_receiver_port - }, - "statsd_port": dd_dogstatsd_port + "receiver_port": dd_apm_receiver_port, + "statsd_port": dd_dogstatsd_port, + "receiver_socket": serde_json::json!(dd_apm_windows_pipe_name.unwrap_or("")) }); - if let Some(pipe_name) = dd_apm_windows_pipe_name { - config_json["apm_config"]["receiver_windows_pipe_name"] = serde_json::json!(pipe_name); - } - let response_json = json!( { "endpoints": [ diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index 87550fc4..9536b368 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -104,7 +104,7 @@ impl TraceProcessor for ServerlessTraceProcessor { // double check content length is < max request content length in case transfer encoding is used if body_size > config.max_request_content_length { return log_and_create_http_response( - "Error processing traces: Payload too large", + &format!("Error processing traces: Payload too large"), StatusCode::PAYLOAD_TOO_LARGE, ); } From f1ba2e8681ea192ec2c1c19b466f45bb68aee515 Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 9 Jan 2026 10:57:39 -0500 Subject: [PATCH 05/13] Align named pipe and tcp for tracer behavior --- crates/datadog-trace-agent/src/mini_agent.rs | 179 ++++--------------- 1 file changed, 39 insertions(+), 140 deletions(-) diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index 74908266..691cb9fe 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -17,7 +17,7 @@ use crate::http_utils::{log_and_create_http_response, verify_request_content_len use crate::proxy_flusher::{ProxyFlusher, ProxyRequest}; #[cfg(windows)] -use tokio::{net::windows::named_pipe::ServerOptions, time::{sleep, Duration}}; +use tokio::net::windows::named_pipe::ServerOptions; use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor}; use libdd_trace_protobuf::pb; @@ -214,12 +214,11 @@ impl MiniAgent { } } => match finished { Err(e) if e.is_panic() => { - // Don't kill server on panic - log and continue - error!("Connection handler panicked: {:?}", e); - continue; + std::panic::resume_unwind(e.into_panic()); }, Ok(()) | Err(_) => continue, }, + // If there's some error in the background tasks, we can't send data result = &mut trace_flusher_handle => { error!("Trace flusher task died: {:?}", result); return Err("Trace flusher task terminated unexpectedly".into()); @@ -259,44 +258,40 @@ impl MiniAgent { { let server = hyper::server::conn::http1::Builder::new(); let mut joinset = tokio::task::JoinSet::new(); - let mut consecutive_errors = 0u32; - const MAX_CONSECUTIVE_ERRORS: u32 = 10; - - // Pre-create first pipe instance to minimize connection gap - let mut current_pipe = loop { - match ServerOptions::new() - .first_pipe_instance(false) - .create(pipe_name) - { - Ok(pipe) => break pipe, - Err(e) => { - consecutive_errors += 1; - error!( - "Failed to create initial named pipe (attempt {}/{}): {}", - consecutive_errors, MAX_CONSECUTIVE_ERRORS, e - ); - - if consecutive_errors >= MAX_CONSECUTIVE_ERRORS { - error!("Too many consecutive pipe creation failures during startup, shutting down"); - return Err(e.into()); - } - // Exponential backoff: 10ms, 20ms, 40ms, 80ms, ... - let backoff_ms = 10u64 * (1 << consecutive_errors.min(6)); - sleep(Duration::from_millis(backoff_ms)).await; + loop { + // Create a new pipe instance + let pipe = match ServerOptions::new().create(pipe_name) { + Ok(pipe) => { + debug!("Created pipe server instance '{}' in byte mode", pipe_name); + pipe } - } - }; - - // Reset error counter after successful creation - consecutive_errors = 0; + Err(e) => { + error!("Failed to create named pipe: {e}"); + return Err(e.into()); + } + }; - loop { - let conn_result = tokio::select! { - connect_res = current_pipe.connect() => { - match connect_res { - Ok(()) => Ok(current_pipe), - Err(e) => Err(e), + // Wait for client connection + let conn = tokio::select! { + connect_res = pipe.connect() => match connect_res { + Err(e) + if matches!( + e.kind(), + io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionReset + | io::ErrorKind::ConnectionRefused + ) => + { + continue; + } + Err(e) => { + error!("Named pipe connection error: {e}"); + return Err(e.into()); + } + Ok(()) => { + debug!("Client connected to '{}'", pipe_name); + pipe } }, finished = async { @@ -306,12 +301,11 @@ impl MiniAgent { } } => match finished { Err(e) if e.is_panic() => { - // Don't kill server on panic - log and continue - error!("Connection handler panicked: {:?}", e); - continue; + std::panic::resume_unwind(e.into_panic()); }, Ok(()) | Err(_) => continue, }, + // If there's some error in the background tasks, we can't send data result = &mut trace_flusher_handle => { error!("Trace flusher task died: {:?}", result); return Err("Trace flusher task terminated unexpectedly".into()); @@ -322,100 +316,8 @@ impl MiniAgent { }, }; - let connected_pipe = match conn_result { - Ok(pipe) => pipe, - Err(e) - if matches!( - e.kind(), - io::ErrorKind::ConnectionAborted - | io::ErrorKind::ConnectionReset - | io::ErrorKind::ConnectionRefused - ) => - { - // Transient connection errors - recreate pipe and continue - current_pipe = match ServerOptions::new() - .first_pipe_instance(false) - .create(pipe_name) - { - Ok(pipe) => { - consecutive_errors = 0; - pipe - } - Err(e) => { - consecutive_errors += 1; - error!( - "Failed to recreate pipe after transient error (attempt {}/{}): {}", - consecutive_errors, MAX_CONSECUTIVE_ERRORS, e - ); - - if consecutive_errors >= MAX_CONSECUTIVE_ERRORS { - return Err(e.into()); - } - - let backoff_ms = 10u64 * (1 << consecutive_errors.min(6)); - sleep(Duration::from_millis(backoff_ms)).await; - continue; - } - }; - continue; - } - Err(e) => { - // Log non-transient errors but don't immediately kill server - error!("Named pipe connection error: {e}"); - consecutive_errors += 1; - - if consecutive_errors >= MAX_CONSECUTIVE_ERRORS { - error!("Too many consecutive connection errors, shutting down"); - return Err(e.into()); - } - - // Recreate pipe with backoff - let backoff_ms = 10u64 * (1 << consecutive_errors.min(6)); - sleep(Duration::from_millis(backoff_ms)).await; - - current_pipe = match ServerOptions::new() - .first_pipe_instance(false) - .create(pipe_name) - { - Ok(pipe) => pipe, - Err(e) => { - error!("Failed to recreate pipe after error: {}", e); - continue; - } - }; - continue; - } - }; - - // Connection successful! Immediately create next pipe instance - // to minimize gap where no pipe is available - let next_pipe = match ServerOptions::new() - .first_pipe_instance(false) - .create(pipe_name) - { - Ok(pipe) => { - consecutive_errors = 0; // Reset on success - pipe - } - Err(e) => { - consecutive_errors += 1; - error!( - "Failed to create next pipe instance (attempt {}/{}): {}", - consecutive_errors, MAX_CONSECUTIVE_ERRORS, e - ); - - if consecutive_errors >= MAX_CONSECUTIVE_ERRORS { - return Err(e.into()); - } - - // Brief pause before retry - sleep(Duration::from_millis(100)).await; - continue; - } - }; - - // Spawn handler with connected_pipe (ownership moved to task) - let conn = hyper_util::rt::TokioIo::new(connected_pipe); + // Hyper http parser handles buffering pipe data + let conn = hyper_util::rt::TokioIo::new(conn); let server = server.clone(); let service = service.clone(); joinset.spawn(async move { @@ -423,9 +325,6 @@ impl MiniAgent { error!("Connection error: {e}"); } }); - - // Use next_pipe for the next iteration - current_pipe = next_pipe; } } @@ -548,7 +447,7 @@ impl MiniAgent { dd_apm_windows_pipe_name: Option<&str>, dd_dogstatsd_port: u16, ) -> http::Result { - let mut config_json = serde_json::json!({ + let config_json = serde_json::json!({ "receiver_port": dd_apm_receiver_port, "statsd_port": dd_dogstatsd_port, "receiver_socket": serde_json::json!(dd_apm_windows_pipe_name.unwrap_or("")) From 7551b3b91c06a04b259862e58213ce5e75f3c291 Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 9 Jan 2026 14:59:22 -0500 Subject: [PATCH 06/13] Add integration tests for datagod-trace-agent miniagent connections --- .../tests/common/helpers.rs | 97 +++++ .../datadog-trace-agent/tests/common/mocks.rs | 118 ++++++ .../datadog-trace-agent/tests/common/mod.rs | 7 + .../tests/integration_test.rs | 357 ++++-------------- 4 files changed, 290 insertions(+), 289 deletions(-) create mode 100644 crates/datadog-trace-agent/tests/common/helpers.rs create mode 100644 crates/datadog-trace-agent/tests/common/mocks.rs create mode 100644 crates/datadog-trace-agent/tests/common/mod.rs diff --git a/crates/datadog-trace-agent/tests/common/helpers.rs b/crates/datadog-trace-agent/tests/common/helpers.rs new file mode 100644 index 00000000..50c6736b --- /dev/null +++ b/crates/datadog-trace-agent/tests/common/helpers.rs @@ -0,0 +1,97 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Helper functions for integration tests + +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; +use libdd_common::hyper_migration; +use libdd_trace_utils::test_utils::create_test_json_span; +use std::time::{Duration, UNIX_EPOCH}; +use tokio::time::timeout; + +/// Create a simple test trace payload as msgpack bytes +pub fn create_test_trace_payload() -> Vec { + let start = UNIX_EPOCH.elapsed().unwrap().as_nanos() as i64; + let json_span = create_test_json_span(11, 222, 0, start, false); + rmp_serde::to_vec(&vec![vec![json_span]]).expect("Failed to serialize test trace") +} + +/// Send an HTTP request over TCP and return the response +pub async fn send_tcp_request( + port: u16, + uri: &str, + method: &str, + body: Option>, +) -> Result, Box> { + let stream = timeout( + Duration::from_secs(2), + tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port)), + ) + .await??; + + let io = TokioIo::new(stream); + let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; + + tokio::spawn(async move { + let _ = conn.await; + }); + + let mut request_builder = Request::builder() + .uri(uri) + .method(method) + .header("Content-Type", "application/msgpack"); + + let response = if let Some(body_data) = body { + let body_len = body_data.len(); + request_builder = request_builder.header("Content-Length", body_len.to_string()); + let request = request_builder.body(hyper_migration::Body::from(body_data))?; + timeout(Duration::from_secs(2), sender.send_request(request)).await?? + } else { + let request = request_builder.body(hyper_migration::Body::empty())?; + timeout(Duration::from_secs(2), sender.send_request(request)).await?? + }; + + Ok(response) +} + +#[cfg(windows)] +/// Send an HTTP request over named pipe and return the response +pub async fn send_named_pipe_request( + pipe_name: &str, + uri: &str, + method: &str, + body: Option>, +) -> Result, Box> { + use tokio::net::windows::named_pipe::ClientOptions; + + let client = timeout( + Duration::from_secs(2), + ClientOptions::new().open(pipe_name), + ) + .await??; + + let io = TokioIo::new(client); + let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; + + tokio::spawn(async move { + let _ = conn.await; + }); + + let mut request_builder = Request::builder() + .uri(uri) + .method(method) + .header("Content-Type", "application/msgpack"); + + let response = if let Some(body_data) = body { + let body_len = body_data.len(); + request_builder = request_builder.header("Content-Length", body_len.to_string()); + let request = request_builder.body(hyper_migration::Body::from(body_data))?; + timeout(Duration::from_secs(2), sender.send_request(request)).await?? + } else { + let request = request_builder.body(hyper_migration::Body::empty())?; + timeout(Duration::from_secs(2), sender.send_request(request)).await?? + }; + + Ok(response) +} diff --git a/crates/datadog-trace-agent/tests/common/mocks.rs b/crates/datadog-trace-agent/tests/common/mocks.rs new file mode 100644 index 00000000..be2dbf60 --- /dev/null +++ b/crates/datadog-trace-agent/tests/common/mocks.rs @@ -0,0 +1,118 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Mock implementations of trace agent components for testing + +use datadog_trace_agent::{ + config::Config, + env_verifier::EnvVerifier, + stats_flusher::StatsFlusher, + stats_processor::StatsProcessor, + trace_flusher::TraceFlusher, + trace_processor::TraceProcessor, +}; +use libdd_common::hyper_migration; +use libdd_trace_protobuf::pb; +use libdd_trace_utils::trace_utils::{self, MiniAgentMetadata, SendData}; +use std::sync::Arc; +use tokio::sync::mpsc::{Receiver, Sender}; + +/// Mock trace processor that returns 200 OK for all requests +#[allow(dead_code)] +pub struct MockTraceProcessor; + +#[async_trait::async_trait] +impl TraceProcessor for MockTraceProcessor { + async fn process_traces( + &self, + _config: Arc, + _req: hyper_migration::HttpRequest, + _trace_tx: Sender, + _mini_agent_metadata: Arc, + ) -> Result { + hyper::Response::builder() + .status(200) + .body(hyper_migration::Body::from("{}")) + } +} + +/// Mock trace flusher that consumes messages without processing +pub struct MockTraceFlusher; + +#[async_trait::async_trait] +impl TraceFlusher for MockTraceFlusher { + fn new( + _aggregator: Arc>, + _config: Arc, + ) -> Self { + MockTraceFlusher + } + + async fn start_trace_flusher(&self, mut trace_rx: Receiver) { + // Consume messages from the channel without processing them + while let Some(_trace) = trace_rx.recv().await { + // Just discard the trace - we're not testing the flusher + } + } + + async fn send(&self, _traces: Vec) -> Option> { + None + } + + async fn flush(&self, _failed_traces: Option>) -> Option> { + None + } +} + +/// Mock stats processor that returns 200 OK for all requests +pub struct MockStatsProcessor; + +#[async_trait::async_trait] +impl StatsProcessor for MockStatsProcessor { + async fn process_stats( + &self, + _config: Arc, + _req: hyper_migration::HttpRequest, + _stats_tx: Sender, + ) -> Result { + hyper::Response::builder() + .status(200) + .body(hyper_migration::Body::from("{}")) + } +} + +/// Mock stats flusher that consumes messages without processing +pub struct MockStatsFlusher; + +#[async_trait::async_trait] +impl StatsFlusher for MockStatsFlusher { + async fn start_stats_flusher( + &self, + _config: Arc, + mut stats_rx: Receiver, + ) { + // Consume messages from the channel without processing them + while let Some(_stats) = stats_rx.recv().await { + // Just discard the stats - we're not testing the flusher + } + } + + async fn flush_stats(&self, _config: Arc, _traces: Vec) { + // Do nothing + } +} + +/// Mock environment verifier that returns default metadata +pub struct MockEnvVerifier; + +#[async_trait::async_trait] +impl EnvVerifier for MockEnvVerifier { + async fn verify_environment( + &self, + _timeout_ms: u64, + _env_type: &trace_utils::EnvironmentType, + _os: &str, + ) -> MiniAgentMetadata { + MiniAgentMetadata::default() + } +} diff --git a/crates/datadog-trace-agent/tests/common/mod.rs b/crates/datadog-trace-agent/tests/common/mod.rs new file mode 100644 index 00000000..3d83bbdb --- /dev/null +++ b/crates/datadog-trace-agent/tests/common/mod.rs @@ -0,0 +1,7 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Common test utilities, mocks, and helpers for integration tests + +pub mod helpers; +pub mod mocks; diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 29a57eae..170d9a6c 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -1,115 +1,35 @@ // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +mod common; + +use common::helpers::{create_test_trace_payload, send_tcp_request}; +use common::mocks::{ + MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher, +}; use datadog_trace_agent::{ - config::Config, - env_verifier::EnvVerifier, - mini_agent::MiniAgent, - stats_flusher::StatsFlusher, - stats_processor::StatsProcessor, - trace_flusher::TraceFlusher, - trace_processor::TraceProcessor, + config::Config, mini_agent::MiniAgent, trace_processor::ServerlessTraceProcessor, }; -use datadog_trace_protobuf::pb; -use datadog_trace_utils::trace_utils::{MiniAgentMetadata, SendData}; -use ddcommon::hyper_migration; +use hyper::StatusCode; +use libdd_trace_utils::trace_utils; use std::sync::Arc; -use tokio::sync::mpsc::{Receiver, Sender}; - -/// Mock trace processor for testing -struct MockTraceProcessor; - -#[async_trait::async_trait] -impl TraceProcessor for MockTraceProcessor { - async fn process_traces( - &self, - _config: Arc, - _req: hyper_migration::HttpRequest, - _trace_tx: Sender, - _mini_agent_metadata: Arc, - ) -> Result> { - // Return a simple 200 OK response - Ok(hyper::Response::builder() - .status(200) - .body(hyper_migration::Body::from("OK"))?) - } -} +use std::time::Duration; -/// Mock trace flusher for testing -struct MockTraceFlusher; - -#[async_trait::async_trait] -impl TraceFlusher for MockTraceFlusher { - async fn start_trace_flusher(&self, mut _trace_rx: Receiver) { - // Do nothing - just consume messages - loop { - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - } - } -} - -/// Mock stats processor for testing -struct MockStatsProcessor; - -#[async_trait::async_trait] -impl StatsProcessor for MockStatsProcessor { - async fn process_stats( - &self, - _config: Arc, - _req: hyper_migration::HttpRequest, - _stats_tx: Sender, - ) -> Result> { - Ok(hyper::Response::builder() - .status(200) - .body(hyper_migration::Body::from("OK"))?) - } -} - -/// Mock stats flusher for testing -struct MockStatsFlusher; - -#[async_trait::async_trait] -impl StatsFlusher for MockStatsFlusher { - async fn start_stats_flusher( - &self, - _config: Arc, - mut _stats_rx: Receiver, - ) { - loop { - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - } - } -} - -/// Mock env verifier for testing -struct MockEnvVerifier; - -#[async_trait::async_trait] -impl EnvVerifier for MockEnvVerifier { - async fn verify_environment( - &self, - _timeout_ms: u64, - _env_type: &datadog_trace_utils::trace_utils::EnvironmentType, - _os: &str, - ) -> MiniAgentMetadata { - MiniAgentMetadata { - function_name: Some("test-function".to_string()), - ..Default::default() - } - } -} +#[cfg(windows)] +use common::helpers::send_named_pipe_request; /// Create a test config with TCP transport -fn create_tcp_test_config(port: u16) -> Config { +pub fn create_tcp_test_config() -> Config { Config { - dd_site: "datadoghq.com".to_string(), - dd_apm_receiver_port: port, + dd_site: "mock-datadoghq.com".to_string(), + dd_apm_receiver_port: 8126, dd_apm_windows_pipe_name: None, dd_dogstatsd_port: 8125, - env_type: datadog_trace_utils::trace_utils::EnvironmentType::Local, + env_type: trace_utils::EnvironmentType::AzureFunction, app_name: Some("test-app".to_string()), max_request_content_length: 10_000_000, - obfuscation_config: Default::default(), + obfuscation_config: libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig::new() + .unwrap(), os: std::env::consts::OS.to_string(), tags: datadog_trace_agent::config::Tags::new(), stats_flush_interval: 10, @@ -123,154 +43,45 @@ fn create_tcp_test_config(port: u16) -> Config { #[cfg(test)] #[tokio::test] -async fn test_trace_agent_tcp_accepts_connection() { - use tokio::time::{timeout, Duration}; - - let test_port = 18126; // Use a non-standard port for testing - let config = Arc::new(create_tcp_test_config(test_port)); - - let mini_agent = MiniAgent { - config: config.clone(), - trace_processor: Arc::new(MockTraceProcessor), - trace_flusher: Arc::new(MockTraceFlusher), - stats_processor: Arc::new(MockStatsProcessor), - stats_flusher: Arc::new(MockStatsFlusher), - env_verifier: Arc::new(MockEnvVerifier), - }; - - // Start the mini agent in the background - let agent_handle = tokio::spawn(async move { - mini_agent.start_mini_agent().await - }); - - // Give the server time to start - tokio::time::sleep(Duration::from_millis(100)).await; - - // Try to connect via TCP - let connect_result = timeout( - Duration::from_secs(2), - tokio::net::TcpStream::connect(format!("127.0.0.1:{}", test_port)), - ) - .await; - - // Verify connection succeeds - assert!( - connect_result.is_ok(), - "Failed to connect to TCP server within timeout" - ); - assert!( - connect_result.unwrap().is_ok(), - "TCP connection failed" - ); - - // Clean up: the agent is running in the background and will be dropped - agent_handle.abort(); -} - -#[cfg(test)] -#[tokio::test] -async fn test_trace_agent_tcp_handles_http_request() { - use hyper::body::Body; - use hyper::Request; - use tokio::time::{timeout, Duration}; - - let test_port = 18127; - let config = Arc::new(create_tcp_test_config(test_port)); - +async fn test_mini_agent_tcp_handles_requests() { + let config = Arc::new(create_tcp_test_config()); + let test_port = config.dd_apm_receiver_port; let mini_agent = MiniAgent { - config: config.clone(), - trace_processor: Arc::new(MockTraceProcessor), + config, + trace_processor: Arc::new(ServerlessTraceProcessor {}), trace_flusher: Arc::new(MockTraceFlusher), stats_processor: Arc::new(MockStatsProcessor), stats_flusher: Arc::new(MockStatsFlusher), env_verifier: Arc::new(MockEnvVerifier), }; - // Start the mini agent in the background + // Start the mini agent let agent_handle = tokio::spawn(async move { - mini_agent.start_mini_agent().await + let _ = mini_agent.start_mini_agent().await; }); - // Give the server time to start + // Give server time to start tokio::time::sleep(Duration::from_millis(100)).await; - // Send an HTTP request to the /info endpoint - let client = hyper::Client::new(); - let uri = format!("http://127.0.0.1:{}/info", test_port); - - let request_result = timeout( - Duration::from_secs(2), - client.request(Request::builder() - .uri(uri) - .method("GET") - .body(Body::empty()) - .unwrap()), - ) - .await; - - // Verify request succeeds - assert!( - request_result.is_ok(), - "HTTP request timed out" - ); - - let response = request_result.unwrap(); - assert!( - response.is_ok(), - "HTTP request failed: {:?}", - response.err() + // Test /info endpoint + let info_response = send_tcp_request(test_port, "/info", "GET", None) + .await + .expect("Failed to send /info request"); + assert_eq!( + info_response.status(), + StatusCode::OK, + "Expected 200 OK from /info endpoint" ); - let response = response.unwrap(); - assert_eq!(response.status(), 200, "Expected 200 OK response"); - - // Clean up - agent_handle.abort(); -} - -#[cfg(all(test, windows))] -#[tokio::test] -async fn test_trace_agent_named_pipe_accepts_connection() { - use tokio::net::windows::named_pipe::ClientOptions; - use tokio::time::{timeout, Duration}; - - let pipe_name = r"\\.\pipe\dd_trace_test_pipe"; - let mut config = create_tcp_test_config(0); // Port 0 when using named pipe - config.dd_apm_windows_pipe_name = Some(pipe_name.to_string()); - let config = Arc::new(config); - - let mini_agent = MiniAgent { - config: config.clone(), - trace_processor: Arc::new(MockTraceProcessor), - trace_flusher: Arc::new(MockTraceFlusher), - stats_processor: Arc::new(MockStatsProcessor), - stats_flusher: Arc::new(MockStatsFlusher), - env_verifier: Arc::new(MockEnvVerifier), - }; - - // Start the mini agent in the background - let agent_handle = tokio::spawn(async move { - mini_agent.start_mini_agent().await - }); - - // Give the server time to create the pipe - tokio::time::sleep(Duration::from_millis(100)).await; - - // Try to connect via named pipe - let connect_result = timeout( - Duration::from_secs(2), - ClientOptions::new().open(pipe_name), - ) - .await; - - // Verify connection succeeds - assert!( - connect_result.is_ok(), - "Failed to connect to named pipe within timeout" - ); - assert!( - connect_result.unwrap().is_ok(), - "Named pipe connection failed" + // Test /v0.4/traces endpoint with real trace data + let trace_payload = create_test_trace_payload(); + let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload)) + .await + .expect("Failed to send /v0.4/traces request"); + assert_eq!( + trace_response.status(), + StatusCode::OK, + "Expected 200 OK from /v0.4/traces endpoint" ); // Clean up @@ -279,83 +90,51 @@ async fn test_trace_agent_named_pipe_accepts_connection() { #[cfg(all(test, windows))] #[tokio::test] -async fn test_trace_agent_named_pipe_handles_http_request() { - use hyper::body::Body; - use hyper::Request; - use hyper_util::rt::TokioIo; - use tokio::net::windows::named_pipe::ClientOptions; - use tokio::time::{timeout, Duration}; - - let pipe_name = r"\\.\pipe\dd_trace_test_pipe_http"; - let mut config = create_tcp_test_config(0); +async fn test_mini_agent_named_pipe_handles_requests() { + let pipe_name = r"\\.\pipe\dd_trace_integration_test"; + let mut config = create_tcp_test_config(); config.dd_apm_windows_pipe_name = Some(pipe_name.to_string()); + config.dd_apm_receiver_port = 0; let config = Arc::new(config); let mini_agent = MiniAgent { - config: config.clone(), - trace_processor: Arc::new(MockTraceProcessor), + config, + trace_processor: Arc::new(ServerlessTraceProcessor {}), trace_flusher: Arc::new(MockTraceFlusher), stats_processor: Arc::new(MockStatsProcessor), stats_flusher: Arc::new(MockStatsFlusher), env_verifier: Arc::new(MockEnvVerifier), }; - // Start the mini agent in the background + // Start the mini agent let agent_handle = tokio::spawn(async move { - mini_agent.start_mini_agent().await + let _ = mini_agent.start_mini_agent().await; }); - // Give the server time to create the pipe + // Give server time to create pipe tokio::time::sleep(Duration::from_millis(100)).await; - // Connect to the named pipe - let client = ClientOptions::new() - .open(pipe_name) - .expect("Failed to connect to named pipe"); - - // Wrap the pipe in TokioIo for hyper - let io = TokioIo::new(client); - - // Send an HTTP request over the pipe - let (mut sender, conn) = hyper::client::conn::http1::handshake(io) + // Test /info endpoint + let info_response = send_named_pipe_request(pipe_name, "/info", "GET", None) .await - .expect("Failed to perform HTTP handshake"); - - // Spawn connection task - tokio::spawn(async move { - if let Err(e) = conn.await { - eprintln!("Connection error: {}", e); - } - }); - - let request = Request::builder() - .uri("/info") - .method("GET") - .body(Body::empty()) - .unwrap(); - - let response_result = timeout(Duration::from_secs(2), sender.send_request(request)).await; - - // Verify request succeeds - assert!( - response_result.is_ok(), - "HTTP request over named pipe timed out" - ); - - let response = response_result.unwrap(); - assert!( - response.is_ok(), - "HTTP request over named pipe failed: {:?}", - response.err() + .expect("Failed to send /info request over named pipe"); + assert_eq!( + info_response.status(), + StatusCode::OK, + "Expected 200 OK from /info endpoint over named pipe" ); - let response = response.unwrap(); + // Test /v0.4/traces endpoint with real trace data + let trace_payload = create_test_trace_payload(); + let trace_response = send_named_pipe_request(pipe_name, "/v0.4/traces", "POST", Some(trace_payload)) + .await + .expect("Failed to send /v0.4/traces request over named pipe"); assert_eq!( - response.status(), - 200, - "Expected 200 OK response from named pipe" + trace_response.status(), + StatusCode::OK, + "Expected 200 OK from /v0.4/traces endpoint over named pipe" ); // Clean up agent_handle.abort(); -} \ No newline at end of file +} From b6269e759c5b04848ae5cae12d45b385d9fc3a8f Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 9 Jan 2026 16:25:25 -0500 Subject: [PATCH 07/13] Fix cargo fmt --- crates/datadog-trace-agent/src/config.rs | 3 +-- crates/datadog-trace-agent/src/mini_agent.rs | 19 +++++++++++++++---- .../src/trace_processor.rs | 2 +- .../tests/common/helpers.rs | 6 +----- .../datadog-trace-agent/tests/common/mocks.rs | 8 ++------ .../tests/integration_test.rs | 11 +++++------ 6 files changed, 25 insertions(+), 24 deletions(-) diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index e101c76f..f41be031 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -112,8 +112,7 @@ impl Config { anyhow::anyhow!("Unable to identify environment. Shutting down Mini Agent.") })?; - let dd_apm_windows_pipe_name: Option = - env::var("DD_APM_WINDOWS_PIPE_NAME").ok(); + let dd_apm_windows_pipe_name: Option = env::var("DD_APM_WINDOWS_PIPE_NAME").ok(); let dd_apm_receiver_port: u16 = if dd_apm_windows_pipe_name.is_some() { 0 // Override to 0 when using Windows named pipe } else { diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index 691cb9fe..f755ef6f 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -132,7 +132,10 @@ impl MiniAgent { if let Some(ref pipe_name) = self.config.dd_apm_windows_pipe_name { debug!("Mini Agent started: listening on named pipe {}", pipe_name); } else { - debug!("Mini Agent started: listening on port {}", self.config.dd_apm_receiver_port); + debug!( + "Mini Agent started: listening on port {}", + self.config.dd_apm_receiver_port + ); } debug!( "Time taken to start the Mini Agent: {} ms", @@ -154,7 +157,10 @@ impl MiniAgent { #[cfg(not(windows))] { - error!("Named pipes are only supported on Windows, cannot use pipe: {}", pipe_name); + error!( + "Named pipes are only supported on Windows, cannot use pipe: {}", + pipe_name + ); return Err("Named pipes are only supported on Windows".into()); } } else { @@ -162,8 +168,13 @@ impl MiniAgent { let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port)); let listener = tokio::net::TcpListener::bind(&addr).await?; - Self::serve_tcp(listener, service, trace_flusher_handle, stats_flusher_handle) - .await?; + Self::serve_tcp( + listener, + service, + trace_flusher_handle, + stats_flusher_handle, + ) + .await?; } Ok(()) diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index 9536b368..fb30809e 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -104,7 +104,7 @@ impl TraceProcessor for ServerlessTraceProcessor { // double check content length is < max request content length in case transfer encoding is used if body_size > config.max_request_content_length { return log_and_create_http_response( - &format!("Error processing traces: Payload too large"), + &format!("Error processing traces: Payload too large"), StatusCode::PAYLOAD_TOO_LARGE, ); } diff --git a/crates/datadog-trace-agent/tests/common/helpers.rs b/crates/datadog-trace-agent/tests/common/helpers.rs index 50c6736b..d749fce0 100644 --- a/crates/datadog-trace-agent/tests/common/helpers.rs +++ b/crates/datadog-trace-agent/tests/common/helpers.rs @@ -65,11 +65,7 @@ pub async fn send_named_pipe_request( ) -> Result, Box> { use tokio::net::windows::named_pipe::ClientOptions; - let client = timeout( - Duration::from_secs(2), - ClientOptions::new().open(pipe_name), - ) - .await??; + let client = timeout(Duration::from_secs(2), ClientOptions::new().open(pipe_name)).await??; let io = TokioIo::new(client); let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; diff --git a/crates/datadog-trace-agent/tests/common/mocks.rs b/crates/datadog-trace-agent/tests/common/mocks.rs index be2dbf60..4e59e646 100644 --- a/crates/datadog-trace-agent/tests/common/mocks.rs +++ b/crates/datadog-trace-agent/tests/common/mocks.rs @@ -4,12 +4,8 @@ //! Mock implementations of trace agent components for testing use datadog_trace_agent::{ - config::Config, - env_verifier::EnvVerifier, - stats_flusher::StatsFlusher, - stats_processor::StatsProcessor, - trace_flusher::TraceFlusher, - trace_processor::TraceProcessor, + config::Config, env_verifier::EnvVerifier, stats_flusher::StatsFlusher, + stats_processor::StatsProcessor, trace_flusher::TraceFlusher, trace_processor::TraceProcessor, }; use libdd_common::hyper_migration; use libdd_trace_protobuf::pb; diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 170d9a6c..9ede2832 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -4,9 +4,7 @@ mod common; use common::helpers::{create_test_trace_payload, send_tcp_request}; -use common::mocks::{ - MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher, -}; +use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher}; use datadog_trace_agent::{ config::Config, mini_agent::MiniAgent, trace_processor::ServerlessTraceProcessor, }; @@ -126,9 +124,10 @@ async fn test_mini_agent_named_pipe_handles_requests() { // Test /v0.4/traces endpoint with real trace data let trace_payload = create_test_trace_payload(); - let trace_response = send_named_pipe_request(pipe_name, "/v0.4/traces", "POST", Some(trace_payload)) - .await - .expect("Failed to send /v0.4/traces request over named pipe"); + let trace_response = + send_named_pipe_request(pipe_name, "/v0.4/traces", "POST", Some(trace_payload)) + .await + .expect("Failed to send /v0.4/traces request over named pipe"); assert_eq!( trace_response.status(), StatusCode::OK, From f670cc57dcb0d3d587a4c0151ad8ac0bfaec8d0a Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 9 Jan 2026 16:48:15 -0500 Subject: [PATCH 08/13] Fix test errors --- crates/datadog-trace-agent/tests/common/helpers.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/datadog-trace-agent/tests/common/helpers.rs b/crates/datadog-trace-agent/tests/common/helpers.rs index d749fce0..a41cf055 100644 --- a/crates/datadog-trace-agent/tests/common/helpers.rs +++ b/crates/datadog-trace-agent/tests/common/helpers.rs @@ -65,7 +65,8 @@ pub async fn send_named_pipe_request( ) -> Result, Box> { use tokio::net::windows::named_pipe::ClientOptions; - let client = timeout(Duration::from_secs(2), ClientOptions::new().open(pipe_name)).await??; + // Note: open() is synchronous on Windows, not async + let client = ClientOptions::new().open(pipe_name)?; let io = TokioIo::new(client); let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; From 6e15dd4eab67842a90b9f83a304f7ca1931afda9 Mon Sep 17 00:00:00 2001 From: Lewis Date: Tue, 27 Jan 2026 12:39:11 -0500 Subject: [PATCH 09/13] Add /info content test --- .../tests/integration_test.rs | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 9ede2832..4f2476c4 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -8,8 +8,10 @@ use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockT use datadog_trace_agent::{ config::Config, mini_agent::MiniAgent, trace_processor::ServerlessTraceProcessor, }; +use http_body_util::BodyExt; use hyper::StatusCode; use libdd_trace_utils::trace_utils; +use serde_json::Value; use std::sync::Arc; use std::time::Duration; @@ -71,6 +73,48 @@ async fn test_mini_agent_tcp_handles_requests() { "Expected 200 OK from /info endpoint" ); + // Verify /info endpoint response content + let body = info_response + .into_body() + .collect() + .await + .expect("Failed to read /info response body") + .to_bytes(); + let json: Value = serde_json::from_slice(&body) + .expect("Failed to parse /info response as JSON"); + + // Check endpoints array + assert_eq!( + json["endpoints"], + serde_json::json!(["/v0.4/traces", "/v0.6/stats", "/info"]), + "Expected endpoints array" + ); + + // Check client_drop_p0s flag + assert_eq!( + json["client_drop_p0s"], + true, + "Expected client_drop_p0s to be true" + ); + + // Check config object + let config = &json["config"]; + assert_eq!( + config["receiver_port"], + test_port, + "Expected receiver_port to match test port" + ); + assert_eq!( + config["statsd_port"], + 8125, + "Expected statsd_port to be 8125" + ); + assert_eq!( + config["receiver_socket"], + "", + "Expected empty receiver_socket for TCP" + ); + // Test /v0.4/traces endpoint with real trace data let trace_payload = create_test_trace_payload(); let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload)) @@ -122,6 +166,34 @@ async fn test_mini_agent_named_pipe_handles_requests() { "Expected 200 OK from /info endpoint over named pipe" ); + // Verify /info endpoint response content + let body = info_response + .into_body() + .collect() + .await + .expect("Failed to read /info response body") + .to_bytes(); + let json: Value = serde_json::from_slice(&body) + .expect("Failed to parse /info response as JSON"); + + // Check config object specific to named pipe + let config_value = &json["config"]; + assert_eq!( + config_value["receiver_port"], + 0, + "Expected receiver_port to be 0 for named pipe" + ); + assert_eq!( + config_value["statsd_port"], + 8125, + "Expected statsd_port to be 8125" + ); + assert_eq!( + config_value["receiver_socket"], + pipe_name, + "Expected receiver_socket to match pipe name" + ); + // Test /v0.4/traces endpoint with real trace data let trace_payload = create_test_trace_payload(); let trace_response = From a460f25dea648f8db58e18d7604b190c6b52ba3b Mon Sep 17 00:00:00 2001 From: Lewis Date: Tue, 27 Jan 2026 12:49:10 -0500 Subject: [PATCH 10/13] Match datadog-agent behavior by prepending \.\pipe" --- crates/datadog-trace-agent/src/mini_agent.rs | 23 +++++++++-- .../tests/integration_test.rs | 39 ++++++++----------- 2 files changed, 36 insertions(+), 26 deletions(-) diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index f755ef6f..644b87f9 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -32,6 +32,9 @@ const TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; const STATS_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; const PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; +#[cfg(windows)] +const PIPE_NAME_PREFIX: &str = r"\\.\pipe\"; + pub struct MiniAgent { pub config: Arc, pub trace_processor: Arc, @@ -267,14 +270,17 @@ impl MiniAgent { S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, { + // Prepend \\.\pipe\ prefix to match datadog-agent behavior + let pipe_path = format!("{}{}", PIPE_NAME_PREFIX, pipe_name); + let server = hyper::server::conn::http1::Builder::new(); let mut joinset = tokio::task::JoinSet::new(); loop { // Create a new pipe instance - let pipe = match ServerOptions::new().create(pipe_name) { + let pipe = match ServerOptions::new().create(&pipe_path) { Ok(pipe) => { - debug!("Created pipe server instance '{}' in byte mode", pipe_name); + debug!("Created pipe server instance '{}' in byte mode", pipe_path); pipe } Err(e) => { @@ -301,7 +307,7 @@ impl MiniAgent { return Err(e.into()); } Ok(()) => { - debug!("Client connected to '{}'", pipe_name); + debug!("Client connected to '{}'", pipe_path); pipe } }, @@ -458,10 +464,19 @@ impl MiniAgent { dd_apm_windows_pipe_name: Option<&str>, dd_dogstatsd_port: u16, ) -> http::Result { + // Prepend \\.\pipe\ prefix to pipe name if present, matching datadog-agent behavior + let receiver_socket = match dd_apm_windows_pipe_name { + #[cfg(windows)] + Some(pipe_name) => format!("{}{}", PIPE_NAME_PREFIX, pipe_name), + #[cfg(not(windows))] + Some(pipe_name) => pipe_name.to_string(), + None => String::new(), + }; + let config_json = serde_json::json!({ "receiver_port": dd_apm_receiver_port, "statsd_port": dd_dogstatsd_port, - "receiver_socket": serde_json::json!(dd_apm_windows_pipe_name.unwrap_or("")) + "receiver_socket": receiver_socket }); let response_json = json!( diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 4f2476c4..43d503ea 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -80,8 +80,8 @@ async fn test_mini_agent_tcp_handles_requests() { .await .expect("Failed to read /info response body") .to_bytes(); - let json: Value = serde_json::from_slice(&body) - .expect("Failed to parse /info response as JSON"); + let json: Value = + serde_json::from_slice(&body).expect("Failed to parse /info response as JSON"); // Check endpoints array assert_eq!( @@ -92,26 +92,22 @@ async fn test_mini_agent_tcp_handles_requests() { // Check client_drop_p0s flag assert_eq!( - json["client_drop_p0s"], - true, + json["client_drop_p0s"], true, "Expected client_drop_p0s to be true" ); // Check config object let config = &json["config"]; assert_eq!( - config["receiver_port"], - test_port, + config["receiver_port"], test_port, "Expected receiver_port to match test port" ); assert_eq!( - config["statsd_port"], - 8125, + config["statsd_port"], 8125, "Expected statsd_port to be 8125" ); assert_eq!( - config["receiver_socket"], - "", + config["receiver_socket"], "", "Expected empty receiver_socket for TCP" ); @@ -133,7 +129,9 @@ async fn test_mini_agent_tcp_handles_requests() { #[cfg(all(test, windows))] #[tokio::test] async fn test_mini_agent_named_pipe_handles_requests() { - let pipe_name = r"\\.\pipe\dd_trace_integration_test"; + // Use just the pipe name without \\.\pipe\ prefix, matching datadog-agent behavior + let pipe_name = "dd_trace_integration_test"; + let pipe_path = format!(r"\\.\pipe\{}", pipe_name); // Full path for client connections let mut config = create_tcp_test_config(); config.dd_apm_windows_pipe_name = Some(pipe_name.to_string()); config.dd_apm_receiver_port = 0; @@ -157,7 +155,7 @@ async fn test_mini_agent_named_pipe_handles_requests() { tokio::time::sleep(Duration::from_millis(100)).await; // Test /info endpoint - let info_response = send_named_pipe_request(pipe_name, "/info", "GET", None) + let info_response = send_named_pipe_request(&pipe_path, "/info", "GET", None) .await .expect("Failed to send /info request over named pipe"); assert_eq!( @@ -173,31 +171,28 @@ async fn test_mini_agent_named_pipe_handles_requests() { .await .expect("Failed to read /info response body") .to_bytes(); - let json: Value = serde_json::from_slice(&body) - .expect("Failed to parse /info response as JSON"); + let json: Value = + serde_json::from_slice(&body).expect("Failed to parse /info response as JSON"); // Check config object specific to named pipe let config_value = &json["config"]; assert_eq!( - config_value["receiver_port"], - 0, + config_value["receiver_port"], 0, "Expected receiver_port to be 0 for named pipe" ); assert_eq!( - config_value["statsd_port"], - 8125, + config_value["statsd_port"], 8125, "Expected statsd_port to be 8125" ); assert_eq!( - config_value["receiver_socket"], - pipe_name, - "Expected receiver_socket to match pipe name" + config_value["receiver_socket"], pipe_path, + "Expected receiver_socket to match full pipe path" ); // Test /v0.4/traces endpoint with real trace data let trace_payload = create_test_trace_payload(); let trace_response = - send_named_pipe_request(pipe_name, "/v0.4/traces", "POST", Some(trace_payload)) + send_named_pipe_request(&pipe_path, "/v0.4/traces", "POST", Some(trace_payload)) .await .expect("Failed to send /v0.4/traces request over named pipe"); assert_eq!( From 0bc46c14801eea05f392750cd36881bee5f1838b Mon Sep 17 00:00:00 2001 From: Lewis Date: Tue, 27 Jan 2026 13:17:25 -0500 Subject: [PATCH 11/13] Update tests for rebased code with profiling --- .../tests/integration_test.rs | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 43d503ea..fc95cad2 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -6,7 +6,8 @@ mod common; use common::helpers::{create_test_trace_payload, send_tcp_request}; use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher}; use datadog_trace_agent::{ - config::Config, mini_agent::MiniAgent, trace_processor::ServerlessTraceProcessor, + config::Config, mini_agent::MiniAgent, proxy_flusher::ProxyFlusher, + trace_processor::ServerlessTraceProcessor, }; use http_body_util::BodyExt; use hyper::StatusCode; @@ -32,11 +33,15 @@ pub fn create_tcp_test_config() -> Config { .unwrap(), os: std::env::consts::OS.to_string(), tags: datadog_trace_agent::config::Tags::new(), - stats_flush_interval: 10, - trace_flush_interval: 5, + stats_flush_interval_secs: 10, + trace_flush_interval_secs: 5, trace_intake: libdd_common::Endpoint::default(), trace_stats_intake: libdd_common::Endpoint::default(), - verify_env_timeout: 1000, + profiling_intake: libdd_common::Endpoint::default(), + proxy_request_timeout_secs: 30, + proxy_request_max_retries: 3, + proxy_request_retry_backoff_base_ms: 100, + verify_env_timeout_ms: 1000, proxy_url: None, } } @@ -47,12 +52,13 @@ async fn test_mini_agent_tcp_handles_requests() { let config = Arc::new(create_tcp_test_config()); let test_port = config.dd_apm_receiver_port; let mini_agent = MiniAgent { - config, + config: config.clone(), trace_processor: Arc::new(ServerlessTraceProcessor {}), trace_flusher: Arc::new(MockTraceFlusher), stats_processor: Arc::new(MockStatsProcessor), stats_flusher: Arc::new(MockStatsFlusher), env_verifier: Arc::new(MockEnvVerifier), + proxy_flusher: Arc::new(ProxyFlusher::new(config)), }; // Start the mini agent @@ -86,7 +92,12 @@ async fn test_mini_agent_tcp_handles_requests() { // Check endpoints array assert_eq!( json["endpoints"], - serde_json::json!(["/v0.4/traces", "/v0.6/stats", "/info"]), + serde_json::json!([ + "/v0.4/traces", + "/v0.6/stats", + "/info", + "/profiling/v1/input" + ]), "Expected endpoints array" ); @@ -138,12 +149,13 @@ async fn test_mini_agent_named_pipe_handles_requests() { let config = Arc::new(config); let mini_agent = MiniAgent { - config, + config: config.clone(), trace_processor: Arc::new(ServerlessTraceProcessor {}), trace_flusher: Arc::new(MockTraceFlusher), stats_processor: Arc::new(MockStatsProcessor), stats_flusher: Arc::new(MockStatsFlusher), env_verifier: Arc::new(MockEnvVerifier), + proxy_flusher: Arc::new(ProxyFlusher::new(config)), }; // Start the mini agent From 1fc9d372329f35b87143135935530781a5732994 Mon Sep 17 00:00:00 2001 From: Lewis Date: Tue, 27 Jan 2026 14:05:17 -0500 Subject: [PATCH 12/13] Update path for named pipe in config so as to not double-format in the mini-agent --- crates/datadog-trace-agent/src/config.rs | 10 +++++++--- crates/datadog-trace-agent/src/mini_agent.rs | 17 ++++------------- .../tests/integration_test.rs | 2 +- 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index f41be031..cf934859 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -112,7 +112,11 @@ impl Config { anyhow::anyhow!("Unable to identify environment. Shutting down Mini Agent.") })?; - let dd_apm_windows_pipe_name: Option = env::var("DD_APM_WINDOWS_PIPE_NAME").ok(); + let dd_apm_windows_pipe_name: Option = + env::var("DD_APM_WINDOWS_PIPE_NAME").ok().map(|pipe_name| { + // Prepend \\.\pipe\ prefix to match datadog-agent behavior + format!(r"\\.\pipe\{}", pipe_name) + }); let dd_apm_receiver_port: u16 = if dd_apm_windows_pipe_name.is_some() { 0 // Override to 0 when using Windows named pipe } else { @@ -359,13 +363,13 @@ mod tests { fn test_apm_windows_pipe_name() { env::set_var("DD_API_KEY", "_not_a_real_key_"); env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); - env::set_var("DD_APM_WINDOWS_PIPE_NAME", r"\\.\pipe\trace-agent"); + env::set_var("DD_APM_WINDOWS_PIPE_NAME", r"test_pipe"); let config_res = config::Config::new(); assert!(config_res.is_ok()); let config = config_res.unwrap(); assert_eq!( config.dd_apm_windows_pipe_name, - Some(r"\\.\pipe\trace-agent".to_string()) + Some(r"\\.\pipe\test_pipe".to_string()) ); // Port should be overridden to 0 when pipe is set assert_eq!(config.dd_apm_receiver_port, 0); diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index 644b87f9..7e7cef12 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -32,9 +32,6 @@ const TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; const STATS_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; const PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; -#[cfg(windows)] -const PIPE_NAME_PREFIX: &str = r"\\.\pipe\"; - pub struct MiniAgent { pub config: Arc, pub trace_processor: Arc, @@ -270,8 +267,8 @@ impl MiniAgent { S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, { - // Prepend \\.\pipe\ prefix to match datadog-agent behavior - let pipe_path = format!("{}{}", PIPE_NAME_PREFIX, pipe_name); + // pipe_name already includes \\.\pipe\ prefix from config + let pipe_path = pipe_name; let server = hyper::server::conn::http1::Builder::new(); let mut joinset = tokio::task::JoinSet::new(); @@ -464,14 +461,8 @@ impl MiniAgent { dd_apm_windows_pipe_name: Option<&str>, dd_dogstatsd_port: u16, ) -> http::Result { - // Prepend \\.\pipe\ prefix to pipe name if present, matching datadog-agent behavior - let receiver_socket = match dd_apm_windows_pipe_name { - #[cfg(windows)] - Some(pipe_name) => format!("{}{}", PIPE_NAME_PREFIX, pipe_name), - #[cfg(not(windows))] - Some(pipe_name) => pipe_name.to_string(), - None => String::new(), - }; + // pipe_name already includes \\.\pipe\ prefix from config + let receiver_socket = dd_apm_windows_pipe_name.unwrap_or(""); let config_json = serde_json::json!({ "receiver_port": dd_apm_receiver_port, diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index fc95cad2..eacb64da 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -144,7 +144,7 @@ async fn test_mini_agent_named_pipe_handles_requests() { let pipe_name = "dd_trace_integration_test"; let pipe_path = format!(r"\\.\pipe\{}", pipe_name); // Full path for client connections let mut config = create_tcp_test_config(); - config.dd_apm_windows_pipe_name = Some(pipe_name.to_string()); + config.dd_apm_windows_pipe_name = Some(pipe_path.clone()); config.dd_apm_receiver_port = 0; let config = Arc::new(config); From 340b20503e26348c4d1708a52fcf2acddbdaf3a3 Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 30 Jan 2026 17:02:58 -0500 Subject: [PATCH 13/13] Re-resolve merge conflict --- crates/datadog-trace-agent/src/config.rs | 6 ++++++ crates/datadog-trace-agent/src/trace_processor.rs | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index cf934859..574e373c 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -24,6 +24,12 @@ pub struct Tags { function_tags_string: OnceLock, } +impl Default for Tags { + fn default() -> Self { + Self::new() + } +} + impl Tags { pub fn from_env_string(env_tags: &str) -> Self { let mut tags = HashMap::new(); diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index fb30809e..87550fc4 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -104,7 +104,7 @@ impl TraceProcessor for ServerlessTraceProcessor { // double check content length is < max request content length in case transfer encoding is used if body_size > config.max_request_content_length { return log_and_create_http_response( - &format!("Error processing traces: Payload too large"), + "Error processing traces: Payload too large", StatusCode::PAYLOAD_TOO_LARGE, ); }