diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index 92793b6d..574e373c 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -15,6 +15,7 @@ use libdd_trace_utils::config_utils::{ }; use libdd_trace_utils::trace_utils; +const DEFAULT_APM_RECEIVER_PORT: u16 = 8126; const DEFAULT_DOGSTATSD_PORT: u16 = 8125; #[derive(Debug)] @@ -79,6 +80,8 @@ impl Tags { #[derive(Debug)] pub struct Config { pub dd_site: String, + pub dd_apm_receiver_port: u16, + pub dd_apm_windows_pipe_name: Option, pub dd_dogstatsd_port: u16, pub env_type: trace_utils::EnvironmentType, pub app_name: Option, @@ -115,6 +118,20 @@ impl Config { anyhow::anyhow!("Unable to identify environment. Shutting down Mini Agent.") })?; + let dd_apm_windows_pipe_name: Option = + env::var("DD_APM_WINDOWS_PIPE_NAME").ok().map(|pipe_name| { + // Prepend \\.\pipe\ prefix to match datadog-agent behavior + format!(r"\\.\pipe\{}", pipe_name) + }); + let dd_apm_receiver_port: u16 = if dd_apm_windows_pipe_name.is_some() { + 0 // Override to 0 when using Windows named pipe + } else { + env::var("DD_APM_RECEIVER_PORT") + .ok() + .and_then(|port| port.parse::().ok()) + .unwrap_or(DEFAULT_APM_RECEIVER_PORT) + }; + let dd_dogstatsd_port: u16 = env::var("DD_DOGSTATSD_PORT") .ok() .and_then(|port| port.parse::().ok()) @@ -165,6 +182,8 @@ impl Config { proxy_request_max_retries: 3, proxy_request_retry_backoff_base_ms: 100, verify_env_timeout_ms: 100, + dd_apm_receiver_port, + dd_apm_windows_pipe_name, dd_dogstatsd_port, dd_site, trace_intake: Endpoint { @@ -345,6 +364,55 @@ mod tests { env::remove_var("DD_DOGSTATSD_PORT"); } + #[test] + #[serial] + fn test_apm_windows_pipe_name() { + env::set_var("DD_API_KEY", "_not_a_real_key_"); + env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); + env::set_var("DD_APM_WINDOWS_PIPE_NAME", r"test_pipe"); + let config_res = config::Config::new(); + assert!(config_res.is_ok()); + let config = config_res.unwrap(); + assert_eq!( + config.dd_apm_windows_pipe_name, + Some(r"\\.\pipe\test_pipe".to_string()) + ); + // Port should be overridden to 0 when pipe is set + assert_eq!(config.dd_apm_receiver_port, 0); + env::remove_var("DD_API_KEY"); + env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME"); + env::remove_var("DD_APM_WINDOWS_PIPE_NAME"); + } + + #[test] + #[serial] + fn test_default_apm_receiver_port() { + env::set_var("DD_API_KEY", "_not_a_real_key_"); + env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); + let config_res = config::Config::new(); + assert!(config_res.is_ok()); + let config = config_res.unwrap(); + assert_eq!(config.dd_apm_receiver_port, 8126); + assert_eq!(config.dd_apm_windows_pipe_name, None); + env::remove_var("DD_API_KEY"); + env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME"); + } + + #[test] + #[serial] + fn test_custom_apm_receiver_port() { + env::set_var("DD_API_KEY", "_not_a_real_key_"); + env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); + env::set_var("DD_APM_RECEIVER_PORT", "18126"); + let config_res = config::Config::new(); + assert!(config_res.is_ok()); + let config = config_res.unwrap(); + assert_eq!(config.dd_apm_receiver_port, 18126); + env::remove_var("DD_API_KEY"); + env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME"); + env::remove_var("DD_APM_RECEIVER_PORT"); + } + fn test_config_with_dd_tags(dd_tags: &str) -> config::Config { env::set_var("DD_API_KEY", "_not_a_real_key_"); env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index 280d6a7c..7e7cef12 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -15,12 +15,15 @@ use tracing::{debug, error}; use crate::http_utils::{log_and_create_http_response, verify_request_content_length}; use crate::proxy_flusher::{ProxyFlusher, ProxyRequest}; + +#[cfg(windows)] +use tokio::net::windows::named_pipe::ServerOptions; + use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor}; use libdd_trace_protobuf::pb; use libdd_trace_utils::trace_utils; use libdd_trace_utils::trace_utils::SendData; -const MINI_AGENT_PORT: usize = 8126; const TRACE_ENDPOINT_PATH: &str = "/v0.4/traces"; const STATS_ENDPOINT_PATH: &str = "/v0.6/stats"; const INFO_ENDPOINT_PATH: &str = "/info"; @@ -68,7 +71,7 @@ impl MiniAgent { // start our trace flusher. receives trace payloads and handles buffering + deciding when to // flush to backend. let trace_flusher = self.trace_flusher.clone(); - tokio::spawn(async move { + let trace_flusher_handle = tokio::spawn(async move { trace_flusher.start_trace_flusher(trace_rx).await; }); @@ -81,7 +84,7 @@ impl MiniAgent { // start our stats flusher. let stats_flusher = self.stats_flusher.clone(); let stats_config = self.config.clone(); - tokio::spawn(async move { + let stats_flusher_handle = tokio::spawn(async move { stats_flusher .start_stats_flusher(stats_config, stats_rx) .await; @@ -125,16 +128,77 @@ impl MiniAgent { ) }); - let addr = SocketAddr::from(([127, 0, 0, 1], MINI_AGENT_PORT as u16)); - let listener = tokio::net::TcpListener::bind(&addr).await?; - - debug!("Mini Agent started: listening on port {MINI_AGENT_PORT}"); + // Determine which transport to use based on configuration + if let Some(ref pipe_name) = self.config.dd_apm_windows_pipe_name { + debug!("Mini Agent started: listening on named pipe {}", pipe_name); + } else { + debug!( + "Mini Agent started: listening on port {}", + self.config.dd_apm_receiver_port + ); + } debug!( - "Time taken start the Mini Agent: {} ms", + "Time taken to start the Mini Agent: {} ms", now.elapsed().as_millis() ); + + if let Some(ref pipe_name) = self.config.dd_apm_windows_pipe_name { + // Windows named pipe transport + #[cfg(windows)] + { + Self::serve_named_pipe( + pipe_name, + service, + trace_flusher_handle, + stats_flusher_handle, + ) + .await?; + } + + #[cfg(not(windows))] + { + error!( + "Named pipes are only supported on Windows, cannot use pipe: {}", + pipe_name + ); + return Err("Named pipes are only supported on Windows".into()); + } + } else { + // TCP transport + let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port)); + let listener = tokio::net::TcpListener::bind(&addr).await?; + + Self::serve_tcp( + listener, + service, + trace_flusher_handle, + stats_flusher_handle, + ) + .await?; + } + + Ok(()) + } + + async fn serve_tcp( + listener: tokio::net::TcpListener, + service: S, + mut trace_flusher_handle: tokio::task::JoinHandle<()>, + mut stats_flusher_handle: tokio::task::JoinHandle<()>, + ) -> Result<(), Box> + where + S: hyper::service::Service< + hyper::Request, + Response = hyper::Response, + > + Clone + + Send + + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + { let server = hyper::server::conn::http1::Builder::new(); let mut joinset = tokio::task::JoinSet::new(); + loop { let conn = tokio::select! { con_res = listener.accept() => match con_res { @@ -165,6 +229,15 @@ impl MiniAgent { }, Ok(()) | Err(_) => continue, }, + // If there's some error in the background tasks, we can't send data + result = &mut trace_flusher_handle => { + error!("Trace flusher task died: {:?}", result); + return Err("Trace flusher task terminated unexpectedly".into()); + }, + result = &mut stats_flusher_handle => { + error!("Stats flusher task died: {:?}", result); + return Err("Stats flusher task terminated unexpectedly".into()); + }, }; let conn = hyper_util::rt::TokioIo::new(conn); let server = server.clone(); @@ -177,6 +250,98 @@ impl MiniAgent { } } + #[cfg(windows)] + async fn serve_named_pipe( + pipe_name: &str, + service: S, + mut trace_flusher_handle: tokio::task::JoinHandle<()>, + mut stats_flusher_handle: tokio::task::JoinHandle<()>, + ) -> Result<(), Box> + where + S: hyper::service::Service< + hyper::Request, + Response = hyper::Response, + > + Clone + + Send + + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + { + // pipe_name already includes \\.\pipe\ prefix from config + let pipe_path = pipe_name; + + let server = hyper::server::conn::http1::Builder::new(); + let mut joinset = tokio::task::JoinSet::new(); + + loop { + // Create a new pipe instance + let pipe = match ServerOptions::new().create(&pipe_path) { + Ok(pipe) => { + debug!("Created pipe server instance '{}' in byte mode", pipe_path); + pipe + } + Err(e) => { + error!("Failed to create named pipe: {e}"); + return Err(e.into()); + } + }; + + // Wait for client connection + let conn = tokio::select! { + connect_res = pipe.connect() => match connect_res { + Err(e) + if matches!( + e.kind(), + io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionReset + | io::ErrorKind::ConnectionRefused + ) => + { + continue; + } + Err(e) => { + error!("Named pipe connection error: {e}"); + return Err(e.into()); + } + Ok(()) => { + debug!("Client connected to '{}'", pipe_path); + pipe + } + }, + finished = async { + match joinset.join_next().await { + Some(finished) => finished, + None => std::future::pending().await, + } + } => match finished { + Err(e) if e.is_panic() => { + std::panic::resume_unwind(e.into_panic()); + }, + Ok(()) | Err(_) => continue, + }, + // If there's some error in the background tasks, we can't send data + result = &mut trace_flusher_handle => { + error!("Trace flusher task died: {:?}", result); + return Err("Trace flusher task terminated unexpectedly".into()); + }, + result = &mut stats_flusher_handle => { + error!("Stats flusher task died: {:?}", result); + return Err("Stats flusher task terminated unexpectedly".into()); + }, + }; + + // Hyper http parser handles buffering pipe data + let conn = hyper_util::rt::TokioIo::new(conn); + let server = server.clone(); + let service = service.clone(); + joinset.spawn(async move { + if let Err(e) = server.serve_connection(conn, service).await { + error!("Connection error: {e}"); + } + }); + } + } + #[allow(clippy::too_many_arguments)] async fn trace_endpoint_handler( config: Arc, @@ -219,7 +384,11 @@ impl MiniAgent { ), } } - (_, INFO_ENDPOINT_PATH) => match Self::info_handler(config.dd_dogstatsd_port) { + (_, INFO_ENDPOINT_PATH) => match Self::info_handler( + config.dd_apm_receiver_port, + config.dd_apm_windows_pipe_name.as_deref(), + config.dd_dogstatsd_port, + ) { Ok(res) => Ok(res), Err(err) => log_and_create_http_response( &format!("Info endpoint error: {err}"), @@ -287,7 +456,20 @@ impl MiniAgent { } } - fn info_handler(dd_dogstatsd_port: u16) -> http::Result { + fn info_handler( + dd_apm_receiver_port: u16, + dd_apm_windows_pipe_name: Option<&str>, + dd_dogstatsd_port: u16, + ) -> http::Result { + // pipe_name already includes \\.\pipe\ prefix from config + let receiver_socket = dd_apm_windows_pipe_name.unwrap_or(""); + + let config_json = serde_json::json!({ + "receiver_port": dd_apm_receiver_port, + "statsd_port": dd_dogstatsd_port, + "receiver_socket": receiver_socket + }); + let response_json = json!( { "endpoints": [ @@ -297,9 +479,7 @@ impl MiniAgent { PROFILING_ENDPOINT_PATH ], "client_drop_p0s": true, - "config": { - "statsd_port": dd_dogstatsd_port - } + "config": config_json } ); Response::builder() diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index 0e75b06f..87550fc4 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -203,6 +203,8 @@ mod tests { ..Default::default() }, dd_site: "datadoghq.com".to_string(), + dd_apm_receiver_port: 8126, + dd_apm_windows_pipe_name: None, dd_dogstatsd_port: 8125, env_type: trace_utils::EnvironmentType::CloudFunction, os: "linux".to_string(), diff --git a/crates/datadog-trace-agent/tests/common/helpers.rs b/crates/datadog-trace-agent/tests/common/helpers.rs new file mode 100644 index 00000000..a41cf055 --- /dev/null +++ b/crates/datadog-trace-agent/tests/common/helpers.rs @@ -0,0 +1,94 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Helper functions for integration tests + +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; +use libdd_common::hyper_migration; +use libdd_trace_utils::test_utils::create_test_json_span; +use std::time::{Duration, UNIX_EPOCH}; +use tokio::time::timeout; + +/// Create a simple test trace payload as msgpack bytes +pub fn create_test_trace_payload() -> Vec { + let start = UNIX_EPOCH.elapsed().unwrap().as_nanos() as i64; + let json_span = create_test_json_span(11, 222, 0, start, false); + rmp_serde::to_vec(&vec![vec![json_span]]).expect("Failed to serialize test trace") +} + +/// Send an HTTP request over TCP and return the response +pub async fn send_tcp_request( + port: u16, + uri: &str, + method: &str, + body: Option>, +) -> Result, Box> { + let stream = timeout( + Duration::from_secs(2), + tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port)), + ) + .await??; + + let io = TokioIo::new(stream); + let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; + + tokio::spawn(async move { + let _ = conn.await; + }); + + let mut request_builder = Request::builder() + .uri(uri) + .method(method) + .header("Content-Type", "application/msgpack"); + + let response = if let Some(body_data) = body { + let body_len = body_data.len(); + request_builder = request_builder.header("Content-Length", body_len.to_string()); + let request = request_builder.body(hyper_migration::Body::from(body_data))?; + timeout(Duration::from_secs(2), sender.send_request(request)).await?? + } else { + let request = request_builder.body(hyper_migration::Body::empty())?; + timeout(Duration::from_secs(2), sender.send_request(request)).await?? + }; + + Ok(response) +} + +#[cfg(windows)] +/// Send an HTTP request over named pipe and return the response +pub async fn send_named_pipe_request( + pipe_name: &str, + uri: &str, + method: &str, + body: Option>, +) -> Result, Box> { + use tokio::net::windows::named_pipe::ClientOptions; + + // Note: open() is synchronous on Windows, not async + let client = ClientOptions::new().open(pipe_name)?; + + let io = TokioIo::new(client); + let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; + + tokio::spawn(async move { + let _ = conn.await; + }); + + let mut request_builder = Request::builder() + .uri(uri) + .method(method) + .header("Content-Type", "application/msgpack"); + + let response = if let Some(body_data) = body { + let body_len = body_data.len(); + request_builder = request_builder.header("Content-Length", body_len.to_string()); + let request = request_builder.body(hyper_migration::Body::from(body_data))?; + timeout(Duration::from_secs(2), sender.send_request(request)).await?? + } else { + let request = request_builder.body(hyper_migration::Body::empty())?; + timeout(Duration::from_secs(2), sender.send_request(request)).await?? + }; + + Ok(response) +} diff --git a/crates/datadog-trace-agent/tests/common/mocks.rs b/crates/datadog-trace-agent/tests/common/mocks.rs new file mode 100644 index 00000000..4e59e646 --- /dev/null +++ b/crates/datadog-trace-agent/tests/common/mocks.rs @@ -0,0 +1,114 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Mock implementations of trace agent components for testing + +use datadog_trace_agent::{ + config::Config, env_verifier::EnvVerifier, stats_flusher::StatsFlusher, + stats_processor::StatsProcessor, trace_flusher::TraceFlusher, trace_processor::TraceProcessor, +}; +use libdd_common::hyper_migration; +use libdd_trace_protobuf::pb; +use libdd_trace_utils::trace_utils::{self, MiniAgentMetadata, SendData}; +use std::sync::Arc; +use tokio::sync::mpsc::{Receiver, Sender}; + +/// Mock trace processor that returns 200 OK for all requests +#[allow(dead_code)] +pub struct MockTraceProcessor; + +#[async_trait::async_trait] +impl TraceProcessor for MockTraceProcessor { + async fn process_traces( + &self, + _config: Arc, + _req: hyper_migration::HttpRequest, + _trace_tx: Sender, + _mini_agent_metadata: Arc, + ) -> Result { + hyper::Response::builder() + .status(200) + .body(hyper_migration::Body::from("{}")) + } +} + +/// Mock trace flusher that consumes messages without processing +pub struct MockTraceFlusher; + +#[async_trait::async_trait] +impl TraceFlusher for MockTraceFlusher { + fn new( + _aggregator: Arc>, + _config: Arc, + ) -> Self { + MockTraceFlusher + } + + async fn start_trace_flusher(&self, mut trace_rx: Receiver) { + // Consume messages from the channel without processing them + while let Some(_trace) = trace_rx.recv().await { + // Just discard the trace - we're not testing the flusher + } + } + + async fn send(&self, _traces: Vec) -> Option> { + None + } + + async fn flush(&self, _failed_traces: Option>) -> Option> { + None + } +} + +/// Mock stats processor that returns 200 OK for all requests +pub struct MockStatsProcessor; + +#[async_trait::async_trait] +impl StatsProcessor for MockStatsProcessor { + async fn process_stats( + &self, + _config: Arc, + _req: hyper_migration::HttpRequest, + _stats_tx: Sender, + ) -> Result { + hyper::Response::builder() + .status(200) + .body(hyper_migration::Body::from("{}")) + } +} + +/// Mock stats flusher that consumes messages without processing +pub struct MockStatsFlusher; + +#[async_trait::async_trait] +impl StatsFlusher for MockStatsFlusher { + async fn start_stats_flusher( + &self, + _config: Arc, + mut stats_rx: Receiver, + ) { + // Consume messages from the channel without processing them + while let Some(_stats) = stats_rx.recv().await { + // Just discard the stats - we're not testing the flusher + } + } + + async fn flush_stats(&self, _config: Arc, _traces: Vec) { + // Do nothing + } +} + +/// Mock environment verifier that returns default metadata +pub struct MockEnvVerifier; + +#[async_trait::async_trait] +impl EnvVerifier for MockEnvVerifier { + async fn verify_environment( + &self, + _timeout_ms: u64, + _env_type: &trace_utils::EnvironmentType, + _os: &str, + ) -> MiniAgentMetadata { + MiniAgentMetadata::default() + } +} diff --git a/crates/datadog-trace-agent/tests/common/mod.rs b/crates/datadog-trace-agent/tests/common/mod.rs new file mode 100644 index 00000000..3d83bbdb --- /dev/null +++ b/crates/datadog-trace-agent/tests/common/mod.rs @@ -0,0 +1,7 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Common test utilities, mocks, and helpers for integration tests + +pub mod helpers; +pub mod mocks; diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs new file mode 100644 index 00000000..eacb64da --- /dev/null +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -0,0 +1,218 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use common::helpers::{create_test_trace_payload, send_tcp_request}; +use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher}; +use datadog_trace_agent::{ + config::Config, mini_agent::MiniAgent, proxy_flusher::ProxyFlusher, + trace_processor::ServerlessTraceProcessor, +}; +use http_body_util::BodyExt; +use hyper::StatusCode; +use libdd_trace_utils::trace_utils; +use serde_json::Value; +use std::sync::Arc; +use std::time::Duration; + +#[cfg(windows)] +use common::helpers::send_named_pipe_request; + +/// Create a test config with TCP transport +pub fn create_tcp_test_config() -> Config { + Config { + dd_site: "mock-datadoghq.com".to_string(), + dd_apm_receiver_port: 8126, + dd_apm_windows_pipe_name: None, + dd_dogstatsd_port: 8125, + env_type: trace_utils::EnvironmentType::AzureFunction, + app_name: Some("test-app".to_string()), + max_request_content_length: 10_000_000, + obfuscation_config: libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig::new() + .unwrap(), + os: std::env::consts::OS.to_string(), + tags: datadog_trace_agent::config::Tags::new(), + stats_flush_interval_secs: 10, + trace_flush_interval_secs: 5, + trace_intake: libdd_common::Endpoint::default(), + trace_stats_intake: libdd_common::Endpoint::default(), + profiling_intake: libdd_common::Endpoint::default(), + proxy_request_timeout_secs: 30, + proxy_request_max_retries: 3, + proxy_request_retry_backoff_base_ms: 100, + verify_env_timeout_ms: 1000, + proxy_url: None, + } +} + +#[cfg(test)] +#[tokio::test] +async fn test_mini_agent_tcp_handles_requests() { + let config = Arc::new(create_tcp_test_config()); + let test_port = config.dd_apm_receiver_port; + let mini_agent = MiniAgent { + config: config.clone(), + trace_processor: Arc::new(ServerlessTraceProcessor {}), + trace_flusher: Arc::new(MockTraceFlusher), + stats_processor: Arc::new(MockStatsProcessor), + stats_flusher: Arc::new(MockStatsFlusher), + env_verifier: Arc::new(MockEnvVerifier), + proxy_flusher: Arc::new(ProxyFlusher::new(config)), + }; + + // Start the mini agent + let agent_handle = tokio::spawn(async move { + let _ = mini_agent.start_mini_agent().await; + }); + + // Give server time to start + tokio::time::sleep(Duration::from_millis(100)).await; + + // Test /info endpoint + let info_response = send_tcp_request(test_port, "/info", "GET", None) + .await + .expect("Failed to send /info request"); + assert_eq!( + info_response.status(), + StatusCode::OK, + "Expected 200 OK from /info endpoint" + ); + + // Verify /info endpoint response content + let body = info_response + .into_body() + .collect() + .await + .expect("Failed to read /info response body") + .to_bytes(); + let json: Value = + serde_json::from_slice(&body).expect("Failed to parse /info response as JSON"); + + // Check endpoints array + assert_eq!( + json["endpoints"], + serde_json::json!([ + "/v0.4/traces", + "/v0.6/stats", + "/info", + "/profiling/v1/input" + ]), + "Expected endpoints array" + ); + + // Check client_drop_p0s flag + assert_eq!( + json["client_drop_p0s"], true, + "Expected client_drop_p0s to be true" + ); + + // Check config object + let config = &json["config"]; + assert_eq!( + config["receiver_port"], test_port, + "Expected receiver_port to match test port" + ); + assert_eq!( + config["statsd_port"], 8125, + "Expected statsd_port to be 8125" + ); + assert_eq!( + config["receiver_socket"], "", + "Expected empty receiver_socket for TCP" + ); + + // Test /v0.4/traces endpoint with real trace data + let trace_payload = create_test_trace_payload(); + let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload)) + .await + .expect("Failed to send /v0.4/traces request"); + assert_eq!( + trace_response.status(), + StatusCode::OK, + "Expected 200 OK from /v0.4/traces endpoint" + ); + + // Clean up + agent_handle.abort(); +} + +#[cfg(all(test, windows))] +#[tokio::test] +async fn test_mini_agent_named_pipe_handles_requests() { + // Use just the pipe name without \\.\pipe\ prefix, matching datadog-agent behavior + let pipe_name = "dd_trace_integration_test"; + let pipe_path = format!(r"\\.\pipe\{}", pipe_name); // Full path for client connections + let mut config = create_tcp_test_config(); + config.dd_apm_windows_pipe_name = Some(pipe_path.clone()); + config.dd_apm_receiver_port = 0; + let config = Arc::new(config); + + let mini_agent = MiniAgent { + config: config.clone(), + trace_processor: Arc::new(ServerlessTraceProcessor {}), + trace_flusher: Arc::new(MockTraceFlusher), + stats_processor: Arc::new(MockStatsProcessor), + stats_flusher: Arc::new(MockStatsFlusher), + env_verifier: Arc::new(MockEnvVerifier), + proxy_flusher: Arc::new(ProxyFlusher::new(config)), + }; + + // Start the mini agent + let agent_handle = tokio::spawn(async move { + let _ = mini_agent.start_mini_agent().await; + }); + + // Give server time to create pipe + tokio::time::sleep(Duration::from_millis(100)).await; + + // Test /info endpoint + let info_response = send_named_pipe_request(&pipe_path, "/info", "GET", None) + .await + .expect("Failed to send /info request over named pipe"); + assert_eq!( + info_response.status(), + StatusCode::OK, + "Expected 200 OK from /info endpoint over named pipe" + ); + + // Verify /info endpoint response content + let body = info_response + .into_body() + .collect() + .await + .expect("Failed to read /info response body") + .to_bytes(); + let json: Value = + serde_json::from_slice(&body).expect("Failed to parse /info response as JSON"); + + // Check config object specific to named pipe + let config_value = &json["config"]; + assert_eq!( + config_value["receiver_port"], 0, + "Expected receiver_port to be 0 for named pipe" + ); + assert_eq!( + config_value["statsd_port"], 8125, + "Expected statsd_port to be 8125" + ); + assert_eq!( + config_value["receiver_socket"], pipe_path, + "Expected receiver_socket to match full pipe path" + ); + + // Test /v0.4/traces endpoint with real trace data + let trace_payload = create_test_trace_payload(); + let trace_response = + send_named_pipe_request(&pipe_path, "/v0.4/traces", "POST", Some(trace_payload)) + .await + .expect("Failed to send /v0.4/traces request over named pipe"); + assert_eq!( + trace_response.status(), + StatusCode::OK, + "Expected 200 OK from /v0.4/traces endpoint over named pipe" + ); + + // Clean up + agent_handle.abort(); +}