Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-datadog-serverless-compat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
retention-days: 3
- if: ${{ inputs.runner == 'windows-2022' }}
shell: bash
run: cargo build --release -p datadog-serverless-compat
run: cargo build --release -p datadog-serverless-compat --features windows-pipes
- if: ${{ inputs.runner == 'windows-2022' }}
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # 4.6.2
with:
Expand Down
4 changes: 4 additions & 0 deletions crates/datadog-serverless-compat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ edition.workspace = true
license.workspace = true
description = "Binary to run trace-agent and dogstatsd servers in Serverless environments"

[features]
default = []
windows-pipes = ["datadog-trace-agent/windows-pipes", "dogstatsd/windows-pipes"]

[dependencies]
datadog-trace-agent = { path = "../datadog-trace-agent" }
libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "660c550b6311a209d9cf7de762e54b6b7109bcdb" }
Expand Down
8 changes: 5 additions & 3 deletions crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub async fn main() {
// Windows named pipe name for DogStatsD.
// Normalize by adding \\.\pipe\ prefix if not present
let dd_dogstatsd_windows_pipe_name: Option<String> = {
#[cfg(windows)]
#[cfg(all(windows, feature = "windows-pipes"))]
{
env::var("DD_DOGSTATSD_WINDOWS_PIPE_NAME")
.ok()
Expand All @@ -82,7 +82,7 @@ pub async fn main() {
}
})
}
#[cfg(not(windows))]
#[cfg(not(all(windows, feature = "windows-pipes")))]
{
None
}
Expand Down Expand Up @@ -178,6 +178,7 @@ pub async fn main() {
https_proxy,
dogstatsd_tags,
dd_statsd_metric_namespace,
#[cfg(all(windows, feature = "windows-pipes"))]
dd_dogstatsd_windows_pipe_name.clone(),
)
.await;
Expand Down Expand Up @@ -212,7 +213,7 @@ async fn start_dogstatsd(
https_proxy: Option<String>,
dogstatsd_tags: &str,
metric_namespace: Option<String>,
windows_pipe_name: Option<String>,
#[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: Option<String>,
) -> (CancellationToken, Option<Flusher>, AggregatorHandle) {
// 1. Create the aggregator service
#[allow(clippy::expect_used)]
Expand All @@ -229,6 +230,7 @@ async fn start_dogstatsd(
host: AGENT_HOST.to_string(),
port,
metric_namespace,
#[cfg(all(windows, feature = "windows-pipes"))]
windows_pipe_name,
};
let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new();
Expand Down
4 changes: 4 additions & 0 deletions crates/datadog-trace-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ edition.workspace = true
[lib]
bench = false

[features]
default = []
windows-pipes = []

[dependencies]
anyhow = "1.0"
hyper = { version = "1.6", features = ["http1", "client", "server"] }
Expand Down
15 changes: 11 additions & 4 deletions crates/datadog-trace-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ impl Tags {
pub struct Config {
pub dd_site: String,
pub dd_apm_receiver_port: u16,
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
pub dd_apm_windows_pipe_name: Option<String>,
pub dd_dogstatsd_port: u16,
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
pub dd_dogstatsd_windows_pipe_name: Option<String>,
pub env_type: trace_utils::EnvironmentType,
pub app_name: Option<String>,
Expand Down Expand Up @@ -122,7 +124,7 @@ impl Config {
// Windows named pipe name for APM receiver.
// Normalize by adding \\.\pipe\ prefix if not present
let dd_apm_windows_pipe_name: Option<String> = {
#[cfg(any(windows, test))]
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
{
env::var("DD_APM_WINDOWS_PIPE_NAME").ok().map(|pipe_name| {
if pipe_name.starts_with("\\\\.\\pipe\\") || pipe_name.starts_with(r"\\.\pipe\")
Expand All @@ -133,7 +135,7 @@ impl Config {
}
})
}
#[cfg(not(any(windows, test)))]
#[cfg(not(any(all(windows, feature = "windows-pipes"), test)))]
{
None
}
Expand All @@ -148,7 +150,7 @@ impl Config {
};

let dd_dogstatsd_windows_pipe_name: Option<String> = {
#[cfg(any(windows, test))]
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
{
env::var("DD_DOGSTATSD_WINDOWS_PIPE_NAME")
.ok()
Expand All @@ -162,7 +164,7 @@ impl Config {
}
})
}
#[cfg(not(any(windows, test)))]
#[cfg(not(any(all(windows, feature = "windows-pipes"), test)))]
{
None
}
Expand Down Expand Up @@ -223,8 +225,10 @@ impl Config {
proxy_request_retry_backoff_base_ms: 100,
verify_env_timeout_ms: 100,
dd_apm_receiver_port,
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
dd_apm_windows_pipe_name,
dd_dogstatsd_port,
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
dd_dogstatsd_windows_pipe_name,
dd_site,
trace_intake: Endpoint {
Expand Down Expand Up @@ -378,6 +382,7 @@ mod tests {

#[test]
#[serial]
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
fn test_apm_windows_pipe_name() {
env::set_var("DD_API_KEY", "_not_a_real_key_");
env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app");
Expand All @@ -399,6 +404,7 @@ mod tests {

#[test]
#[serial]
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
fn test_dogstatsd_windows_pipe_name() {
env::set_var("DD_API_KEY", "_not_a_real_key_");
env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app");
Expand Down Expand Up @@ -456,6 +462,7 @@ mod tests {
assert!(config_res.is_ok());
let config = config_res.unwrap();
assert_eq!(config.dd_apm_receiver_port, 8126);
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
assert_eq!(config.dd_apm_windows_pipe_name, None);
env::remove_var("DD_API_KEY");
env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME");
Expand Down
38 changes: 22 additions & 16 deletions crates/datadog-trace-agent/src/mini_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tracing::{debug, error};
use crate::http_utils::{log_and_create_http_response, verify_request_content_length};
use crate::proxy_flusher::{ProxyFlusher, ProxyRequest};

#[cfg(windows)]
#[cfg(all(windows, feature = "windows-pipes"))]
use tokio::net::windows::named_pipe::ServerOptions;

use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor};
Expand Down Expand Up @@ -129,7 +129,12 @@ impl MiniAgent {
});

// Determine which transport to use based on configuration
if let Some(ref pipe_name) = self.config.dd_apm_windows_pipe_name {
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
let pipe_name_opt = self.config.dd_apm_windows_pipe_name.as_ref();
#[cfg(not(any(all(windows, feature = "windows-pipes"), test)))]
let pipe_name_opt: Option<&String> = None;

if let Some(pipe_name) = pipe_name_opt {
debug!("Mini Agent started: listening on named pipe {}", pipe_name);
} else {
debug!(
Expand All @@ -142,9 +147,9 @@ impl MiniAgent {
now.elapsed().as_millis()
);

if let Some(ref pipe_name) = self.config.dd_apm_windows_pipe_name {
if let Some(pipe_name) = pipe_name_opt {
// Windows named pipe transport
#[cfg(windows)]
#[cfg(all(windows, feature = "windows-pipes"))]
{
Self::serve_named_pipe(
pipe_name,
Expand All @@ -154,14 +159,14 @@ impl MiniAgent {
)
.await?;
}

#[cfg(not(windows))]
#[cfg(not(all(windows, feature = "windows-pipes")))]
{
error!(
"Named pipes are only supported on Windows, cannot use pipe: {}",
let _ = pipe_name; // Suppress unused variable warning
unreachable!(
"Named pipes are only supported on Windows with the windows-pipes feature \
enabled, cannot use pipe: {}.",
pipe_name
);
Comment thread
Lewis-E marked this conversation as resolved.
return Err("Named pipes are only supported on Windows".into());
}
} else {
// TCP transport
Expand Down Expand Up @@ -250,7 +255,7 @@ impl MiniAgent {
}
}

#[cfg(windows)]
#[cfg(all(windows, feature = "windows-pipes"))]
async fn serve_named_pipe<S>(
pipe_name: &str,
service: S,
Expand All @@ -267,17 +272,15 @@ impl MiniAgent {
S::Future: Send,
S::Error: std::error::Error + Send + Sync + 'static,
{
// pipe_name already includes \\.\pipe\ prefix from config
let pipe_path = pipe_name;

let server = hyper::server::conn::http1::Builder::new();
let mut joinset = tokio::task::JoinSet::new();

loop {
// Create a new pipe instance
let pipe = match ServerOptions::new().create(&pipe_path) {
// pipe_name already includes \\.\pipe\ prefix from config
let pipe = match ServerOptions::new().create(pipe_name) {
Ok(pipe) => {
debug!("Created pipe server instance '{}' in byte mode", pipe_path);
debug!("Created pipe server instance '{}' in byte mode", pipe_name);
pipe
}
Err(e) => {
Expand All @@ -304,7 +307,7 @@ impl MiniAgent {
return Err(e.into());
}
Ok(()) => {
debug!("Client connected to '{}'", pipe_path);
debug!("Client connected to '{}'", pipe_name);
pipe
}
},
Expand Down Expand Up @@ -386,7 +389,10 @@ impl MiniAgent {
}
(_, INFO_ENDPOINT_PATH) => match Self::info_handler(
config.dd_apm_receiver_port,
#[cfg(all(windows, feature = "windows-pipes"))]
config.dd_apm_windows_pipe_name.as_deref(),
#[cfg(not(all(windows, feature = "windows-pipes")))]
None,
config.dd_dogstatsd_port,
) {
Ok(res) => Ok(res),
Expand Down
2 changes: 2 additions & 0 deletions crates/datadog-trace-agent/src/trace_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,10 @@ mod tests {
},
dd_site: "datadoghq.com".to_string(),
dd_apm_receiver_port: 8126,
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
dd_apm_windows_pipe_name: None,
dd_dogstatsd_port: 8125,
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
dd_dogstatsd_windows_pipe_name: None,
env_type: trace_utils::EnvironmentType::CloudFunction,
os: "linux".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion crates/datadog-trace-agent/tests/common/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub async fn send_tcp_request(
Ok(response)
}

#[cfg(windows)]
#[cfg(all(windows, feature = "windows-pipes"))]
/// Send an HTTP request over named pipe and return the response
pub async fn send_named_pipe_request(
pipe_name: &str,
Expand Down
6 changes: 4 additions & 2 deletions crates/datadog-trace-agent/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ use serde_json::Value;
use std::sync::Arc;
use std::time::Duration;

#[cfg(windows)]
#[cfg(all(windows, feature = "windows-pipes"))]
use common::helpers::send_named_pipe_request;

/// Create a test config with TCP transport
pub fn create_tcp_test_config() -> Config {
Config {
dd_site: "mock-datadoghq.com".to_string(),
dd_apm_receiver_port: 8126,
#[cfg(all(windows, feature = "windows-pipes"))]
dd_apm_windows_pipe_name: None,
dd_dogstatsd_port: 8125,
#[cfg(all(windows, feature = "windows-pipes"))]
dd_dogstatsd_windows_pipe_name: None,
env_type: trace_utils::EnvironmentType::AzureFunction,
app_name: Some("test-app".to_string()),
Expand Down Expand Up @@ -138,7 +140,7 @@ async fn test_mini_agent_tcp_handles_requests() {
agent_handle.abort();
}

#[cfg(all(test, windows))]
#[cfg(all(test, windows, feature = "windows-pipes"))]
#[tokio::test]
async fn test_mini_agent_named_pipe_handles_requests() {
// Use just the pipe name without \\.\pipe\ prefix, matching datadog-agent behavior
Expand Down
3 changes: 2 additions & 1 deletion crates/dogstatsd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ reqwest = { version = "0.12.4", features = ["json", "http2"], default-features =
serde = { version = "1.0.197", default-features = false, features = ["derive"] }
serde_json = { version = "1.0.116", default-features = false, features = ["alloc"] }
thiserror = { version = "1.0.58", default-features = false }
tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net", "io-util"] }
tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net"] }
tokio-util = { version = "0.7.11", default-features = false }
tracing = { version = "0.1.40", default-features = false }
regex = { version = "1.10.6", default-features = false }
Expand All @@ -35,3 +35,4 @@ tracing-test = { version = "0.2.5", default-features = false }
[features]
default = [ "reqwest/rustls-tls" ]
fips = [ "reqwest/rustls-tls-no-provider", "datadog-fips/fips" ]
windows-pipes = ["tokio/io-util"]
Loading
Loading