From 070093dfe5750d5ff59530b08b4b4cdabbd1dae9 Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 9 Jan 2026 16:37:48 -0500 Subject: [PATCH 1/8] Add an integration test that covers all of datadog-trace-agent --- .../tests/common/mock_server.rs | 128 ++++++++++++++++++ .../datadog-trace-agent/tests/common/mod.rs | 1 + .../tests/integration_test.rs | 126 ++++++++++++++++- 3 files changed, 254 insertions(+), 1 deletion(-) create mode 100644 crates/datadog-trace-agent/tests/common/mock_server.rs diff --git a/crates/datadog-trace-agent/tests/common/mock_server.rs b/crates/datadog-trace-agent/tests/common/mock_server.rs new file mode 100644 index 00000000..c1b00bdd --- /dev/null +++ b/crates/datadog-trace-agent/tests/common/mock_server.rs @@ -0,0 +1,128 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Simple mock HTTP server for testing flushers + +use http_body_util::BodyExt; +use hyper::{body::Incoming, Request, Response}; +use hyper_util::rt::TokioIo; +use libdd_common::hyper_migration; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; +use tokio::net::TcpListener; + +#[derive(Clone, Debug)] +pub struct ReceivedRequest { + pub method: String, + pub path: String, + pub headers: Vec<(String, String)>, + pub body: Vec, +} + +#[derive(Clone)] +pub struct MockServer { + pub addr: SocketAddr, + pub received_requests: Arc>>, +} + +impl MockServer { + /// Start a mock HTTP server on a random port + pub async fn start() -> Self { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("Failed to bind mock server"); + let addr = listener.local_addr().expect("Failed to get local addr"); + + let received_requests = Arc::new(Mutex::new(Vec::new())); + let requests_clone = received_requests.clone(); + + tokio::spawn(async move { + loop { + let (stream, _) = match listener.accept().await { + Ok(conn) => conn, + Err(_) => break, + }; + + let io = TokioIo::new(stream); + let requests = requests_clone.clone(); + + tokio::spawn(async move { + let service = hyper::service::service_fn(move |req: Request| { + let requests = requests.clone(); + async move { + // Capture the request + let method = req.method().to_string(); + let path = req.uri().path().to_string(); + let headers: Vec<(String, String)> = req + .headers() + .iter() + .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string())) + .collect(); + + // Read the body + let body_bytes = req + .into_body() + .collect() + .await + .map(|collected| collected.to_bytes().to_vec()) + .unwrap_or_default(); + + // Store the request + requests.lock().unwrap().push(ReceivedRequest { + method, + path, + headers, + body: body_bytes, + }); + + // Return 200 OK + Ok::<_, hyper::http::Error>( + Response::builder() + .status(200) + .body(hyper_migration::Body::from(r#"{"ok":true}"#)) + .unwrap(), + ) + } + }); + + let _ = hyper::server::conn::http1::Builder::new() + .serve_connection(io, service) + .await; + }); + } + }); + + MockServer { + addr, + received_requests, + } + } + + /// Get the base URL of the mock server + pub fn url(&self) -> String { + format!("http://{}", self.addr) + } + + /// Get all received requests + #[allow(dead_code)] + pub fn get_requests(&self) -> Vec { + self.received_requests.lock().unwrap().clone() + } + + /// Get requests matching a path + pub fn get_requests_for_path(&self, path: &str) -> Vec { + self.received_requests + .lock() + .unwrap() + .iter() + .filter(|req| req.path == path) + .cloned() + .collect() + } + + /// Clear all received requests + #[allow(dead_code)] + pub fn clear_requests(&self) { + self.received_requests.lock().unwrap().clear(); + } +} diff --git a/crates/datadog-trace-agent/tests/common/mod.rs b/crates/datadog-trace-agent/tests/common/mod.rs index 3d83bbdb..447946a7 100644 --- a/crates/datadog-trace-agent/tests/common/mod.rs +++ b/crates/datadog-trace-agent/tests/common/mod.rs @@ -4,4 +4,5 @@ //! Common test utilities, mocks, and helpers for integration tests pub mod helpers; +pub mod mock_server; pub mod mocks; diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index ea2fde9e..00e9d71f 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -7,12 +7,13 @@ use common::helpers::{create_test_trace_payload, send_tcp_request}; use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher}; use datadog_trace_agent::{ config::Config, mini_agent::MiniAgent, proxy_flusher::ProxyFlusher, - trace_processor::ServerlessTraceProcessor, + trace_flusher::TraceFlusher, trace_processor::ServerlessTraceProcessor, }; use http_body_util::BodyExt; use hyper::StatusCode; use libdd_trace_utils::trace_utils; use serde_json::Value; +use serial_test::serial; use std::sync::Arc; use std::time::Duration; @@ -51,6 +52,7 @@ pub fn create_tcp_test_config() -> Config { #[cfg(test)] #[tokio::test] +#[serial] async fn test_mini_agent_tcp_handles_requests() { let config = Arc::new(create_tcp_test_config()); let test_port = config.dd_apm_receiver_port; @@ -219,3 +221,125 @@ async fn test_mini_agent_named_pipe_handles_requests() { // Clean up agent_handle.abort(); } + +#[cfg(test)] +#[tokio::test] +#[serial] +async fn test_mini_agent_with_real_flushers() { + use common::mock_server::MockServer; + use datadog_trace_agent::{ + aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher, + stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher, + }; + + // Start mock HTTP server to intercept trace/stats requests + let mock_server = MockServer::start().await; + + // Give mock server a moment to be ready + tokio::time::sleep(Duration::from_millis(50)).await; + + // Create config pointing to mock server + let trace_url = format!("{}/api/v0.2/traces", mock_server.url()); + let stats_url = format!("{}/api/v0.6/stats", mock_server.url()); + + let mut config = create_tcp_test_config(); + config.trace_intake = libdd_common::Endpoint { + url: trace_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + config.trace_stats_intake = libdd_common::Endpoint { + url: stats_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + // Set short flush intervals for faster testing + config.trace_flush_interval = 1; // 1 second + config.stats_flush_interval = 1; // 1 second + + let config = Arc::new(config); + let test_port = config.dd_apm_receiver_port; + + // Create mini agent with REAL flushers + let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); + let mini_agent = MiniAgent { + config: config.clone(), + trace_processor: Arc::new(ServerlessTraceProcessor {}), + trace_flusher: Arc::new(ServerlessTraceFlusher::new( + aggregator.clone(), + config.clone(), + )), + stats_processor: Arc::new(ServerlessStatsProcessor {}), + stats_flusher: Arc::new(ServerlessStatsFlusher {}), + env_verifier: Arc::new(MockEnvVerifier), + }; + + // Start the mini agent + let agent_handle = tokio::spawn(async move { + let _ = mini_agent.start_mini_agent().await; + }); + + // Give server time to start + tokio::time::sleep(Duration::from_millis(100)).await; + + // Send trace data through the mini agent + let trace_payload = create_test_trace_payload(); + let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload)) + .await + .expect("Failed to send /v0.4/traces request"); + + assert_eq!( + trace_response.status(), + StatusCode::OK, + "Expected 200 OK from /v0.4/traces endpoint" + ); + + // Wait for the trace flusher to flush (interval is 1 second + buffer) + tokio::time::sleep(Duration::from_millis(1500)).await; + + // Verify the mock server received the trace request + let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); + + assert!( + !trace_reqs.is_empty(), + "Expected at least one trace request to mock server" + ); + + // Validate the trace request + let trace_req = &trace_reqs[0]; + assert_eq!(trace_req.method, "POST", "Expected POST method"); + + // Check headers + let content_type = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "content-type") + .map(|(_, v)| v.as_str()); + // The real flusher uses application/x-protobuf after coalescing traces + assert_eq!( + content_type, + Some("application/x-protobuf"), + "Expected protobuf content-type" + ); + + let api_key = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "dd-api-key") + .map(|(_, v)| v.as_str()); + assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); + + // The body should be non-empty protobuf data + assert!( + !trace_req.body.is_empty(), + "Expected non-empty trace payload" + ); + + println!("✓ Trace flusher successfully sent data to mock server"); + println!(" - Received {} trace request(s)", trace_reqs.len()); + println!(" - Payload size: {} bytes", trace_req.body.len()); + println!(" - Headers: {} present", trace_req.headers.len()); + + // Clean up + agent_handle.abort(); +} From f024d26f9a88afd2be5846071c4f57b2ccd2c5ff Mon Sep 17 00:00:00 2001 From: Lewis Date: Thu, 12 Feb 2026 10:19:12 -0500 Subject: [PATCH 2/8] Update integration test windows pipe names --- .../tests/integration_test.rs | 176 ++++++++++++++++-- 1 file changed, 165 insertions(+), 11 deletions(-) diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 00e9d71f..a4e17c80 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -21,14 +21,14 @@ use std::time::Duration; use common::helpers::send_named_pipe_request; /// Create a test config with TCP transport -pub fn create_tcp_test_config() -> Config { +pub fn create_tcp_test_config(port: u16) -> Config { Config { dd_site: "mock-datadoghq.com".to_string(), dd_apm_receiver_port: 8126, - #[cfg(all(windows, feature = "windows-pipes"))] + #[cfg(any(all(windows, feature = "windows-pipes"), test))] dd_apm_windows_pipe_name: None, dd_dogstatsd_port: 8125, - #[cfg(all(windows, feature = "windows-pipes"))] + #[cfg(any(all(windows, feature = "windows-pipes"), test))] dd_dogstatsd_windows_pipe_name: None, env_type: trace_utils::EnvironmentType::AzureFunction, app_name: Some("test-app".to_string()), @@ -54,7 +54,7 @@ pub fn create_tcp_test_config() -> Config { #[tokio::test] #[serial] async fn test_mini_agent_tcp_handles_requests() { - let config = Arc::new(create_tcp_test_config()); + let config = Arc::new(create_tcp_test_config(8126)); let test_port = config.dd_apm_receiver_port; let mini_agent = MiniAgent { config: config.clone(), @@ -148,9 +148,8 @@ async fn test_mini_agent_named_pipe_handles_requests() { // Use just the pipe name without \\.\pipe\ prefix, matching datadog-agent behavior let pipe_name = "dd_trace_integration_test"; let pipe_path = format!(r"\\.\pipe\{}", pipe_name); // Full path for client connections - let mut config = create_tcp_test_config(); + let mut config = create_tcp_test_config(0); config.dd_apm_windows_pipe_name = Some(pipe_path.clone()); - config.dd_apm_receiver_port = 0; let config = Arc::new(config); let mini_agent = MiniAgent { @@ -242,7 +241,7 @@ async fn test_mini_agent_with_real_flushers() { let trace_url = format!("{}/api/v0.2/traces", mock_server.url()); let stats_url = format!("{}/api/v0.6/stats", mock_server.url()); - let mut config = create_tcp_test_config(); + let mut config = create_tcp_test_config(8127); config.trace_intake = libdd_common::Endpoint { url: trace_url.parse().unwrap(), api_key: Some("test-api-key".into()), @@ -254,8 +253,8 @@ async fn test_mini_agent_with_real_flushers() { ..Default::default() }; // Set short flush intervals for faster testing - config.trace_flush_interval = 1; // 1 second - config.stats_flush_interval = 1; // 1 second + config.trace_flush_interval_secs = 1; // 1 second + config.stats_flush_interval_secs = 1; // 1 second let config = Arc::new(config); let test_port = config.dd_apm_receiver_port; @@ -272,6 +271,7 @@ async fn test_mini_agent_with_real_flushers() { stats_processor: Arc::new(ServerlessStatsProcessor {}), stats_flusher: Arc::new(ServerlessStatsFlusher {}), env_verifier: Arc::new(MockEnvVerifier), + proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())), }; // Start the mini agent @@ -279,8 +279,21 @@ async fn test_mini_agent_with_real_flushers() { let _ = mini_agent.start_mini_agent().await; }); - // Give server time to start - tokio::time::sleep(Duration::from_millis(100)).await; + // Wait for server to be ready by polling /info endpoint + let mut server_ready = false; + for _ in 0..20 { + tokio::time::sleep(Duration::from_millis(50)).await; + if let Ok(response) = send_tcp_request(test_port, "/info", "GET", None).await { + if response.status().is_success() { + server_ready = true; + break; + } + } + } + assert!( + server_ready, + "Mini agent server failed to start within timeout" + ); // Send trace data through the mini agent let trace_payload = create_test_trace_payload(); @@ -343,3 +356,144 @@ async fn test_mini_agent_with_real_flushers() { // Clean up agent_handle.abort(); } + +#[cfg(all(test, windows))] +#[tokio::test] +#[serial] +async fn test_mini_agent_named_pipe_with_real_flushers() { + use common::mock_server::MockServer; + use datadog_trace_agent::{ + aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher, + stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher, + }; + + // Start mock HTTP server to intercept trace/stats requests + let mock_server = MockServer::start().await; + + // Give mock server a moment to be ready + tokio::time::sleep(Duration::from_millis(50)).await; + + // Create config pointing to mock server + let trace_url = format!("{}/api/v0.2/traces", mock_server.url()); + let stats_url = format!("{}/api/v0.6/stats", mock_server.url()); + + let mut config = create_tcp_test_config(8127); + config.trace_intake = libdd_common::Endpoint { + url: trace_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + config.trace_stats_intake = libdd_common::Endpoint { + url: stats_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + // Set short flush intervals for faster testing + config.trace_flush_interval_secs = 1; // 1 second + config.stats_flush_interval_secs = 1; // 1 second + + // Configure for named pipe + let pipe_name = r"\\.\pipe\dd_trace_real_flusher_test"; + config.dd_apm_windows_pipe_name = Some(pipe_name.to_string()); + config.dd_apm_receiver_port = 0; + + let config = Arc::new(config); + + // Create mini agent with REAL flushers + let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); + let mini_agent = MiniAgent { + config: config.clone(), + trace_processor: Arc::new(ServerlessTraceProcessor {}), + trace_flusher: Arc::new(ServerlessTraceFlusher::new( + aggregator.clone(), + config.clone(), + )), + stats_processor: Arc::new(ServerlessStatsProcessor {}), + stats_flusher: Arc::new(ServerlessStatsFlusher {}), + env_verifier: Arc::new(MockEnvVerifier), + proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())), + }; + + // Start the mini agent + let agent_handle = tokio::spawn(async move { + let _ = mini_agent.start_mini_agent().await; + }); + + // Wait for server to be ready by polling /info endpoint + let mut server_ready = false; + for _ in 0..20 { + tokio::time::sleep(Duration::from_millis(50)).await; + if let Ok(response) = send_named_pipe_request(pipe_name, "/info", "GET", None).await { + if response.status().is_success() { + server_ready = true; + break; + } + } + } + assert!( + server_ready, + "Mini agent named pipe server failed to start within timeout" + ); + + // Send trace data through the mini agent via named pipe + let trace_payload = create_test_trace_payload(); + let trace_response = + send_named_pipe_request(pipe_name, "/v0.4/traces", "POST", Some(trace_payload)) + .await + .expect("Failed to send /v0.4/traces request over named pipe"); + + assert_eq!( + trace_response.status(), + StatusCode::OK, + "Expected 200 OK from /v0.4/traces endpoint over named pipe" + ); + + // Wait for the trace flusher to flush (interval is 1 second + buffer) + tokio::time::sleep(Duration::from_millis(1500)).await; + + // Verify the mock server received the trace request + let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); + + assert!( + !trace_reqs.is_empty(), + "Expected at least one trace request to mock server" + ); + + // Validate the trace request + let trace_req = &trace_reqs[0]; + assert_eq!(trace_req.method, "POST", "Expected POST method"); + + // Check headers + let content_type = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "content-type") + .map(|(_, v)| v.as_str()); + // The real flusher uses application/x-protobuf after coalescing traces + assert_eq!( + content_type, + Some("application/x-protobuf"), + "Expected protobuf content-type" + ); + + let api_key = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "dd-api-key") + .map(|(_, v)| v.as_str()); + assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); + + // The body should be non-empty protobuf data + assert!( + !trace_req.body.is_empty(), + "Expected non-empty trace payload" + ); + + println!("✓ [Named Pipe] Trace flusher successfully sent data to mock server"); + println!(" - Received {} trace request(s)", trace_reqs.len()); + println!(" - Payload size: {} bytes", trace_req.body.len()); + println!(" - Headers: {} present", trace_req.headers.len()); + + // Clean up + agent_handle.abort(); +} From 97598ae432484ceef4366d8ee06524c212002b1d Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 9 Jan 2026 16:41:04 -0500 Subject: [PATCH 3/8] Add windows full integration test for datadog-trace-agent --- crates/datadog-trace-agent/src/config.rs | 35 ++++++++++++++++++ .../tests/integration_test.rs | 36 ++----------------- 2 files changed, 38 insertions(+), 33 deletions(-) diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index 754f75eb..87ce1c68 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -560,3 +560,38 @@ mod tests { assert_eq!(config.tags.function_tags(), None); } } + +/// Test helpers for creating Config instances in tests +#[cfg(any(test, debug_assertions))] +pub mod test_helpers { + use super::*; + + /// Create a test config with TCP transport + pub fn create_tcp_test_config(port: u16) -> Config { + Config { + dd_site: "mock-datadoghq.com".to_string(), + dd_apm_receiver_port: port, + #[cfg(any(all(windows, feature = "windows-pipes"), test))] + dd_apm_windows_pipe_name: None, + dd_dogstatsd_port: 8125, + #[cfg(any(all(windows, feature = "windows-pipes"), test))] + dd_dogstatsd_windows_pipe_name: None, + env_type: trace_utils::EnvironmentType::AzureFunction, + app_name: Some("test-app".to_string()), + max_request_content_length: 10_000_000, + obfuscation_config: obfuscation_config::ObfuscationConfig::new().unwrap(), + os: std::env::consts::OS.to_string(), + tags: Tags::new(), + stats_flush_interval_secs: 10, + trace_flush_interval_secs: 5, + trace_intake: Endpoint::default(), + trace_stats_intake: Endpoint::default(), + profiling_intake: Endpoint::default(), + proxy_request_timeout_secs: 30, + proxy_request_max_retries: 3, + proxy_request_retry_backoff_base_ms: 100, + verify_env_timeout_ms: 1000, + proxy_url: None, + } + } +} diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index a4e17c80..f750229a 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -6,12 +6,12 @@ mod common; use common::helpers::{create_test_trace_payload, send_tcp_request}; use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher}; use datadog_trace_agent::{ - config::Config, mini_agent::MiniAgent, proxy_flusher::ProxyFlusher, - trace_flusher::TraceFlusher, trace_processor::ServerlessTraceProcessor, + config::test_helpers::create_tcp_test_config, mini_agent::MiniAgent, + proxy_flusher::ProxyFlusher, trace_flusher::TraceFlusher, + trace_processor::ServerlessTraceProcessor, }; use http_body_util::BodyExt; use hyper::StatusCode; -use libdd_trace_utils::trace_utils; use serde_json::Value; use serial_test::serial; use std::sync::Arc; @@ -20,36 +20,6 @@ use std::time::Duration; #[cfg(all(windows, feature = "windows-pipes"))] use common::helpers::send_named_pipe_request; -/// Create a test config with TCP transport -pub fn create_tcp_test_config(port: u16) -> Config { - Config { - dd_site: "mock-datadoghq.com".to_string(), - dd_apm_receiver_port: 8126, - #[cfg(any(all(windows, feature = "windows-pipes"), test))] - dd_apm_windows_pipe_name: None, - dd_dogstatsd_port: 8125, - #[cfg(any(all(windows, feature = "windows-pipes"), test))] - dd_dogstatsd_windows_pipe_name: None, - env_type: trace_utils::EnvironmentType::AzureFunction, - app_name: Some("test-app".to_string()), - max_request_content_length: 10_000_000, - obfuscation_config: libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig::new() - .unwrap(), - os: std::env::consts::OS.to_string(), - tags: datadog_trace_agent::config::Tags::new(), - stats_flush_interval_secs: 10, - trace_flush_interval_secs: 5, - trace_intake: libdd_common::Endpoint::default(), - trace_stats_intake: libdd_common::Endpoint::default(), - profiling_intake: libdd_common::Endpoint::default(), - proxy_request_timeout_secs: 30, - proxy_request_max_retries: 3, - proxy_request_retry_backoff_base_ms: 100, - verify_env_timeout_ms: 1000, - proxy_url: None, - } -} - #[cfg(test)] #[tokio::test] #[serial] From d8d7775883ffd127db8f12144a40a4a16009586e Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 30 Jan 2026 17:37:18 -0500 Subject: [PATCH 4/8] Extract test helpers --- .../tests/integration_test.rs | 258 +++++++++--------- 1 file changed, 133 insertions(+), 125 deletions(-) diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index f750229a..1d1b2d2c 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -20,6 +20,112 @@ use std::time::Duration; #[cfg(all(windows, feature = "windows-pipes"))] use common::helpers::send_named_pipe_request; +/// Helper to configure a config with mock server endpoints +pub fn configure_mock_endpoints(config: &mut Config, mock_server_url: &str) { + let trace_url = format!("{}/api/v0.2/traces", mock_server_url); + let stats_url = format!("{}/api/v0.6/stats", mock_server_url); + + config.trace_intake = libdd_common::Endpoint { + url: trace_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + config.trace_stats_intake = libdd_common::Endpoint { + url: stats_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + config.trace_flush_interval_secs = 1; + config.stats_flush_interval_secs = 1; +} + +/// Helper to create a mini agent with real flushers +pub fn create_mini_agent_with_real_flushers(config: Arc) -> MiniAgent { + use datadog_trace_agent::{ + aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher, + stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher, + }; + + let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); + MiniAgent { + config: config.clone(), + trace_processor: Arc::new(ServerlessTraceProcessor {}), + trace_flusher: Arc::new(ServerlessTraceFlusher::new( + aggregator.clone(), + config.clone(), + )), + stats_processor: Arc::new(ServerlessStatsProcessor {}), + stats_flusher: Arc::new(ServerlessStatsFlusher {}), + env_verifier: Arc::new(MockEnvVerifier), + proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())), + } +} + +/// Helper to verify trace request sent to mock server +pub fn verify_trace_request(mock_server: &common::mock_server::MockServer) { + + let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); + + assert!( + !trace_reqs.is_empty(), + "Expected at least one trace request to mock server" + ); + + let trace_req = &trace_reqs[0]; + assert_eq!(trace_req.method, "POST", "Expected POST method"); + + let content_type = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "content-type") + .map(|(_, v)| v.as_str()); + assert_eq!( + content_type, + Some("application/x-protobuf"), + "Expected protobuf content-type" + ); + + let api_key = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "dd-api-key") + .map(|(_, v)| v.as_str()); + assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); + + assert!( + !trace_req.body.is_empty(), + "Expected non-empty trace payload" + ); +} + +/// Create a test config with TCP transport +pub fn create_tcp_test_config(port: u16) -> Config { + Config { + dd_site: "mock-datadoghq.com".to_string(), + dd_apm_receiver_port: port, + dd_apm_windows_pipe_name: None, + dd_dogstatsd_port: 8125, + dd_dogstatsd_windows_pipe_name: None, + env_type: trace_utils::EnvironmentType::AzureFunction, + app_name: Some("test-app".to_string()), + max_request_content_length: 10_000_000, + obfuscation_config: libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig::new() + .unwrap(), + os: std::env::consts::OS.to_string(), + tags: datadog_trace_agent::config::Tags::new(), + stats_flush_interval_secs: 10, + trace_flush_interval_secs: 5, + trace_intake: libdd_common::Endpoint::default(), + trace_stats_intake: libdd_common::Endpoint::default(), + profiling_intake: libdd_common::Endpoint::default(), + proxy_request_timeout_secs: 30, + proxy_request_max_retries: 3, + proxy_request_retry_backoff_base_ms: 100, + verify_env_timeout_ms: 1000, + proxy_url: None, + } +} + #[cfg(test)] #[tokio::test] #[serial] @@ -196,22 +302,12 @@ async fn test_mini_agent_named_pipe_handles_requests() { #[serial] async fn test_mini_agent_with_real_flushers() { use common::mock_server::MockServer; - use datadog_trace_agent::{ - aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher, - stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher, - }; - // Start mock HTTP server to intercept trace/stats requests let mock_server = MockServer::start().await; - - // Give mock server a moment to be ready tokio::time::sleep(Duration::from_millis(50)).await; - // Create config pointing to mock server - let trace_url = format!("{}/api/v0.2/traces", mock_server.url()); - let stats_url = format!("{}/api/v0.6/stats", mock_server.url()); - let mut config = create_tcp_test_config(8127); +<<<<<<< HEAD config.trace_intake = libdd_common::Endpoint { url: trace_url.parse().unwrap(), api_key: Some("test-api-key".into()), @@ -243,13 +339,19 @@ async fn test_mini_agent_with_real_flushers() { env_verifier: Arc::new(MockEnvVerifier), proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())), }; +======= + configure_mock_endpoints(&mut config, &mock_server.url()); + let config = Arc::new(config); + let test_port = config.dd_apm_receiver_port; + + let mini_agent = create_mini_agent_with_real_flushers(config); +>>>>>>> e680a1b (Extract test helpers) - // Start the mini agent let agent_handle = tokio::spawn(async move { let _ = mini_agent.start_mini_agent().await; }); - // Wait for server to be ready by polling /info endpoint + // Wait for server to be ready let mut server_ready = false; for _ in 0..20 { tokio::time::sleep(Duration::from_millis(50)).await; @@ -265,65 +367,18 @@ async fn test_mini_agent_with_real_flushers() { "Mini agent server failed to start within timeout" ); - // Send trace data through the mini agent + // Send trace data let trace_payload = create_test_trace_payload(); let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload)) .await .expect("Failed to send /v0.4/traces request"); + assert_eq!(trace_response.status(), StatusCode::OK); - assert_eq!( - trace_response.status(), - StatusCode::OK, - "Expected 200 OK from /v0.4/traces endpoint" - ); - - // Wait for the trace flusher to flush (interval is 1 second + buffer) + // Wait for flush tokio::time::sleep(Duration::from_millis(1500)).await; - // Verify the mock server received the trace request - let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); - - assert!( - !trace_reqs.is_empty(), - "Expected at least one trace request to mock server" - ); - - // Validate the trace request - let trace_req = &trace_reqs[0]; - assert_eq!(trace_req.method, "POST", "Expected POST method"); + verify_trace_request(&mock_server); - // Check headers - let content_type = trace_req - .headers - .iter() - .find(|(k, _)| k.to_lowercase() == "content-type") - .map(|(_, v)| v.as_str()); - // The real flusher uses application/x-protobuf after coalescing traces - assert_eq!( - content_type, - Some("application/x-protobuf"), - "Expected protobuf content-type" - ); - - let api_key = trace_req - .headers - .iter() - .find(|(k, _)| k.to_lowercase() == "dd-api-key") - .map(|(_, v)| v.as_str()); - assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); - - // The body should be non-empty protobuf data - assert!( - !trace_req.body.is_empty(), - "Expected non-empty trace payload" - ); - - println!("✓ Trace flusher successfully sent data to mock server"); - println!(" - Received {} trace request(s)", trace_reqs.len()); - println!(" - Payload size: {} bytes", trace_req.body.len()); - println!(" - Headers: {} present", trace_req.headers.len()); - - // Clean up agent_handle.abort(); } @@ -332,17 +387,11 @@ async fn test_mini_agent_with_real_flushers() { #[serial] async fn test_mini_agent_named_pipe_with_real_flushers() { use common::mock_server::MockServer; - use datadog_trace_agent::{ - aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher, - stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher, - }; - // Start mock HTTP server to intercept trace/stats requests let mock_server = MockServer::start().await; - - // Give mock server a moment to be ready tokio::time::sleep(Duration::from_millis(50)).await; +<<<<<<< HEAD // Create config pointing to mock server let trace_url = format!("{}/api/v0.2/traces", mock_server.url()); let stats_url = format!("{}/api/v0.6/stats", mock_server.url()); @@ -363,12 +412,16 @@ async fn test_mini_agent_named_pipe_with_real_flushers() { config.stats_flush_interval_secs = 1; // 1 second // Configure for named pipe +======= +>>>>>>> e680a1b (Extract test helpers) let pipe_name = r"\\.\pipe\dd_trace_real_flusher_test"; + let mut config = create_tcp_test_config(0); + configure_mock_endpoints(&mut config, &mock_server.url()); config.dd_apm_windows_pipe_name = Some(pipe_name.to_string()); config.dd_apm_receiver_port = 0; - let config = Arc::new(config); +<<<<<<< HEAD // Create mini agent with REAL flushers let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); let mini_agent = MiniAgent { @@ -383,13 +436,15 @@ async fn test_mini_agent_named_pipe_with_real_flushers() { env_verifier: Arc::new(MockEnvVerifier), proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())), }; +======= + let mini_agent = create_mini_agent_with_real_flushers(config); +>>>>>>> e680a1b (Extract test helpers) - // Start the mini agent let agent_handle = tokio::spawn(async move { let _ = mini_agent.start_mini_agent().await; }); - // Wait for server to be ready by polling /info endpoint + // Wait for server to be ready let mut server_ready = false; for _ in 0..20 { tokio::time::sleep(Duration::from_millis(50)).await; @@ -405,65 +460,18 @@ async fn test_mini_agent_named_pipe_with_real_flushers() { "Mini agent named pipe server failed to start within timeout" ); - // Send trace data through the mini agent via named pipe + // Send trace data via named pipe let trace_payload = create_test_trace_payload(); let trace_response = send_named_pipe_request(pipe_name, "/v0.4/traces", "POST", Some(trace_payload)) .await .expect("Failed to send /v0.4/traces request over named pipe"); + assert_eq!(trace_response.status(), StatusCode::OK); - assert_eq!( - trace_response.status(), - StatusCode::OK, - "Expected 200 OK from /v0.4/traces endpoint over named pipe" - ); - - // Wait for the trace flusher to flush (interval is 1 second + buffer) + // Wait for flush tokio::time::sleep(Duration::from_millis(1500)).await; - // Verify the mock server received the trace request - let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); - - assert!( - !trace_reqs.is_empty(), - "Expected at least one trace request to mock server" - ); - - // Validate the trace request - let trace_req = &trace_reqs[0]; - assert_eq!(trace_req.method, "POST", "Expected POST method"); - - // Check headers - let content_type = trace_req - .headers - .iter() - .find(|(k, _)| k.to_lowercase() == "content-type") - .map(|(_, v)| v.as_str()); - // The real flusher uses application/x-protobuf after coalescing traces - assert_eq!( - content_type, - Some("application/x-protobuf"), - "Expected protobuf content-type" - ); - - let api_key = trace_req - .headers - .iter() - .find(|(k, _)| k.to_lowercase() == "dd-api-key") - .map(|(_, v)| v.as_str()); - assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); - - // The body should be non-empty protobuf data - assert!( - !trace_req.body.is_empty(), - "Expected non-empty trace payload" - ); + verify_trace_request(&mock_server); - println!("✓ [Named Pipe] Trace flusher successfully sent data to mock server"); - println!(" - Received {} trace request(s)", trace_reqs.len()); - println!(" - Payload size: {} bytes", trace_req.body.len()); - println!(" - Headers: {} present", trace_req.headers.len()); - - // Clean up agent_handle.abort(); } From dfea0d4da695d91cb7d71bd3cd83c9339dd71ed4 Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 30 Jan 2026 17:45:01 -0500 Subject: [PATCH 5/8] Improve test quality --- .../tests/common/mock_server.rs | 121 ++++++++++-------- .../tests/integration_test.rs | 14 +- 2 files changed, 77 insertions(+), 58 deletions(-) diff --git a/crates/datadog-trace-agent/tests/common/mock_server.rs b/crates/datadog-trace-agent/tests/common/mock_server.rs index c1b00bdd..5e0b485d 100644 --- a/crates/datadog-trace-agent/tests/common/mock_server.rs +++ b/crates/datadog-trace-agent/tests/common/mock_server.rs @@ -19,10 +19,10 @@ pub struct ReceivedRequest { pub body: Vec, } -#[derive(Clone)] pub struct MockServer { pub addr: SocketAddr, pub received_requests: Arc>>, + shutdown_tx: Option>, } impl MockServer { @@ -36,65 +36,78 @@ impl MockServer { let received_requests = Arc::new(Mutex::new(Vec::new())); let requests_clone = received_requests.clone(); + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { loop { - let (stream, _) = match listener.accept().await { - Ok(conn) => conn, - Err(_) => break, - }; - - let io = TokioIo::new(stream); - let requests = requests_clone.clone(); - - tokio::spawn(async move { - let service = hyper::service::service_fn(move |req: Request| { - let requests = requests.clone(); - async move { - // Capture the request - let method = req.method().to_string(); - let path = req.uri().path().to_string(); - let headers: Vec<(String, String)> = req - .headers() - .iter() - .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string())) - .collect(); - - // Read the body - let body_bytes = req - .into_body() - .collect() - .await - .map(|collected| collected.to_bytes().to_vec()) - .unwrap_or_default(); - - // Store the request - requests.lock().unwrap().push(ReceivedRequest { - method, - path, - headers, - body: body_bytes, + tokio::select! { + result = listener.accept() => { + let (stream, _) = match result { + Ok(conn) => conn, + Err(e) => { + eprintln!("Mock server accept error: {}", e); + break; + } + }; + + let io = TokioIo::new(stream); + let requests = requests_clone.clone(); + + tokio::spawn(async move { + let service = hyper::service::service_fn(move |req: Request| { + let requests = requests.clone(); + async move { + // Capture the request + let method = req.method().to_string(); + let path = req.uri().path().to_string(); + let headers: Vec<(String, String)> = req + .headers() + .iter() + .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string())) + .collect(); + + // Read the body + let body_bytes = req + .into_body() + .collect() + .await + .map(|collected| collected.to_bytes().to_vec()) + .unwrap_or_default(); + + // Store the request + requests.lock().unwrap().push(ReceivedRequest { + method, + path, + headers, + body: body_bytes, + }); + + // Return 200 OK + Ok::<_, hyper::http::Error>( + Response::builder() + .status(200) + .body(hyper_migration::Body::from(r#"{"ok":true}"#)) + .unwrap(), + ) + } }); - // Return 200 OK - Ok::<_, hyper::http::Error>( - Response::builder() - .status(200) - .body(hyper_migration::Body::from(r#"{"ok":true}"#)) - .unwrap(), - ) - } - }); - - let _ = hyper::server::conn::http1::Builder::new() - .serve_connection(io, service) - .await; - }); + let _ = hyper::server::conn::http1::Builder::new() + .serve_connection(io, service) + .await; + }); + } + _ = &mut shutdown_rx => { + break; + } + } } }); MockServer { addr, received_requests, + shutdown_tx: Some(shutdown_tx), } } @@ -126,3 +139,11 @@ impl MockServer { self.received_requests.lock().unwrap().clear(); } } + +impl Drop for MockServer { + fn drop(&mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } + } +} diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 1d1b2d2c..ce778481 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -4,6 +4,7 @@ mod common; use common::helpers::{create_test_trace_payload, send_tcp_request}; +use common::mock_server::MockServer; use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher}; use datadog_trace_agent::{ config::test_helpers::create_tcp_test_config, mini_agent::MiniAgent, @@ -20,6 +21,8 @@ use std::time::Duration; #[cfg(all(windows, feature = "windows-pipes"))] use common::helpers::send_named_pipe_request; +const FLUSH_WAIT_DURATION: Duration = Duration::from_millis(1500); + /// Helper to configure a config with mock server endpoints pub fn configure_mock_endpoints(config: &mut Config, mock_server_url: &str) { let trace_url = format!("{}/api/v0.2/traces", mock_server_url); @@ -63,7 +66,6 @@ pub fn create_mini_agent_with_real_flushers(config: Arc) -> MiniAgent { /// Helper to verify trace request sent to mock server pub fn verify_trace_request(mock_server: &common::mock_server::MockServer) { - let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); assert!( @@ -300,9 +302,7 @@ async fn test_mini_agent_named_pipe_handles_requests() { #[cfg(test)] #[tokio::test] #[serial] -async fn test_mini_agent_with_real_flushers() { - use common::mock_server::MockServer; - +async fn test_mini_agent_tcp_with_real_flushers() { let mock_server = MockServer::start().await; tokio::time::sleep(Duration::from_millis(50)).await; @@ -375,7 +375,7 @@ async fn test_mini_agent_with_real_flushers() { assert_eq!(trace_response.status(), StatusCode::OK); // Wait for flush - tokio::time::sleep(Duration::from_millis(1500)).await; + tokio::time::sleep(FLUSH_WAIT_DURATION).await; verify_trace_request(&mock_server); @@ -386,8 +386,6 @@ async fn test_mini_agent_with_real_flushers() { #[tokio::test] #[serial] async fn test_mini_agent_named_pipe_with_real_flushers() { - use common::mock_server::MockServer; - let mock_server = MockServer::start().await; tokio::time::sleep(Duration::from_millis(50)).await; @@ -469,7 +467,7 @@ async fn test_mini_agent_named_pipe_with_real_flushers() { assert_eq!(trace_response.status(), StatusCode::OK); // Wait for flush - tokio::time::sleep(Duration::from_millis(1500)).await; + tokio::time::sleep(FLUSH_WAIT_DURATION).await; verify_trace_request(&mock_server); From 7922bac1b107a69ac18ac487235c061271c18d2e Mon Sep 17 00:00:00 2001 From: Lewis Date: Mon, 2 Feb 2026 12:50:19 -0500 Subject: [PATCH 6/8] Add type --- .../tests/integration_test.rs | 110 +----------------- 1 file changed, 5 insertions(+), 105 deletions(-) diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index ce778481..2cee91c7 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -7,8 +7,10 @@ use common::helpers::{create_test_trace_payload, send_tcp_request}; use common::mock_server::MockServer; use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher}; use datadog_trace_agent::{ - config::test_helpers::create_tcp_test_config, mini_agent::MiniAgent, - proxy_flusher::ProxyFlusher, trace_flusher::TraceFlusher, + config::{test_helpers::create_tcp_test_config, Config}, + mini_agent::MiniAgent, + proxy_flusher::ProxyFlusher, + trace_flusher::TraceFlusher, trace_processor::ServerlessTraceProcessor, }; use http_body_util::BodyExt; @@ -100,34 +102,6 @@ pub fn verify_trace_request(mock_server: &common::mock_server::MockServer) { ); } -/// Create a test config with TCP transport -pub fn create_tcp_test_config(port: u16) -> Config { - Config { - dd_site: "mock-datadoghq.com".to_string(), - dd_apm_receiver_port: port, - dd_apm_windows_pipe_name: None, - dd_dogstatsd_port: 8125, - dd_dogstatsd_windows_pipe_name: None, - env_type: trace_utils::EnvironmentType::AzureFunction, - app_name: Some("test-app".to_string()), - max_request_content_length: 10_000_000, - obfuscation_config: libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig::new() - .unwrap(), - os: std::env::consts::OS.to_string(), - tags: datadog_trace_agent::config::Tags::new(), - stats_flush_interval_secs: 10, - trace_flush_interval_secs: 5, - trace_intake: libdd_common::Endpoint::default(), - trace_stats_intake: libdd_common::Endpoint::default(), - profiling_intake: libdd_common::Endpoint::default(), - proxy_request_timeout_secs: 30, - proxy_request_max_retries: 3, - proxy_request_retry_backoff_base_ms: 100, - verify_env_timeout_ms: 1000, - proxy_url: None, - } -} - #[cfg(test)] #[tokio::test] #[serial] @@ -303,49 +277,15 @@ async fn test_mini_agent_named_pipe_handles_requests() { #[tokio::test] #[serial] async fn test_mini_agent_tcp_with_real_flushers() { - let mock_server = MockServer::start().await; + let mock_server: MockServer = MockServer::start().await; tokio::time::sleep(Duration::from_millis(50)).await; let mut config = create_tcp_test_config(8127); -<<<<<<< HEAD - config.trace_intake = libdd_common::Endpoint { - url: trace_url.parse().unwrap(), - api_key: Some("test-api-key".into()), - ..Default::default() - }; - config.trace_stats_intake = libdd_common::Endpoint { - url: stats_url.parse().unwrap(), - api_key: Some("test-api-key".into()), - ..Default::default() - }; - // Set short flush intervals for faster testing - config.trace_flush_interval_secs = 1; // 1 second - config.stats_flush_interval_secs = 1; // 1 second - - let config = Arc::new(config); - let test_port = config.dd_apm_receiver_port; - - // Create mini agent with REAL flushers - let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); - let mini_agent = MiniAgent { - config: config.clone(), - trace_processor: Arc::new(ServerlessTraceProcessor {}), - trace_flusher: Arc::new(ServerlessTraceFlusher::new( - aggregator.clone(), - config.clone(), - )), - stats_processor: Arc::new(ServerlessStatsProcessor {}), - stats_flusher: Arc::new(ServerlessStatsFlusher {}), - env_verifier: Arc::new(MockEnvVerifier), - proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())), - }; -======= configure_mock_endpoints(&mut config, &mock_server.url()); let config = Arc::new(config); let test_port = config.dd_apm_receiver_port; let mini_agent = create_mini_agent_with_real_flushers(config); ->>>>>>> e680a1b (Extract test helpers) let agent_handle = tokio::spawn(async move { let _ = mini_agent.start_mini_agent().await; @@ -389,29 +329,6 @@ async fn test_mini_agent_named_pipe_with_real_flushers() { let mock_server = MockServer::start().await; tokio::time::sleep(Duration::from_millis(50)).await; -<<<<<<< HEAD - // Create config pointing to mock server - let trace_url = format!("{}/api/v0.2/traces", mock_server.url()); - let stats_url = format!("{}/api/v0.6/stats", mock_server.url()); - - let mut config = create_tcp_test_config(8127); - config.trace_intake = libdd_common::Endpoint { - url: trace_url.parse().unwrap(), - api_key: Some("test-api-key".into()), - ..Default::default() - }; - config.trace_stats_intake = libdd_common::Endpoint { - url: stats_url.parse().unwrap(), - api_key: Some("test-api-key".into()), - ..Default::default() - }; - // Set short flush intervals for faster testing - config.trace_flush_interval_secs = 1; // 1 second - config.stats_flush_interval_secs = 1; // 1 second - - // Configure for named pipe -======= ->>>>>>> e680a1b (Extract test helpers) let pipe_name = r"\\.\pipe\dd_trace_real_flusher_test"; let mut config = create_tcp_test_config(0); configure_mock_endpoints(&mut config, &mock_server.url()); @@ -419,24 +336,7 @@ async fn test_mini_agent_named_pipe_with_real_flushers() { config.dd_apm_receiver_port = 0; let config = Arc::new(config); -<<<<<<< HEAD - // Create mini agent with REAL flushers - let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); - let mini_agent = MiniAgent { - config: config.clone(), - trace_processor: Arc::new(ServerlessTraceProcessor {}), - trace_flusher: Arc::new(ServerlessTraceFlusher::new( - aggregator.clone(), - config.clone(), - )), - stats_processor: Arc::new(ServerlessStatsProcessor {}), - stats_flusher: Arc::new(ServerlessStatsFlusher {}), - env_verifier: Arc::new(MockEnvVerifier), - proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())), - }; -======= let mini_agent = create_mini_agent_with_real_flushers(config); ->>>>>>> e680a1b (Extract test helpers) let agent_handle = tokio::spawn(async move { let _ = mini_agent.start_mini_agent().await; From 5518b1341f43128fd29235ec84dbfa3835d2f5c4 Mon Sep 17 00:00:00 2001 From: Lewis Date: Thu, 12 Feb 2026 11:03:16 -0500 Subject: [PATCH 7/8] Have windows tests run with the correct pipe features --- .github/workflows/cargo.yml | 7 ++++++- crates/datadog-trace-agent/src/config.rs | 1 + crates/datadog-trace-agent/tests/integration_test.rs | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/.github/workflows/cargo.yml b/.github/workflows/cargo.yml index 35c7bd36..26076b23 100644 --- a/.github/workflows/cargo.yml +++ b/.github/workflows/cargo.yml @@ -93,4 +93,9 @@ jobs: shell: bash run: chmod +x ./scripts/install-protoc.sh && ./scripts/install-protoc.sh $HOME - shell: bash - run: cargo nextest run --workspace + run: | + if [[ "${{ inputs.runner }}" == "windows-2022" ]]; then + cargo nextest run --workspace --features datadog-trace-agent/windows-pipes,dogstatsd/windows-pipes + else + cargo nextest run --workspace + fi diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index 87ce1c68..2d94b4f3 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -567,6 +567,7 @@ pub mod test_helpers { use super::*; /// Create a test config with TCP transport + #[allow(clippy::unwrap_used)] pub fn create_tcp_test_config(port: u16) -> Config { Config { dd_site: "mock-datadoghq.com".to_string(), diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 2cee91c7..a240b812 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -322,7 +322,7 @@ async fn test_mini_agent_tcp_with_real_flushers() { agent_handle.abort(); } -#[cfg(all(test, windows))] +#[cfg(all(test, windows, feature = "windows-pipes"))] #[tokio::test] #[serial] async fn test_mini_agent_named_pipe_with_real_flushers() { From 7dda49119ccf1835023ef3c2d2688333d4727c4a Mon Sep 17 00:00:00 2001 From: Lewis Date: Thu, 12 Feb 2026 11:20:05 -0500 Subject: [PATCH 8/8] Fix dogstatsd initialization --- .github/workflows/cargo.yml | 2 +- crates/datadog-serverless-compat/src/main.rs | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/workflows/cargo.yml b/.github/workflows/cargo.yml index 26076b23..fc640c5d 100644 --- a/.github/workflows/cargo.yml +++ b/.github/workflows/cargo.yml @@ -95,7 +95,7 @@ jobs: - shell: bash run: | if [[ "${{ inputs.runner }}" == "windows-2022" ]]; then - cargo nextest run --workspace --features datadog-trace-agent/windows-pipes,dogstatsd/windows-pipes + cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes else cargo nextest run --workspace fi diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 66be3554..c5991362 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -226,13 +226,20 @@ async fn start_dogstatsd( // 2. Start the aggregator service in the background tokio::spawn(service.run()); + #[cfg(all(windows, feature = "windows-pipes"))] let dogstatsd_config = DogStatsDConfig { host: AGENT_HOST.to_string(), port, metric_namespace, - #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name, }; + + #[cfg(not(all(windows, feature = "windows-pipes")))] + let dogstatsd_config = DogStatsDConfig { + host: AGENT_HOST.to_string(), + port, + metric_namespace, + }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); // 3. Use handle in DogStatsD (cheap to clone)