diff --git a/.github/workflows/cargo.yml b/.github/workflows/cargo.yml index 35c7bd36..fc640c5d 100644 --- a/.github/workflows/cargo.yml +++ b/.github/workflows/cargo.yml @@ -93,4 +93,9 @@ jobs: shell: bash run: chmod +x ./scripts/install-protoc.sh && ./scripts/install-protoc.sh $HOME - shell: bash - run: cargo nextest run --workspace + run: | + if [[ "${{ inputs.runner }}" == "windows-2022" ]]; then + cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes + else + cargo nextest run --workspace + fi diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 66be3554..c5991362 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -226,13 +226,20 @@ async fn start_dogstatsd( // 2. Start the aggregator service in the background tokio::spawn(service.run()); + #[cfg(all(windows, feature = "windows-pipes"))] let dogstatsd_config = DogStatsDConfig { host: AGENT_HOST.to_string(), port, metric_namespace, - #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name, }; + + #[cfg(not(all(windows, feature = "windows-pipes")))] + let dogstatsd_config = DogStatsDConfig { + host: AGENT_HOST.to_string(), + port, + metric_namespace, + }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); // 3. Use handle in DogStatsD (cheap to clone) diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index 754f75eb..2d94b4f3 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -560,3 +560,39 @@ mod tests { assert_eq!(config.tags.function_tags(), None); } } + +/// Test helpers for creating Config instances in tests +#[cfg(any(test, debug_assertions))] +pub mod test_helpers { + use super::*; + + /// Create a test config with TCP transport + #[allow(clippy::unwrap_used)] + pub fn create_tcp_test_config(port: u16) -> Config { + Config { + dd_site: "mock-datadoghq.com".to_string(), + dd_apm_receiver_port: port, + #[cfg(any(all(windows, feature = "windows-pipes"), test))] + dd_apm_windows_pipe_name: None, + dd_dogstatsd_port: 8125, + #[cfg(any(all(windows, feature = "windows-pipes"), test))] + dd_dogstatsd_windows_pipe_name: None, + env_type: trace_utils::EnvironmentType::AzureFunction, + app_name: Some("test-app".to_string()), + max_request_content_length: 10_000_000, + obfuscation_config: obfuscation_config::ObfuscationConfig::new().unwrap(), + os: std::env::consts::OS.to_string(), + tags: Tags::new(), + stats_flush_interval_secs: 10, + trace_flush_interval_secs: 5, + trace_intake: Endpoint::default(), + trace_stats_intake: Endpoint::default(), + profiling_intake: Endpoint::default(), + proxy_request_timeout_secs: 30, + proxy_request_max_retries: 3, + proxy_request_retry_backoff_base_ms: 100, + verify_env_timeout_ms: 1000, + proxy_url: None, + } + } +} diff --git a/crates/datadog-trace-agent/tests/common/mock_server.rs b/crates/datadog-trace-agent/tests/common/mock_server.rs new file mode 100644 index 00000000..5e0b485d --- /dev/null +++ b/crates/datadog-trace-agent/tests/common/mock_server.rs @@ -0,0 +1,149 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Simple mock HTTP server for testing flushers + +use http_body_util::BodyExt; +use hyper::{body::Incoming, Request, Response}; +use hyper_util::rt::TokioIo; +use libdd_common::hyper_migration; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; +use tokio::net::TcpListener; + +#[derive(Clone, Debug)] +pub struct ReceivedRequest { + pub method: String, + pub path: String, + pub headers: Vec<(String, String)>, + pub body: Vec, +} + +pub struct MockServer { + pub addr: SocketAddr, + pub received_requests: Arc>>, + shutdown_tx: Option>, +} + +impl MockServer { + /// Start a mock HTTP server on a random port + pub async fn start() -> Self { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("Failed to bind mock server"); + let addr = listener.local_addr().expect("Failed to get local addr"); + + let received_requests = Arc::new(Mutex::new(Vec::new())); + let requests_clone = received_requests.clone(); + + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + + tokio::spawn(async move { + loop { + tokio::select! { + result = listener.accept() => { + let (stream, _) = match result { + Ok(conn) => conn, + Err(e) => { + eprintln!("Mock server accept error: {}", e); + break; + } + }; + + let io = TokioIo::new(stream); + let requests = requests_clone.clone(); + + tokio::spawn(async move { + let service = hyper::service::service_fn(move |req: Request| { + let requests = requests.clone(); + async move { + // Capture the request + let method = req.method().to_string(); + let path = req.uri().path().to_string(); + let headers: Vec<(String, String)> = req + .headers() + .iter() + .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string())) + .collect(); + + // Read the body + let body_bytes = req + .into_body() + .collect() + .await + .map(|collected| collected.to_bytes().to_vec()) + .unwrap_or_default(); + + // Store the request + requests.lock().unwrap().push(ReceivedRequest { + method, + path, + headers, + body: body_bytes, + }); + + // Return 200 OK + Ok::<_, hyper::http::Error>( + Response::builder() + .status(200) + .body(hyper_migration::Body::from(r#"{"ok":true}"#)) + .unwrap(), + ) + } + }); + + let _ = hyper::server::conn::http1::Builder::new() + .serve_connection(io, service) + .await; + }); + } + _ = &mut shutdown_rx => { + break; + } + } + } + }); + + MockServer { + addr, + received_requests, + shutdown_tx: Some(shutdown_tx), + } + } + + /// Get the base URL of the mock server + pub fn url(&self) -> String { + format!("http://{}", self.addr) + } + + /// Get all received requests + #[allow(dead_code)] + pub fn get_requests(&self) -> Vec { + self.received_requests.lock().unwrap().clone() + } + + /// Get requests matching a path + pub fn get_requests_for_path(&self, path: &str) -> Vec { + self.received_requests + .lock() + .unwrap() + .iter() + .filter(|req| req.path == path) + .cloned() + .collect() + } + + /// Clear all received requests + #[allow(dead_code)] + pub fn clear_requests(&self) { + self.received_requests.lock().unwrap().clear(); + } +} + +impl Drop for MockServer { + fn drop(&mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } + } +} diff --git a/crates/datadog-trace-agent/tests/common/mod.rs b/crates/datadog-trace-agent/tests/common/mod.rs index 3d83bbdb..447946a7 100644 --- a/crates/datadog-trace-agent/tests/common/mod.rs +++ b/crates/datadog-trace-agent/tests/common/mod.rs @@ -4,4 +4,5 @@ //! Common test utilities, mocks, and helpers for integration tests pub mod helpers; +pub mod mock_server; pub mod mocks; diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index ea2fde9e..a240b812 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -4,55 +4,109 @@ mod common; use common::helpers::{create_test_trace_payload, send_tcp_request}; +use common::mock_server::MockServer; use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher}; use datadog_trace_agent::{ - config::Config, mini_agent::MiniAgent, proxy_flusher::ProxyFlusher, + config::{test_helpers::create_tcp_test_config, Config}, + mini_agent::MiniAgent, + proxy_flusher::ProxyFlusher, + trace_flusher::TraceFlusher, trace_processor::ServerlessTraceProcessor, }; use http_body_util::BodyExt; use hyper::StatusCode; -use libdd_trace_utils::trace_utils; use serde_json::Value; +use serial_test::serial; use std::sync::Arc; use std::time::Duration; #[cfg(all(windows, feature = "windows-pipes"))] use common::helpers::send_named_pipe_request; -/// Create a test config with TCP transport -pub fn create_tcp_test_config() -> Config { - Config { - dd_site: "mock-datadoghq.com".to_string(), - dd_apm_receiver_port: 8126, - #[cfg(all(windows, feature = "windows-pipes"))] - dd_apm_windows_pipe_name: None, - dd_dogstatsd_port: 8125, - #[cfg(all(windows, feature = "windows-pipes"))] - dd_dogstatsd_windows_pipe_name: None, - env_type: trace_utils::EnvironmentType::AzureFunction, - app_name: Some("test-app".to_string()), - max_request_content_length: 10_000_000, - obfuscation_config: libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig::new() - .unwrap(), - os: std::env::consts::OS.to_string(), - tags: datadog_trace_agent::config::Tags::new(), - stats_flush_interval_secs: 10, - trace_flush_interval_secs: 5, - trace_intake: libdd_common::Endpoint::default(), - trace_stats_intake: libdd_common::Endpoint::default(), - profiling_intake: libdd_common::Endpoint::default(), - proxy_request_timeout_secs: 30, - proxy_request_max_retries: 3, - proxy_request_retry_backoff_base_ms: 100, - verify_env_timeout_ms: 1000, - proxy_url: None, +const FLUSH_WAIT_DURATION: Duration = Duration::from_millis(1500); + +/// Helper to configure a config with mock server endpoints +pub fn configure_mock_endpoints(config: &mut Config, mock_server_url: &str) { + let trace_url = format!("{}/api/v0.2/traces", mock_server_url); + let stats_url = format!("{}/api/v0.6/stats", mock_server_url); + + config.trace_intake = libdd_common::Endpoint { + url: trace_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + config.trace_stats_intake = libdd_common::Endpoint { + url: stats_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + config.trace_flush_interval_secs = 1; + config.stats_flush_interval_secs = 1; +} + +/// Helper to create a mini agent with real flushers +pub fn create_mini_agent_with_real_flushers(config: Arc) -> MiniAgent { + use datadog_trace_agent::{ + aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher, + stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher, + }; + + let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); + MiniAgent { + config: config.clone(), + trace_processor: Arc::new(ServerlessTraceProcessor {}), + trace_flusher: Arc::new(ServerlessTraceFlusher::new( + aggregator.clone(), + config.clone(), + )), + stats_processor: Arc::new(ServerlessStatsProcessor {}), + stats_flusher: Arc::new(ServerlessStatsFlusher {}), + env_verifier: Arc::new(MockEnvVerifier), + proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())), } } +/// Helper to verify trace request sent to mock server +pub fn verify_trace_request(mock_server: &common::mock_server::MockServer) { + let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); + + assert!( + !trace_reqs.is_empty(), + "Expected at least one trace request to mock server" + ); + + let trace_req = &trace_reqs[0]; + assert_eq!(trace_req.method, "POST", "Expected POST method"); + + let content_type = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "content-type") + .map(|(_, v)| v.as_str()); + assert_eq!( + content_type, + Some("application/x-protobuf"), + "Expected protobuf content-type" + ); + + let api_key = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "dd-api-key") + .map(|(_, v)| v.as_str()); + assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); + + assert!( + !trace_req.body.is_empty(), + "Expected non-empty trace payload" + ); +} + #[cfg(test)] #[tokio::test] +#[serial] async fn test_mini_agent_tcp_handles_requests() { - let config = Arc::new(create_tcp_test_config()); + let config = Arc::new(create_tcp_test_config(8126)); let test_port = config.dd_apm_receiver_port; let mini_agent = MiniAgent { config: config.clone(), @@ -146,9 +200,8 @@ async fn test_mini_agent_named_pipe_handles_requests() { // Use just the pipe name without \\.\pipe\ prefix, matching datadog-agent behavior let pipe_name = "dd_trace_integration_test"; let pipe_path = format!(r"\\.\pipe\{}", pipe_name); // Full path for client connections - let mut config = create_tcp_test_config(); + let mut config = create_tcp_test_config(0); config.dd_apm_windows_pipe_name = Some(pipe_path.clone()); - config.dd_apm_receiver_port = 0; let config = Arc::new(config); let mini_agent = MiniAgent { @@ -219,3 +272,104 @@ async fn test_mini_agent_named_pipe_handles_requests() { // Clean up agent_handle.abort(); } + +#[cfg(test)] +#[tokio::test] +#[serial] +async fn test_mini_agent_tcp_with_real_flushers() { + let mock_server: MockServer = MockServer::start().await; + tokio::time::sleep(Duration::from_millis(50)).await; + + let mut config = create_tcp_test_config(8127); + configure_mock_endpoints(&mut config, &mock_server.url()); + let config = Arc::new(config); + let test_port = config.dd_apm_receiver_port; + + let mini_agent = create_mini_agent_with_real_flushers(config); + + let agent_handle = tokio::spawn(async move { + let _ = mini_agent.start_mini_agent().await; + }); + + // Wait for server to be ready + let mut server_ready = false; + for _ in 0..20 { + tokio::time::sleep(Duration::from_millis(50)).await; + if let Ok(response) = send_tcp_request(test_port, "/info", "GET", None).await { + if response.status().is_success() { + server_ready = true; + break; + } + } + } + assert!( + server_ready, + "Mini agent server failed to start within timeout" + ); + + // Send trace data + let trace_payload = create_test_trace_payload(); + let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload)) + .await + .expect("Failed to send /v0.4/traces request"); + assert_eq!(trace_response.status(), StatusCode::OK); + + // Wait for flush + tokio::time::sleep(FLUSH_WAIT_DURATION).await; + + verify_trace_request(&mock_server); + + agent_handle.abort(); +} + +#[cfg(all(test, windows, feature = "windows-pipes"))] +#[tokio::test] +#[serial] +async fn test_mini_agent_named_pipe_with_real_flushers() { + let mock_server = MockServer::start().await; + tokio::time::sleep(Duration::from_millis(50)).await; + + let pipe_name = r"\\.\pipe\dd_trace_real_flusher_test"; + let mut config = create_tcp_test_config(0); + configure_mock_endpoints(&mut config, &mock_server.url()); + config.dd_apm_windows_pipe_name = Some(pipe_name.to_string()); + config.dd_apm_receiver_port = 0; + let config = Arc::new(config); + + let mini_agent = create_mini_agent_with_real_flushers(config); + + let agent_handle = tokio::spawn(async move { + let _ = mini_agent.start_mini_agent().await; + }); + + // Wait for server to be ready + let mut server_ready = false; + for _ in 0..20 { + tokio::time::sleep(Duration::from_millis(50)).await; + if let Ok(response) = send_named_pipe_request(pipe_name, "/info", "GET", None).await { + if response.status().is_success() { + server_ready = true; + break; + } + } + } + assert!( + server_ready, + "Mini agent named pipe server failed to start within timeout" + ); + + // Send trace data via named pipe + let trace_payload = create_test_trace_payload(); + let trace_response = + send_named_pipe_request(pipe_name, "/v0.4/traces", "POST", Some(trace_payload)) + .await + .expect("Failed to send /v0.4/traces request over named pipe"); + assert_eq!(trace_response.status(), StatusCode::OK); + + // Wait for flush + tokio::time::sleep(FLUSH_WAIT_DURATION).await; + + verify_trace_request(&mock_server); + + agent_handle.abort(); +}