diff --git a/.github/workflows/build-datadog-serverless-compat.yml b/.github/workflows/build-datadog-serverless-compat.yml index 46643766..13d18ee6 100644 --- a/.github/workflows/build-datadog-serverless-compat.yml +++ b/.github/workflows/build-datadog-serverless-compat.yml @@ -45,7 +45,7 @@ jobs: retention-days: 3 - if: ${{ inputs.runner == 'windows-2022' }} shell: bash - run: cargo build --release -p datadog-serverless-compat + run: cargo build --release -p datadog-serverless-compat --features windows-pipes - if: ${{ inputs.runner == 'windows-2022' }} uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # 4.6.2 with: diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index 4483d949..b331e92f 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -5,6 +5,10 @@ edition.workspace = true license.workspace = true description = "Binary to run trace-agent and dogstatsd servers in Serverless environments" +[features] +default = [] +windows-pipes = ["datadog-trace-agent/windows-pipes", "dogstatsd/windows-pipes"] + [dependencies] datadog-trace-agent = { path = "../datadog-trace-agent" } libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "660c550b6311a209d9cf7de762e54b6b7109bcdb" } diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index adc106e7..66be3554 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -69,7 +69,7 @@ pub async fn main() { // Windows named pipe name for DogStatsD. // Normalize by adding \\.\pipe\ prefix if not present let dd_dogstatsd_windows_pipe_name: Option = { - #[cfg(windows)] + #[cfg(all(windows, feature = "windows-pipes"))] { env::var("DD_DOGSTATSD_WINDOWS_PIPE_NAME") .ok() @@ -82,7 +82,7 @@ pub async fn main() { } }) } - #[cfg(not(windows))] + #[cfg(not(all(windows, feature = "windows-pipes")))] { None } @@ -178,6 +178,7 @@ pub async fn main() { https_proxy, dogstatsd_tags, dd_statsd_metric_namespace, + #[cfg(all(windows, feature = "windows-pipes"))] dd_dogstatsd_windows_pipe_name.clone(), ) .await; @@ -212,7 +213,7 @@ async fn start_dogstatsd( https_proxy: Option, dogstatsd_tags: &str, metric_namespace: Option, - windows_pipe_name: Option, + #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: Option, ) -> (CancellationToken, Option, AggregatorHandle) { // 1. Create the aggregator service #[allow(clippy::expect_used)] @@ -229,6 +230,7 @@ async fn start_dogstatsd( host: AGENT_HOST.to_string(), port, metric_namespace, + #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); diff --git a/crates/datadog-trace-agent/Cargo.toml b/crates/datadog-trace-agent/Cargo.toml index 8a4c871a..85daf48e 100644 --- a/crates/datadog-trace-agent/Cargo.toml +++ b/crates/datadog-trace-agent/Cargo.toml @@ -7,6 +7,10 @@ edition.workspace = true [lib] bench = false +[features] +default = [] +windows-pipes = [] + [dependencies] anyhow = "1.0" hyper = { version = "1.6", features = ["http1", "client", "server"] } diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index db405222..754f75eb 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -81,8 +81,10 @@ impl Tags { pub struct Config { pub dd_site: String, pub dd_apm_receiver_port: u16, + #[cfg(any(all(windows, feature = "windows-pipes"), test))] pub dd_apm_windows_pipe_name: Option, pub dd_dogstatsd_port: u16, + #[cfg(any(all(windows, feature = "windows-pipes"), test))] pub dd_dogstatsd_windows_pipe_name: Option, pub env_type: trace_utils::EnvironmentType, pub app_name: Option, @@ -122,7 +124,7 @@ impl Config { // Windows named pipe name for APM receiver. // Normalize by adding \\.\pipe\ prefix if not present let dd_apm_windows_pipe_name: Option = { - #[cfg(any(windows, test))] + #[cfg(any(all(windows, feature = "windows-pipes"), test))] { env::var("DD_APM_WINDOWS_PIPE_NAME").ok().map(|pipe_name| { if pipe_name.starts_with("\\\\.\\pipe\\") || pipe_name.starts_with(r"\\.\pipe\") @@ -133,7 +135,7 @@ impl Config { } }) } - #[cfg(not(any(windows, test)))] + #[cfg(not(any(all(windows, feature = "windows-pipes"), test)))] { None } @@ -148,7 +150,7 @@ impl Config { }; let dd_dogstatsd_windows_pipe_name: Option = { - #[cfg(any(windows, test))] + #[cfg(any(all(windows, feature = "windows-pipes"), test))] { env::var("DD_DOGSTATSD_WINDOWS_PIPE_NAME") .ok() @@ -162,7 +164,7 @@ impl Config { } }) } - #[cfg(not(any(windows, test)))] + #[cfg(not(any(all(windows, feature = "windows-pipes"), test)))] { None } @@ -223,8 +225,10 @@ impl Config { proxy_request_retry_backoff_base_ms: 100, verify_env_timeout_ms: 100, dd_apm_receiver_port, + #[cfg(any(all(windows, feature = "windows-pipes"), test))] dd_apm_windows_pipe_name, dd_dogstatsd_port, + #[cfg(any(all(windows, feature = "windows-pipes"), test))] dd_dogstatsd_windows_pipe_name, dd_site, trace_intake: Endpoint { @@ -378,6 +382,7 @@ mod tests { #[test] #[serial] + #[cfg(any(all(windows, feature = "windows-pipes"), test))] fn test_apm_windows_pipe_name() { env::set_var("DD_API_KEY", "_not_a_real_key_"); env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); @@ -399,6 +404,7 @@ mod tests { #[test] #[serial] + #[cfg(any(all(windows, feature = "windows-pipes"), test))] fn test_dogstatsd_windows_pipe_name() { env::set_var("DD_API_KEY", "_not_a_real_key_"); env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); @@ -456,6 +462,7 @@ mod tests { assert!(config_res.is_ok()); let config = config_res.unwrap(); assert_eq!(config.dd_apm_receiver_port, 8126); + #[cfg(any(all(windows, feature = "windows-pipes"), test))] assert_eq!(config.dd_apm_windows_pipe_name, None); env::remove_var("DD_API_KEY"); env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME"); diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index 7e7cef12..6af32b12 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -16,7 +16,7 @@ use tracing::{debug, error}; use crate::http_utils::{log_and_create_http_response, verify_request_content_length}; use crate::proxy_flusher::{ProxyFlusher, ProxyRequest}; -#[cfg(windows)] +#[cfg(all(windows, feature = "windows-pipes"))] use tokio::net::windows::named_pipe::ServerOptions; use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor}; @@ -129,7 +129,12 @@ impl MiniAgent { }); // Determine which transport to use based on configuration - if let Some(ref pipe_name) = self.config.dd_apm_windows_pipe_name { + #[cfg(any(all(windows, feature = "windows-pipes"), test))] + let pipe_name_opt = self.config.dd_apm_windows_pipe_name.as_ref(); + #[cfg(not(any(all(windows, feature = "windows-pipes"), test)))] + let pipe_name_opt: Option<&String> = None; + + if let Some(pipe_name) = pipe_name_opt { debug!("Mini Agent started: listening on named pipe {}", pipe_name); } else { debug!( @@ -142,9 +147,9 @@ impl MiniAgent { now.elapsed().as_millis() ); - if let Some(ref pipe_name) = self.config.dd_apm_windows_pipe_name { + if let Some(pipe_name) = pipe_name_opt { // Windows named pipe transport - #[cfg(windows)] + #[cfg(all(windows, feature = "windows-pipes"))] { Self::serve_named_pipe( pipe_name, @@ -154,14 +159,14 @@ impl MiniAgent { ) .await?; } - - #[cfg(not(windows))] + #[cfg(not(all(windows, feature = "windows-pipes")))] { - error!( - "Named pipes are only supported on Windows, cannot use pipe: {}", + let _ = pipe_name; // Suppress unused variable warning + unreachable!( + "Named pipes are only supported on Windows with the windows-pipes feature \ + enabled, cannot use pipe: {}.", pipe_name ); - return Err("Named pipes are only supported on Windows".into()); } } else { // TCP transport @@ -250,7 +255,7 @@ impl MiniAgent { } } - #[cfg(windows)] + #[cfg(all(windows, feature = "windows-pipes"))] async fn serve_named_pipe( pipe_name: &str, service: S, @@ -267,17 +272,15 @@ impl MiniAgent { S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, { - // pipe_name already includes \\.\pipe\ prefix from config - let pipe_path = pipe_name; - let server = hyper::server::conn::http1::Builder::new(); let mut joinset = tokio::task::JoinSet::new(); loop { // Create a new pipe instance - let pipe = match ServerOptions::new().create(&pipe_path) { + // pipe_name already includes \\.\pipe\ prefix from config + let pipe = match ServerOptions::new().create(pipe_name) { Ok(pipe) => { - debug!("Created pipe server instance '{}' in byte mode", pipe_path); + debug!("Created pipe server instance '{}' in byte mode", pipe_name); pipe } Err(e) => { @@ -304,7 +307,7 @@ impl MiniAgent { return Err(e.into()); } Ok(()) => { - debug!("Client connected to '{}'", pipe_path); + debug!("Client connected to '{}'", pipe_name); pipe } }, @@ -386,7 +389,10 @@ impl MiniAgent { } (_, INFO_ENDPOINT_PATH) => match Self::info_handler( config.dd_apm_receiver_port, + #[cfg(all(windows, feature = "windows-pipes"))] config.dd_apm_windows_pipe_name.as_deref(), + #[cfg(not(all(windows, feature = "windows-pipes")))] + None, config.dd_dogstatsd_port, ) { Ok(res) => Ok(res), diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index f9d796cc..41f6e9e8 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -204,8 +204,10 @@ mod tests { }, dd_site: "datadoghq.com".to_string(), dd_apm_receiver_port: 8126, + #[cfg(any(all(windows, feature = "windows-pipes"), test))] dd_apm_windows_pipe_name: None, dd_dogstatsd_port: 8125, + #[cfg(any(all(windows, feature = "windows-pipes"), test))] dd_dogstatsd_windows_pipe_name: None, env_type: trace_utils::EnvironmentType::CloudFunction, os: "linux".to_string(), diff --git a/crates/datadog-trace-agent/tests/common/helpers.rs b/crates/datadog-trace-agent/tests/common/helpers.rs index a41cf055..6f6e776f 100644 --- a/crates/datadog-trace-agent/tests/common/helpers.rs +++ b/crates/datadog-trace-agent/tests/common/helpers.rs @@ -55,7 +55,7 @@ pub async fn send_tcp_request( Ok(response) } -#[cfg(windows)] +#[cfg(all(windows, feature = "windows-pipes"))] /// Send an HTTP request over named pipe and return the response pub async fn send_named_pipe_request( pipe_name: &str, diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 62d632ed..ea2fde9e 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -16,7 +16,7 @@ use serde_json::Value; use std::sync::Arc; use std::time::Duration; -#[cfg(windows)] +#[cfg(all(windows, feature = "windows-pipes"))] use common::helpers::send_named_pipe_request; /// Create a test config with TCP transport @@ -24,8 +24,10 @@ pub fn create_tcp_test_config() -> Config { Config { dd_site: "mock-datadoghq.com".to_string(), dd_apm_receiver_port: 8126, + #[cfg(all(windows, feature = "windows-pipes"))] dd_apm_windows_pipe_name: None, dd_dogstatsd_port: 8125, + #[cfg(all(windows, feature = "windows-pipes"))] dd_dogstatsd_windows_pipe_name: None, env_type: trace_utils::EnvironmentType::AzureFunction, app_name: Some("test-app".to_string()), @@ -138,7 +140,7 @@ async fn test_mini_agent_tcp_handles_requests() { agent_handle.abort(); } -#[cfg(all(test, windows))] +#[cfg(all(test, windows, feature = "windows-pipes"))] #[tokio::test] async fn test_mini_agent_named_pipe_handles_requests() { // Use just the pipe name without \\.\pipe\ prefix, matching datadog-agent behavior diff --git a/crates/dogstatsd/Cargo.toml b/crates/dogstatsd/Cargo.toml index 4ec6e09d..9ef8ccc0 100644 --- a/crates/dogstatsd/Cargo.toml +++ b/crates/dogstatsd/Cargo.toml @@ -19,7 +19,7 @@ reqwest = { version = "0.12.4", features = ["json", "http2"], default-features = serde = { version = "1.0.197", default-features = false, features = ["derive"] } serde_json = { version = "1.0.116", default-features = false, features = ["alloc"] } thiserror = { version = "1.0.58", default-features = false } -tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net", "io-util"] } +tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net"] } tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1.40", default-features = false } regex = { version = "1.10.6", default-features = false } @@ -35,3 +35,4 @@ tracing-test = { version = "0.2.5", default-features = false } [features] default = [ "reqwest/rustls-tls" ] fips = [ "reqwest/rustls-tls-no-provider", "datadog-fips/fips" ] +windows-pipes = ["tokio/io-util"] diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index a7faa2c7..30fbe995 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -16,12 +16,8 @@ use crate::metric::{id, parse, Metric}; use tracing::{debug, error, trace}; // Windows-specific imports -#[cfg(windows)] -use { - std::sync::Arc, - tokio::io::AsyncReadExt, - tokio::net::windows::named_pipe::{ClientOptions, ServerOptions}, -}; +#[cfg(all(windows, feature = "windows-pipes"))] +use {std::sync::Arc, tokio::io::AsyncReadExt, tokio::net::windows::named_pipe::ServerOptions}; // DogStatsD buffer size for receiving metrics // TODO(astuyve) buf should be dynamic @@ -38,6 +34,7 @@ pub struct DogStatsDConfig { /// Optional namespace to prepend to all metric names (e.g., "myapp") pub metric_namespace: Option, /// Optional Windows named pipe name. (e.g., "\\\\.\\pipe\\my_pipe"). + #[cfg(all(windows, feature = "windows-pipes"))] pub windows_pipe_name: Option, } @@ -47,7 +44,7 @@ pub enum MessageSource { /// Message received from a network socket (UDP) Network(SocketAddr), /// Message received from a Windows named pipe (Arc for efficient cloning) - #[cfg(windows)] + #[cfg(all(windows, feature = "windows-pipes"))] NamedPipe(Arc), } @@ -55,7 +52,7 @@ impl std::fmt::Display for MessageSource { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Network(addr) => write!(f, "{}", addr), - #[cfg(windows)] + #[cfg(all(windows, feature = "windows-pipes"))] Self::NamedPipe(name) => write!(f, "{}", name), } } @@ -71,7 +68,7 @@ enum BufferReader { MirrorTest(Vec, SocketAddr), /// Windows named pipe reader (Windows-only transport) - #[cfg(windows)] + #[cfg(all(windows, feature = "windows-pipes"))] NamedPipe { pipe_name: Arc, receiver: Arc>>>, @@ -98,7 +95,7 @@ impl BufferReader { // Mirror Reader: returns immediately with stored data Ok((data.clone(), MessageSource::Network(*socket))) } - #[cfg(windows)] + #[cfg(all(windows, feature = "windows-pipes"))] BufferReader::NamedPipe { pipe_name, receiver, @@ -135,11 +132,17 @@ impl DogStatsD { aggregator_handle: AggregatorHandle, cancel_token: tokio_util::sync::CancellationToken, ) -> DogStatsD { - #[allow(unused_variables)] // pipe_name unused on non-Windows - let buffer_reader = if let Some(ref pipe_name) = config.windows_pipe_name { - #[cfg(windows)] + // Determine if we should use a named pipe or UDP/UDS + #[cfg(all(windows, feature = "windows-pipes"))] + let pipe_name_opt = config.windows_pipe_name.as_ref(); + #[cfg(not(all(windows, feature = "windows-pipes")))] + let pipe_name_opt: Option<&String> = None; + + let buffer_reader = if let Some(pipe_name_ref) = pipe_name_opt { + // Windows named pipe transport + #[cfg(all(windows, feature = "windows-pipes"))] { - let pipe_name = Arc::new(pipe_name.clone()); + let pipe_name = Arc::new(pipe_name_ref.clone()); // Create channel for receiving data from client handlers let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -157,10 +160,14 @@ impl DogStatsD { receiver, } } - #[cfg(not(windows))] - #[allow(clippy::panic)] + #[cfg(not(all(windows, feature = "windows-pipes")))] { - panic!("Named pipes are only supported on Windows.") + let _ = pipe_name_ref; // Suppress unused variable warning + unreachable!( + "Named pipes are only supported on Windows with the windows-pipes feature \ + enabled, cannot use pipe: {}.", + pipe_name_ref + ); } } else { // UDP socket for all platforms @@ -265,7 +272,7 @@ impl DogStatsD { /// Uses a multi-instance approach (like winio in the main agent): /// - Creates new server instance for each client /// - Spawns task to handle each client -#[cfg(windows)] +#[cfg(all(windows, feature = "windows-pipes"))] async fn run_named_pipe_server( pipe_name: Arc, sender: tokio::sync::mpsc::UnboundedSender>, @@ -350,7 +357,7 @@ async fn run_named_pipe_server( break; } Ok(_) => { - if let Ok(_) = std::str::from_utf8(&buf[..complete_message_size]) { + if std::str::from_utf8(&buf[..complete_message_size]).is_ok() { debug!( "Sent {} bytes from '{}'", complete_message_size, pipe_name_clone diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 90130e79..e51b0973 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -20,7 +20,7 @@ use tokio::{ use tokio_util::sync::CancellationToken; use zstd::zstd_safe::CompressionLevel; -#[cfg(windows)] +#[cfg(all(windows, feature = "windows-pipes"))] use tokio::{io::AsyncWriteExt, net::windows::named_pipe::ClientOptions}; #[cfg(test)] @@ -100,6 +100,7 @@ async fn start_dogstatsd(aggregator_handle: AggregatorHandle) -> CancellationTok host: "127.0.0.1".to_string(), port: 18125, metric_namespace: None, + #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: None, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); @@ -286,7 +287,7 @@ async fn test_send_with_retry_immediate_failure_after_one_attempt() { } #[cfg(test)] -#[cfg(windows)] +#[cfg(all(windows, feature = "windows-pipes"))] #[tokio::test] async fn test_named_pipe_basic_communication() { let pipe_name = r"\\.\pipe\test_dogstatsd_basic"; @@ -340,7 +341,7 @@ async fn test_named_pipe_basic_communication() { } #[cfg(test)] -#[cfg(windows)] +#[cfg(all(windows, feature = "windows-pipes"))] #[tokio::test] async fn test_named_pipe_disconnect_reconnect() { let pipe_name = r"\\.\pipe\test_dogstatsd_reconnect"; @@ -409,7 +410,7 @@ async fn test_named_pipe_disconnect_reconnect() { } #[cfg(test)] -#[cfg(windows)] +#[cfg(all(windows, feature = "windows-pipes"))] #[tokio::test] async fn test_named_pipe_cancellation() { let pipe_name = r"\\.\pipe\test_dogstatsd_cancel"; @@ -452,7 +453,7 @@ async fn test_named_pipe_cancellation() { } #[cfg(test)] -#[cfg(windows)] +#[cfg(all(windows, feature = "windows-pipes"))] #[tokio::test] async fn test_buffer_split_message() { let pipe_name = r"\\.\pipe\test_dogstatsd_buffer_split";