diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index e42c6ce226..694119c831 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -45,6 +45,7 @@ libdd-data-pipeline*/ @DataDog/libdatadog-apm libdd-ddsketch*/ @DataDog/libdatadog-apm @DataDog/apm-common-components-core libdd-dogstatsd-client @DataDog/apm-common-components-core libdd-http-client @DataDog/apm-common-components-core +libdd-agent-client @DataDog/apm-common-components-core libdd-library-config*/ @DataDog/apm-sdk-capabilities-rust libdd-log*/ @DataDog/apm-common-components-core libdd-otel-thread-ctx/ @DataDog/apm-common-components-core diff --git a/Cargo.lock b/Cargo.lock index 43efba235d..6cd84d525d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2810,6 +2810,23 @@ version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" +[[package]] +name = "libdd-agent-client" +version = "32.0.0" +dependencies = [ + "bytes", + "flate2", + "httpmock", + "libdd-common", + "libdd-http-client", + "rustls", + "serde", + "serde_json", + "serial_test", + "thiserror 2.0.17", + "tokio", +] + [[package]] name = "libdd-alloc" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 290d70288a..a4c21d8b64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ members = [ "libdd-tinybytes", "libdd-dogstatsd-client", "libdd-http-client", + "libdd-agent-client", "libdd-log", "libdd-log-ffi", ] diff --git a/libdd-agent-client/Cargo.toml b/libdd-agent-client/Cargo.toml new file mode 100644 index 0000000000..915355bcda --- /dev/null +++ b/libdd-agent-client/Cargo.toml @@ -0,0 +1,32 @@ +# Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +# SPDX-License-Identifier: Apache-2.0 + +[package] +name = "libdd-agent-client" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +authors.workspace = true +description = "Datadog-agent-specialized HTTP client: language metadata injection, per-endpoint send methods, retry, and compression" +homepage = "https://github.com/DataDog/libdatadog/tree/main/libdd-agent-client" +repository = "https://github.com/DataDog/libdatadog/tree/main/libdd-agent-client" + +[lib] +bench = false + +[dependencies] +bytes = "1.4" +flate2 = "1" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +thiserror = "2" +tokio = { version = "1.23", features = ["rt"] } +libdd-http-client = { version = "32.0", path = "../libdd-http-client" } +libdd-common = { version = "4.0", path = "../libdd-common", default-features = false } + +[dev-dependencies] +httpmock = "0.8.0-alpha.1" +rustls = { version = "0.23", default-features = false, features = ["ring"] } +serial_test = "3.2" +tokio = { version = "1.23", features = ["rt", "macros"] } diff --git a/libdd-agent-client/src/agent_info.rs b/libdd-agent-client/src/agent_info.rs new file mode 100644 index 0000000000..e9193f9d8b --- /dev/null +++ b/libdd-agent-client/src/agent_info.rs @@ -0,0 +1,30 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Types for [`crate::AgentClient::agent_info`]. + +/// Parsed response from a `GET /info` probe. +/// +/// Returned by [`crate::AgentClient::agent_info`]. Contains agent capabilities and headers. +#[derive(Debug, Clone)] +pub struct AgentInfo { + /// Available agent endpoints, e.g. `["/v0.4/traces", "/v0.5/traces"]`. + pub endpoints: Vec, + /// Whether the agent supports client-side P0 dropping. + pub client_drop_p0s: bool, + /// Raw agent configuration block. + pub config: serde_json::Value, + /// Agent version string, if reported. + pub version: Option, + /// Parsed from the `Datadog-Container-Tags-Hash` response header. + /// + /// Used by dd-trace-py to compute the base tag hash (`agent.py:17-23`). + pub container_tags_hash: Option, + /// Value of the `Datadog-Agent-State` response header from the last `/info` fetch. + /// + /// The agent updates this opaque token whenever its internal state changes (e.g. a + /// configuration reload). Clients that poll `/info` periodically can skip re-parsing + /// the response body by comparing this value to the one returned by the previous call + /// and only acting when it differs. + pub state_hash: Option, +} diff --git a/libdd-agent-client/src/builder.rs b/libdd-agent-client/src/builder.rs new file mode 100644 index 0000000000..aa78faf56f --- /dev/null +++ b/libdd-agent-client/src/builder.rs @@ -0,0 +1,362 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Builder for [`crate::AgentClient`]. + +#[cfg(windows)] +use std::ffi::OsString; +#[cfg(unix)] +use std::path::PathBuf; +use std::{collections::HashMap, time::Duration}; + +use libdd_http_client::RetryConfig; + +use crate::{error::BuildError, language_metadata::LanguageMetadata, AgentClient}; + +/// Default timeout for agent requests. +pub const DEFAULT_TIMEOUT_MS: u64 = 2_000; + +/// Default retry configuration: 2 retries (3 total attempts), 100 ms initial delay, +/// exponential backoff with full jitter. +//TODO: Do we really want something different from `RetryConfig::default()` for the agent? The only +//difference is the number of retries : 3 vs 2 +pub fn default_retry_config() -> RetryConfig { + RetryConfig::new() + .max_retries(2) + .initial_delay(Duration::from_millis(100)) + .with_jitter(true) +} + +/// Transport configuration for the agent client. +/// +/// Determines how the client connects to the Datadog agent. +/// Set via [`AgentClientBuilder::transport`] or the convenience helpers +/// [`AgentClientBuilder::http`], [`AgentClientBuilder::unix_socket`], etc. +#[derive(Debug, Clone)] +pub enum AgentTransport { + /// HTTP over TCP. + Http { + /// Hostname or IP address. + host: String, + /// Port number. + port: u16, + }, + /// Unix Domain Socket. + /// + /// HTTP requests are still formed with `Host: localhost`. The socket path governs only the + /// transport layer. + #[cfg(unix)] + UnixSocket { + /// Filesystem path to the socket file. + path: PathBuf, + }, + /// Windows Named Pipe. + #[cfg(windows)] + NamedPipe { + /// Named pipe path, e.g. `\\.\pipe\DD_APM_DRIVER`. + path: OsString, + }, +} + +impl Default for AgentTransport { + fn default() -> Self { + AgentTransport::Http { + host: "localhost".to_string(), + port: 8126, + } + } +} + +/// Builder for [`AgentClient`]. +/// +/// Obtain via [`AgentClient::builder`]. +/// +/// # Required fields +/// +/// - Transport: one of [`AgentClientBuilder::http`], [`AgentClientBuilder::unix_socket`], +/// [`AgentClientBuilder::windows_named_pipe`], [`AgentClientBuilder::transport`]. +/// - [`AgentClientBuilder::language_metadata`]. +/// +/// # Test tokens +/// +/// Call [`AgentClientBuilder::test_agent_session_token`] to inject +/// `x-datadog-test-session-token` on every request. +#[derive(Debug, Default)] +pub struct AgentClientBuilder { + transport: Option, + test_token: Option, + timeout: Option, + language: Option, + retry: Option, + allow_connection_pooling: bool, + extra_headers: HashMap, +} + +impl AgentClientBuilder { + /// Create a new builder with default settings. + pub fn new() -> Self { + Self::default() + } + + /// Set the transport configuration. + pub fn transport(mut self, transport: AgentTransport) -> Self { + self.transport = Some(transport); + self + } + + /// Convenience: HTTP over TCP. + pub fn http(self, host: impl Into, port: u16) -> Self { + self.transport(AgentTransport::Http { + host: host.into(), + port, + }) + } + + /// Convenience: Unix Domain Socket. + #[cfg(unix)] + pub fn unix_socket(self, path: impl Into) -> Self { + self.transport(AgentTransport::UnixSocket { path: path.into() }) + } + + /// Convenience: Windows Named Pipe. + #[cfg(windows)] + pub fn windows_named_pipe(self, path: impl Into) -> Self { + self.transport(AgentTransport::NamedPipe { path: path.into() }) + } + + /// Set the test session token. + /// + /// When set, `x-datadog-test-session-token: ` is injected on every request. + pub fn test_agent_session_token(mut self, token: impl Into) -> Self { + self.test_token = Some(token.into()); + self + } + + /// Set the request timeout. + /// + /// Defaults to [`DEFAULT_TIMEOUT_MS`] (2 000 ms) when not set. + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + self + } + + /// Override the default retry configuration. + /// + /// Defaults to [`default_retry_config`]. + pub fn retry(mut self, config: RetryConfig) -> Self { + self.retry = Some(config); + self + } + + /// Set the language/runtime metadata injected into every request. Required. + pub fn language_metadata(mut self, meta: LanguageMetadata) -> Self { + self.language = Some(meta); + self + } + + /// Allow connection pooling. Defaults to `false`. + /// + /// Note that whether pooling is actually used depends on the HTTP backend of + /// [libdd_http_client], though both currently available backends (reqwest and hyper) support + /// pooling. This setting should be understood as: if set to `false`, no connection pooling will + /// happen. If set to `true`, connection pooling may happen, at the discretion of the HTTP + /// backend. + /// + /// The Datadog agent has a low keep-alive timeout that causes "pipe closed" errors on every + /// second connection. The default of `false` is correct for all periodic-flush writers (traces, + /// stats, data streams). Set to `true` only for high-frequency continuous senders (e.g. a + /// streaming profiling exporter). + pub fn allow_connection_pooling(mut self, enabled: bool) -> Self { + self.allow_connection_pooling = enabled; + self + } + + // Compression + // + // Not exposed in this libv1. Gzip compression (level 6, matching dd-trace-py's trace writer at + // `writer.py:490`) will be added in a follow-up once the core send paths are stable. + // Per-method defaults (e.g. unconditional gzip for `send_pipeline_stats`) are already + // baked in; only the opt-in client-level `gzip(level)` builder knob is deferred. + + /// Additional custom headers to inject. + pub fn extra_headers(mut self, headers: HashMap) -> Self { + self.extra_headers = headers; + self + } + + /// Build the [`AgentClient`]. + pub fn build(self) -> Result { + let transport = self.transport.ok_or(BuildError::MissingTransport)?; + let language = self.language.ok_or(BuildError::MissingLanguageMetadata)?; + let timeout = self + .timeout + .unwrap_or(Duration::from_millis(DEFAULT_TIMEOUT_MS)); + let retry = self.retry.unwrap_or_else(default_retry_config); + + // Build the underlying HTTP client. + let http = + Self::build_http_client(transport, timeout, retry, self.allow_connection_pooling) + .map_err(|e| BuildError::HttpClient(e.to_string()))?; + + // Pre-compute all static headers that are injected on every request. + let static_headers = + Self::build_static_headers(&language, self.test_token, self.extra_headers); + + Ok(AgentClient::new(http, static_headers)) + } + + fn build_http_client( + transport: AgentTransport, + timeout: Duration, + retry: RetryConfig, + allow_connection_pooling: bool, + ) -> Result { + let base_url = match &transport { + AgentTransport::Http { host, port } => format!("http://{}:{}", host, port), + #[cfg(unix)] + AgentTransport::UnixSocket { .. } => "http://localhost".to_string(), + #[cfg(windows)] + AgentTransport::NamedPipe { .. } => "http://localhost".to_string(), + }; + + let mut builder = libdd_http_client::HttpClient::builder() + .base_url(base_url) + .timeout(timeout) + // HTTP errors are handled by each send method, not by the underlying client. + // This allows methods like `agent_info` to interpret 404 as Ok(None) rather than + // an error, and avoids retrying on HTTP 4xx/5xx. + .treat_http_errors_as_errors(false) + .allow_connection_pooling(allow_connection_pooling) + .retry(retry); + + match transport { + AgentTransport::Http { .. } => {} + #[cfg(unix)] + AgentTransport::UnixSocket { path } => { + builder = builder.unix_socket(path); + } + #[cfg(windows)] + AgentTransport::NamedPipe { path } => { + builder = builder.windows_named_pipe(path); + } + } + + builder.build() + } + + fn build_static_headers( + language: &LanguageMetadata, + test_token: Option, + extra_headers: HashMap, + ) -> Vec<(String, String)> { + let mut headers = vec![ + ("Datadog-Meta-Lang".to_string(), language.language.clone()), + ( + "Datadog-Meta-Lang-Version".to_string(), + language.language_version.clone(), + ), + ( + "Datadog-Meta-Lang-Interpreter".to_string(), + language.interpreter.clone(), + ), + ( + "Datadog-Meta-Tracer-Version".to_string(), + language.tracer_version.clone(), + ), + ("User-Agent".to_string(), language.user_agent()), + ]; + + if let Some(token) = test_token { + headers.push(("x-datadog-test-session-token".to_string(), token)); + } + + headers.extend(Self::container_headers()); + headers.extend(extra_headers); + + headers + } + + /// Read container / entity-ID headers from the host environment. + fn container_headers() -> Vec<(String, String)> { + use libdd_common::entity_id; + + let mut headers = Vec::new(); + + if let Some(container_id) = entity_id::get_container_id() { + headers.push(("Datadog-Container-Id".to_string(), container_id.to_owned())); + } + + if let Some(entity_id) = entity_id::get_entity_id() { + headers.push(("Datadog-Entity-ID".to_string(), entity_id.to_owned())); + } + + headers + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn default_transport_is_localhost_8126() { + let t = AgentTransport::default(); + match t { + AgentTransport::Http { host, port } => { + assert_eq!(host, "localhost"); + assert_eq!(port, 8126); + } + #[allow(unreachable_patterns)] + _ => panic!("unexpected default transport"), + } + } + + #[test] + fn default_retry_config_is_constructable() { + // Just verify default_retry_config() doesn't panic. + let _cfg = default_retry_config(); + } + + #[test] + fn builder_new_is_default() { + let b = AgentClientBuilder::new(); + assert!(b.transport.is_none()); + assert!(b.language.is_none()); + assert!(!b.allow_connection_pooling); + } + + #[test] + fn build_fails_without_transport() { + let result = AgentClientBuilder::new() + .language_metadata(LanguageMetadata::new("python", "3.12", "CPython", "2.0")) + .build(); + assert!(matches!(result, Err(BuildError::MissingTransport))); + } + + #[test] + fn build_fails_without_language_metadata() { + let result = AgentClientBuilder::new().http("localhost", 8126).build(); + assert!(matches!(result, Err(BuildError::MissingLanguageMetadata))); + } + + #[test] + fn build_succeeds_with_required_fields() { + let _ = rustls::crypto::ring::default_provider().install_default(); + let result = AgentClientBuilder::new() + .http("localhost", 8126) + .language_metadata(LanguageMetadata::new("python", "3.12", "CPython", "2.0")) + .build(); + assert!(result.is_ok()); + } + + #[test] + fn extra_headers_stored() { + let mut headers = HashMap::new(); + headers.insert("X-Custom".to_string(), "value".to_string()); + let b = AgentClientBuilder::new().extra_headers(headers); + assert_eq!( + b.extra_headers.get("X-Custom").map(|s| s.as_str()), + Some("value") + ); + } +} diff --git a/libdd-agent-client/src/client.rs b/libdd-agent-client/src/client.rs new file mode 100644 index 0000000000..d63cf73e60 --- /dev/null +++ b/libdd-agent-client/src/client.rs @@ -0,0 +1,324 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! [`AgentClient`] and its send methods. + +use std::collections::HashMap; + +use bytes::Bytes; +use flate2::{write::GzEncoder, Compression}; +use libdd_http_client::{HttpClient, HttpMethod, HttpRequest}; +use serde_json::{from_slice, Value}; +use std::io::Write as _; + +use crate::{ + agent_info::AgentInfo, + builder::AgentClientBuilder, + error::SendError, + telemetry::TelemetryRequest, + traces::{AgentResponse, TraceFormat, TraceSendOptions}, +}; + +/// A Datadog-agent-specialized HTTP client. +/// +/// Wraps a configured [`libdd_http_client::HttpClient`] and injects Datadog-specific headers +/// automatically on every request: +/// +/// - Language metadata headers (`Datadog-Meta-Lang`, `Datadog-Meta-Lang-Version`, +/// `Datadog-Meta-Lang-Interpreter`, `Datadog-Meta-Tracer-Version`) from the [`LanguageMetadata`] +/// supplied when creating the client. +/// - `User-Agent` derived from [`LanguageMetadata::user_agent`]. +/// - Container/entity-ID headers (`Datadog-Container-Id`, `Datadog-Entity-ID`, +/// `Datadog-External-Env`) read from `/proc/self/cgroup` at startup. +/// - `x-datadog-test-session-token` when a test token was set. +/// - Any extra headers registered via [`AgentClientBuilder::extra_headers`]. +/// +/// Obtain via [`AgentClient::builder`]. +/// +/// [`LanguageMetadata`]: crate::LanguageMetadata +pub struct AgentClient { + http: HttpClient, + base_url: String, + static_headers: Vec<(String, String)>, +} + +impl AgentClient { + pub(crate) fn new(http: HttpClient, static_headers: Vec<(String, String)>) -> Self { + let base_url = http.config().base_url().to_string(); + Self { + http, + base_url, + static_headers, + } + } + + /// Create a new [`AgentClientBuilder`]. + pub fn builder() -> AgentClientBuilder { + AgentClientBuilder::new() + } + + /// Send a serialised trace payload to the agent with automatically injected headers. + /// + /// # Returns + /// + /// An [`AgentResponse`] with the HTTP status and the parsed `rate_by_service` sampling + /// rates from the agent response body. + pub async fn send_traces( + &self, + payload: Bytes, + trace_count: usize, + format: TraceFormat, + opts: TraceSendOptions, + ) -> Result { + let (path, content_type) = match format { + TraceFormat::MsgpackV5 => ("/v0.5/traces", "application/msgpack"), + TraceFormat::MsgpackV4 => ("/v0.4/traces", "application/msgpack"), + }; + + let mut request = HttpRequest::new(HttpMethod::Put, format!("{}{}", self.base_url, path)) + .with_body(payload) + .with_headers(self.static_headers.iter().cloned()) + .with_header("Content-Type", content_type) + .with_header("X-Datadog-Trace-Count", trace_count.to_string()) + .with_header("Datadog-Send-Real-Http-Status", "true"); + + if opts.computed_top_level { + request = request.with_header("Datadog-Client-Computed-Top-Level", "yes"); + } + + let response = self.http.send(request).await?; + + if response.status_code() >= 400 { + return Err(SendError::HttpError { + status: response.status_code(), + body: response.body().clone(), + }); + } + + let rate_by_service = parse_rate_by_service(response.body()); + Ok(AgentResponse { + status: response.status_code(), + rate_by_service, + }) + } + + /// Send span stats (APM concentrator buckets) to `/v0.6/stats`. + pub async fn send_stats(&self, payload: Bytes) -> Result<(), SendError> { + let request = HttpRequest::new(HttpMethod::Put, format!("{}/v0.6/stats", self.base_url)) + .with_body(payload) + .with_headers(self.static_headers.iter().cloned()) + .with_header("Content-Type", "application/msgpack"); + + let response = self.http.send(request).await?; + check_status(response) + } + + /// Send data-streams pipeline stats to `/v0.1/pipeline_stats`. + /// + /// The payload is **always** gzip-compressed regardless of the client-level compression + /// setting. This is a protocol requirement of the data-streams endpoint. + pub async fn send_pipeline_stats(&self, payload: Bytes) -> Result<(), SendError> { + let request = HttpRequest::new( + HttpMethod::Put, + format!("{}/v0.1/pipeline_stats", self.base_url), + ) + .with_body(gzip_compress(payload)?) + .with_headers(self.static_headers.iter().cloned()) + .with_header("Content-Type", "application/msgpack") + .with_header("Content-Encoding", "gzip"); + + let response = self.http.send(request).await?; + check_status(response) + } + + /// Send a telemetry event to the agent's telemetry proxy + /// (`telemetry/proxy/api/v2/apmtelemetry`). + pub async fn send_telemetry(&self, req: TelemetryRequest) -> Result<(), SendError> { + let request = HttpRequest::new( + HttpMethod::Post, + format!("{}/telemetry/proxy/api/v2/apmtelemetry", self.base_url), + ) + .with_body(req.body) + .with_headers(self.static_headers.iter().cloned()) + .with_header("Content-Type", "application/json") + .with_header("DD-Telemetry-Request-Type", &req.request_type) + .with_header("DD-Telemetry-API-Version", &req.api_version) + .with_header( + "DD-Telemetry-Debug-Enabled", + if req.debug { "true" } else { "false" }, + ); + + let response = self.http.send(request).await?; + check_status(response) + } + + /// Send an event via the agent's EVP (Event Platform) proxy. + /// + /// The agent forwards the request to `.datadoghq.com`. `subdomain` + /// controls the target intake (injected as `X-Datadog-EVP-Subdomain`); `path` is the + /// endpoint on that intake (e.g. `/api/v2/exposures`). + pub async fn send_evp_event( + &self, + subdomain: &str, + path: &str, + payload: Bytes, + content_type: &str, + ) -> Result<(), SendError> { + let request = HttpRequest::new(HttpMethod::Post, format!("{}{}", self.base_url, path)) + .with_body(payload) + .with_headers(self.static_headers.iter().cloned()) + .with_header("Content-Type", content_type) + .with_header("X-Datadog-EVP-Subdomain", subdomain); + + let response = self.http.send(request).await?; + check_status(response) + } + + /// Probe `GET /info` and return parsed agent capabilities. + /// + /// Returns `Ok(None)` when the agent returns 404 (remote-config / info not supported). + pub async fn agent_info(&self) -> Result, SendError> { + #[derive(serde::Deserialize)] + struct InfoResponse { + version: Option, + endpoints: Option>, + client_drop_p0s: Option, + config: Option, + } + + let request = HttpRequest::new(HttpMethod::Get, format!("{}/info", self.base_url)) + .with_headers(self.static_headers.iter().cloned()); + + let response = self.http.send(request).await?; + + if response.status_code() == 404 { + return Ok(None); + } + + if response.status_code() >= 400 { + return Err(SendError::HttpError { + status: response.status_code(), + body: response.body().clone(), + }); + } + + // Case-insensitive lookup of a response header value. + let header = |name: &str| -> Option { + response + .headers() + .iter() + .find(|(k, _)| k.eq_ignore_ascii_case(name)) + .map(|(_, v)| v.clone()) + }; + + let container_tags_hash = header("datadog-container-tags-hash"); + let state_hash = header("datadog-agent-state"); + + let info: InfoResponse = + from_slice(response.body()).map_err(|e| SendError::Encoding(e.to_string()))?; + + Ok(Some(AgentInfo { + endpoints: info.endpoints.unwrap_or_default(), + client_drop_p0s: info.client_drop_p0s.unwrap_or(false), + config: info.config.unwrap_or(Value::Null), + version: info.version, + container_tags_hash, + state_hash, + })) + } +} + +/// Parse `rate_by_service` from an agent trace response body. +fn parse_rate_by_service(body: &Bytes) -> Option> { + #[derive(serde::Deserialize)] + struct TraceResponse { + rate_by_service: Option>, + } + + from_slice::(body) + .ok() + .and_then(|r| r.rate_by_service) +} + +/// Return `Ok(())` for 2xx, or `Err(SendError::HttpError)` for anything else. +fn check_status(response: libdd_http_client::HttpResponse) -> Result<(), SendError> { + if response.status_code() >= 400 { + Err(SendError::HttpError { + status: response.status_code(), + body: response.body().clone(), + }) + } else { + Ok(()) + } +} + +/// Gzip-compress `payload` at level 6 (matching dd-trace-py's trace writer). +fn gzip_compress(payload: Bytes) -> Result { + let mut encoder = GzEncoder::new(Vec::new(), Compression::new(6)); + encoder + .write_all(&payload) + .map_err(|e| SendError::Encoding(e.to_string()))?; + let compressed = encoder + .finish() + .map_err(|e| SendError::Encoding(e.to_string()))?; + Ok(Bytes::from(compressed)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{AgentClient, LanguageMetadata}; + + fn ensure_crypto_provider() { + let _ = rustls::crypto::ring::default_provider().install_default(); + } + + fn test_client(port: u16) -> AgentClient { + ensure_crypto_provider(); + AgentClient::builder() + .http("localhost", port) + .language_metadata(LanguageMetadata::new("python", "3.12", "CPython", "2.0")) + .build() + .unwrap() + } + + #[test] + fn builder_roundtrip() { + let client = test_client(8126); + assert!(client.base_url.contains("localhost")); + } + + #[test] + fn static_headers_contain_language_metadata() { + let client = test_client(8126); + let keys: Vec<&str> = client + .static_headers + .iter() + .map(|(k, _)| k.as_str()) + .collect(); + assert!(keys.contains(&"Datadog-Meta-Lang")); + assert!(keys.contains(&"Datadog-Meta-Lang-Version")); + assert!(keys.contains(&"User-Agent")); + } + + #[test] + fn gzip_compress_produces_valid_gzip() { + let input = Bytes::from_static(b"hello world"); + let compressed = gzip_compress(input).unwrap(); + // gzip magic bytes: 0x1f 0x8b + assert_eq!(&compressed[..2], &[0x1f, 0x8b]); + } + + #[test] + fn parse_rate_by_service_valid_json() { + let body = Bytes::from(r#"{"rate_by_service":{"service:env":0.5}}"#); + let rates = parse_rate_by_service(&body).unwrap(); + assert_eq!(rates.get("service:env"), Some(&0.5)); + } + + #[test] + fn parse_rate_by_service_absent_field() { + let body = Bytes::from(r#"{"other":"value"}"#); + assert!(parse_rate_by_service(&body).is_none()); + } +} diff --git a/libdd-agent-client/src/error.rs b/libdd-agent-client/src/error.rs new file mode 100644 index 0000000000..cacb7da46f --- /dev/null +++ b/libdd-agent-client/src/error.rs @@ -0,0 +1,69 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Error types for [`crate::AgentClient`]. + +use bytes::Bytes; +use libdd_http_client::HttpClientError; +use std::io::{Error, ErrorKind}; +use thiserror::Error; + +/// Errors that can occur when building an [`crate::AgentClient`]. +#[derive(Debug, Error)] +pub enum BuildError { + /// No transport was configured. + #[error("transport is required")] + MissingTransport, + /// No language metadata was configured. + #[error("language metadata is required")] + MissingLanguageMetadata, + /// The underlying HTTP client could not be constructed. + #[error("HTTP client error: {0}")] + HttpClient(String), +} + +/// Errors that can occur when sending a request via [`crate::AgentClient`]. +#[derive(Debug, Error)] +pub enum SendError { + /// Connection refused, timeout, or I/O error. + #[error("transport error: {0}")] + Transport(#[source] std::io::Error), + /// The server returned an HTTP error status. Includes the raw status and body. + #[error("HTTP error {status}: {body:?}")] + HttpError { + /// HTTP status code returned by the server. + status: u16, + /// Raw response body. + body: Bytes, + }, + /// All retry attempts exhausted without a successful response. + #[error("retries exhausted: {last_error}")] + RetriesExhausted { + /// The last error encountered before giving up. + last_error: Box, + }, + /// Payload serialisation or compression failure. + #[error("encoding error: {0}")] + Encoding(String), +} + +impl From for SendError { + fn from(err: HttpClientError) -> Self { + match err { + HttpClientError::ConnectionFailed(s) => { + SendError::Transport(Error::new(ErrorKind::ConnectionRefused, s)) + } + HttpClientError::TimedOut => { + SendError::Transport(Error::new(ErrorKind::TimedOut, "request timed out")) + } + HttpClientError::IoError(s) => SendError::Transport(Error::other(s)), + HttpClientError::InvalidConfig(s) => { + SendError::Transport(Error::new(ErrorKind::InvalidInput, s)) + } + HttpClientError::RequestFailed { status, body } => SendError::HttpError { + status, + body: Bytes::from(body), + }, + } + } +} diff --git a/libdd-agent-client/src/language_metadata.rs b/libdd-agent-client/src/language_metadata.rs new file mode 100644 index 0000000000..eb599f90ee --- /dev/null +++ b/libdd-agent-client/src/language_metadata.rs @@ -0,0 +1,69 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Language/runtime metadata injected into every outgoing request. + +/// Language and runtime metadata that is automatically injected into every request as +/// `Datadog-Meta-*` headers and drives the `User-Agent` string. +#[derive(Debug, Clone)] +pub struct LanguageMetadata { + /// Value of `Datadog-Meta-Lang`, e.g. `"python"`. + pub language: String, + /// Value of `Datadog-Meta-Lang-Version`, e.g. `"3.12.1"`. + pub language_version: String, + /// Value of `Datadog-Meta-Lang-Interpreter`, e.g. `"CPython"`. + pub interpreter: String, + /// Value of `Datadog-Meta-Tracer-Version`, e.g. `"2.18.0"`. + pub tracer_version: String, +} + +impl LanguageMetadata { + /// Construct a new `LanguageMetadata`. + pub fn new( + language: impl Into, + language_version: impl Into, + interpreter: impl Into, + tracer_version: impl Into, + ) -> Self { + Self { + language: language.into(), + language_version: language_version.into(), + interpreter: interpreter.into(), + tracer_version: tracer_version.into(), + } + } + + /// Produces the `User-Agent` string passed to `Endpoint::to_request_builder()`. + /// + /// Format: `dd-trace-/`, e.g. `dd-trace-python/2.18.0`. + #[inline] + pub fn user_agent(&self) -> String { + format!("dd-trace-{}/{}", self.language, self.tracer_version) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn new_stores_fields() { + let m = LanguageMetadata::new("python", "3.12.1", "CPython", "2.18.0"); + assert_eq!(m.language, "python"); + assert_eq!(m.language_version, "3.12.1"); + assert_eq!(m.interpreter, "CPython"); + assert_eq!(m.tracer_version, "2.18.0"); + } + + #[test] + fn user_agent_format() { + let m = LanguageMetadata::new("python", "3.12.1", "CPython", "2.18.0"); + assert_eq!(m.user_agent(), "dd-trace-python/2.18.0"); + } + + #[test] + fn user_agent_ruby() { + let m = LanguageMetadata::new("ruby", "3.2.0", "MRI", "1.13.0"); + assert_eq!(m.user_agent(), "dd-trace-ruby/1.13.0"); + } +} diff --git a/libdd-agent-client/src/lib.rs b/libdd-agent-client/src/lib.rs new file mode 100644 index 0000000000..0f0f97e736 --- /dev/null +++ b/libdd-agent-client/src/lib.rs @@ -0,0 +1,43 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! This crate provides a Datadog-agent-specific HTTP client sitting on top of the basic +//! `libdd-http-client` primitives. The API is higher-level and makes agent-specific settings +//! (headers, etc.) the default rather than opt-in boilerplate. +//! +//! # Quick start +//! +//! ```rust,no_run +//! # fn example() -> Result<(), libdd_agent_client::BuildError> { +//! use libdd_agent_client::{AgentClient, LanguageMetadata}; +//! +//! let client = AgentClient::builder() +//! .http("localhost", 8126) +//! .language_metadata(LanguageMetadata::new( +//! "python", "3.12.1", "CPython", "2.18.0", +//! )) +//! .build()?; +//! # Ok(()) +//! # } +//! ``` +//! +//! # Fork safety +//! +//! The underlying `libdd-http-client` uses the `hickory-dns` DNS resolver by default, which is +//! in-process and fork-safe. + +pub mod agent_info; +pub mod builder; +pub mod client; +pub mod error; +pub mod language_metadata; +pub mod telemetry; +pub mod traces; + +pub use agent_info::AgentInfo; +pub use builder::{AgentClientBuilder, AgentTransport}; +pub use client::AgentClient; +pub use error::{BuildError, SendError}; +pub use language_metadata::LanguageMetadata; +pub use telemetry::TelemetryRequest; +pub use traces::{AgentResponse, TraceFormat, TraceSendOptions}; diff --git a/libdd-agent-client/src/telemetry.rs b/libdd-agent-client/src/telemetry.rs new file mode 100644 index 0000000000..de1630b10b --- /dev/null +++ b/libdd-agent-client/src/telemetry.rs @@ -0,0 +1,26 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Types specific to [`crate::AgentClient::send_telemetry`]. + +/// A single telemetry event to send via [`crate::AgentClient::send_telemetry`]. +/// +/// The three per-request headers `DD-Telemetry-Request-Type`, `DD-Telemetry-API-Version`, and +/// `DD-Telemetry-Debug-Enabled` are derived automatically from the struct by the client. +/// +/// The client always routes to the agent telemetry proxy endpoint +/// (`telemetry/proxy/api/v2/apmtelemetry`). +#[derive(Debug, Clone)] +pub struct TelemetryRequest { + /// Value for the `DD-Telemetry-Request-Type` header, e.g. `"app-started"`. + pub request_type: String, + /// Value for the `DD-Telemetry-API-Version` header, e.g. `"v2"`. + pub api_version: String, + /// Value for the `DD-Telemetry-Debug-Enabled` header. + pub debug: bool, + /// Pre-serialized JSON payload body. + /// + /// The caller is responsible for serializing the event body to JSON before constructing this + /// struct. The client sends these bytes as `Content-Type: application/json`. + pub body: bytes::Bytes, +} diff --git a/libdd-agent-client/src/traces.rs b/libdd-agent-client/src/traces.rs new file mode 100644 index 0000000000..28827b87f4 --- /dev/null +++ b/libdd-agent-client/src/traces.rs @@ -0,0 +1,46 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Types specific to [`crate::AgentClient::send_traces`]. + +use std::collections::HashMap; + +/// Wire format of the trace payload. +/// +/// Determines both the `Content-Type` header and the target endpoint. +/// +/// # Format selection +/// +/// The caller is currently responsible for choosing the format. In practice this means starting +/// with [`TraceFormat::MsgpackV5`] and downgrading to [`TraceFormat::MsgpackV4`] when the agent +/// returns 404 or 415 (e.g. on Windows, or when AppSec/IAST is active). +/// +/// In a future version this negotiation may be moved into the client itself so that format +/// selection becomes automatic and callers no longer need to track the downgrade state. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TraceFormat { + /// `application/msgpack` to `/v0.5/traces`. Preferred format. + MsgpackV5, + /// `application/msgpack` to `/v0.4/traces`. Fallback for Windows / AppSec. + MsgpackV4, +} + +/// Per-request options for [`crate::AgentClient::send_traces`]. +#[derive(Debug, Clone, Default)] +pub struct TraceSendOptions { + /// When `true`, appends `Datadog-Client-Computed-Top-Level: yes`. + /// + /// Signals to the agent that the client has already marked top-level spans, allowing the agent + /// to skip its own top-level computation. + pub computed_top_level: bool, +} + +/// Parsed response from the agent after a successful trace submission. +#[derive(Debug, Clone)] +pub struct AgentResponse { + /// HTTP status code returned by the agent. + pub status: u16, + /// Per-service sampling rates parsed from the `rate_by_service` field of the agent response + /// body, if present. + pub rate_by_service: Option>, +} diff --git a/libdd-agent-client/tests/agent_info.rs b/libdd-agent-client/tests/agent_info.rs new file mode 100644 index 0000000000..721ad2ce09 --- /dev/null +++ b/libdd-agent-client/tests/agent_info.rs @@ -0,0 +1,65 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use httpmock::prelude::*; + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn parses_info_response() { + let server = MockServer::start(); + server.mock(|when, then| { + when.method(GET).path("/info"); + then.status(200).body( + r#"{ + "version": "7.50.0", + "endpoints": ["/v0.4/traces", "/v0.5/traces"], + "client_drop_p0s": true, + "config": {} + }"#, + ); + }); + + let info = common::client_for(&server) + .agent_info() + .await + .unwrap() + .expect("expected Some"); + + assert_eq!(info.version.as_deref(), Some("7.50.0")); + assert!(info.endpoints.contains(&"/v0.5/traces".to_string())); + assert!(info.client_drop_p0s); +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn returns_none_on_404() { + let server = MockServer::start(); + server.mock(|when, then| { + when.method(GET).path("/info"); + then.status(404).body("not found"); + }); + + let result = common::client_for(&server).agent_info().await.unwrap(); + assert!(result.is_none()); +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn extracts_container_tags_hash_header() { + let server = MockServer::start(); + server.mock(|when, then| { + when.method(GET).path("/info"); + then.status(200) + .header("Datadog-Container-Tags-Hash", "abc123") + .body(r#"{"endpoints":[],"client_drop_p0s":false}"#); + }); + + let info = common::client_for(&server) + .agent_info() + .await + .unwrap() + .unwrap(); + assert_eq!(info.container_tags_hash.as_deref(), Some("abc123")); +} diff --git a/libdd-agent-client/tests/common.rs b/libdd-agent-client/tests/common.rs new file mode 100644 index 0000000000..79f0869d9f --- /dev/null +++ b/libdd-agent-client/tests/common.rs @@ -0,0 +1,20 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use httpmock::MockServer; +use libdd_agent_client::{AgentClient, LanguageMetadata}; + +pub fn ensure_crypto_provider() { + let _ = rustls::crypto::ring::default_provider().install_default(); +} + +pub fn client_for(server: &MockServer) -> AgentClient { + ensure_crypto_provider(); + AgentClient::builder() + .http("localhost", server.port()) + .language_metadata(LanguageMetadata::new( + "python", "3.12.1", "CPython", "2.18.0", + )) + .build() + .expect("client build failed") +} diff --git a/libdd-agent-client/tests/evp_event.rs b/libdd-agent-client/tests/evp_event.rs new file mode 100644 index 0000000000..be8fc4e19b --- /dev/null +++ b/libdd-agent-client/tests/evp_event.rs @@ -0,0 +1,31 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use bytes::Bytes; +use httpmock::prelude::*; + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn posts_to_path_with_subdomain_header() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(POST) + .path("/api/v2/exposures") + .header("X-Datadog-EVP-Subdomain", "event-platform-intake"); + then.status(200).body(""); + }); + + common::client_for(&server) + .send_evp_event( + "event-platform-intake", + "/api/v2/exposures", + Bytes::from_static(b"{}"), + "application/json", + ) + .await + .unwrap(); + + mock.assert(); +} diff --git a/libdd-agent-client/tests/pipeline_stats.rs b/libdd-agent-client/tests/pipeline_stats.rs new file mode 100644 index 0000000000..5d0ff71638 --- /dev/null +++ b/libdd-agent-client/tests/pipeline_stats.rs @@ -0,0 +1,43 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use bytes::Bytes; +use httpmock::prelude::*; + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn puts_to_correct_endpoint() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT).path("/v0.1/pipeline_stats"); + then.status(200).body(""); + }); + + common::client_for(&server) + .send_pipeline_stats(Bytes::from_static(b"\x80")) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn sets_gzip_encoding() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.1/pipeline_stats") + .header("Content-Encoding", "gzip"); + then.status(200).body(""); + }); + + common::client_for(&server) + .send_pipeline_stats(Bytes::from_static(b"\x80")) + .await + .unwrap(); + + mock.assert(); +} diff --git a/libdd-agent-client/tests/static_headers.rs b/libdd-agent-client/tests/static_headers.rs new file mode 100644 index 0000000000..20581c736a --- /dev/null +++ b/libdd-agent-client/tests/static_headers.rs @@ -0,0 +1,70 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use bytes::Bytes; +use httpmock::prelude::*; +use libdd_agent_client::{AgentClient, LanguageMetadata, TraceFormat, TraceSendOptions}; + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn language_metadata_headers_injected_on_all_requests() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.5/traces") + .header("Datadog-Meta-Lang", "python") + .header("Datadog-Meta-Lang-Version", "3.12.1") + .header("Datadog-Meta-Lang-Interpreter", "CPython") + .header("Datadog-Meta-Tracer-Version", "2.18.0") + .header("User-Agent", "dd-trace-python/2.18.0"); + then.status(200).body(r#"{}"#); + }); + + common::client_for(&server) + .send_traces( + Bytes::from_static(b""), + 0, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn test_token_injected_when_set() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.5/traces") + .header("x-datadog-test-session-token", "my-token"); + then.status(200).body(r#"{}"#); + }); + + common::ensure_crypto_provider(); + let client = AgentClient::builder() + .http("localhost", server.port()) + .language_metadata(LanguageMetadata::new( + "python", "3.12.1", "CPython", "2.18.0", + )) + .test_agent_session_token("my-token") + .build() + .unwrap(); + + client + .send_traces( + Bytes::from_static(b""), + 0, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + mock.assert(); +} diff --git a/libdd-agent-client/tests/stats.rs b/libdd-agent-client/tests/stats.rs new file mode 100644 index 0000000000..4f6ddd5061 --- /dev/null +++ b/libdd-agent-client/tests/stats.rs @@ -0,0 +1,43 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use bytes::Bytes; +use httpmock::prelude::*; + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn puts_to_v06_stats() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT).path("/v0.6/stats"); + then.status(200).body(""); + }); + + common::client_for(&server) + .send_stats(Bytes::from_static(b"\x80")) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn sets_msgpack_content_type() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.6/stats") + .header("Content-Type", "application/msgpack"); + then.status(200).body(""); + }); + + common::client_for(&server) + .send_stats(Bytes::from_static(b"\x80")) + .await + .unwrap(); + + mock.assert(); +} diff --git a/libdd-agent-client/tests/telemetry.rs b/libdd-agent-client/tests/telemetry.rs new file mode 100644 index 0000000000..e96fae6c86 --- /dev/null +++ b/libdd-agent-client/tests/telemetry.rs @@ -0,0 +1,57 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use bytes::Bytes; +use httpmock::prelude::*; +use libdd_agent_client::TelemetryRequest; + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn posts_to_telemetry_proxy() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(POST) + .path("/telemetry/proxy/api/v2/apmtelemetry"); + then.status(202).body(""); + }); + + common::client_for(&server) + .send_telemetry(TelemetryRequest { + request_type: "app-started".to_string(), + api_version: "v2".to_string(), + debug: false, + body: Bytes::from_static(b"{}"), + }) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn injects_per_request_headers() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(POST) + .path("/telemetry/proxy/api/v2/apmtelemetry") + .header("DD-Telemetry-Request-Type", "app-started") + .header("DD-Telemetry-API-Version", "v2") + .header("DD-Telemetry-Debug-Enabled", "false"); + then.status(202).body(""); + }); + + common::client_for(&server) + .send_telemetry(TelemetryRequest { + request_type: "app-started".to_string(), + api_version: "v2".to_string(), + debug: false, + body: Bytes::from_static(b"{}"), + }) + .await + .unwrap(); + + mock.assert(); +} diff --git a/libdd-agent-client/tests/traces.rs b/libdd-agent-client/tests/traces.rs new file mode 100644 index 0000000000..c7fb309800 --- /dev/null +++ b/libdd-agent-client/tests/traces.rs @@ -0,0 +1,180 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use bytes::Bytes; +use httpmock::prelude::*; +use libdd_agent_client::{TraceFormat, TraceSendOptions}; + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn v5_puts_to_correct_endpoint() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT).path("/v0.5/traces"); + then.status(200).body(r#"{"rate_by_service":{}}"#); + }); + + let resp = common::client_for(&server) + .send_traces( + Bytes::from_static(b"\x91\x90"), + 1, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + mock.assert(); + assert_eq!(resp.status, 200); +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn v4_puts_to_v4_endpoint() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT).path("/v0.4/traces"); + then.status(200).body(r#"{}"#); + }); + + common::client_for(&server) + .send_traces( + Bytes::from_static(b"\x91\x90"), + 1, + TraceFormat::MsgpackV4, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn injects_trace_count_header() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.5/traces") + .header("X-Datadog-Trace-Count", "42"); + then.status(200).body(r#"{}"#); + }); + + common::client_for(&server) + .send_traces( + Bytes::from_static(b"\x91\x90"), + 42, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn injects_send_real_http_status_header() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.5/traces") + .header("Datadog-Send-Real-Http-Status", "true"); + then.status(200).body(r#"{}"#); + }); + + common::client_for(&server) + .send_traces( + Bytes::from_static(b""), + 0, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn computed_top_level_injects_header() { + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(PUT) + .path("/v0.5/traces") + .header("Datadog-Client-Computed-Top-Level", "yes"); + then.status(200).body(r#"{}"#); + }); + + common::client_for(&server) + .send_traces( + Bytes::from_static(b""), + 0, + TraceFormat::MsgpackV5, + TraceSendOptions { + computed_top_level: true, + }, + ) + .await + .unwrap(); + + mock.assert(); +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn parses_rate_by_service() { + let server = MockServer::start(); + server.mock(|when, then| { + when.method(PUT).path("/v0.5/traces"); + then.status(200) + .body(r#"{"rate_by_service":{"service:env":0.75}}"#); + }); + + let resp = common::client_for(&server) + .send_traces( + Bytes::from_static(b""), + 0, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap(); + + assert_eq!( + resp.rate_by_service + .as_ref() + .and_then(|m| m.get("service:env")), + Some(&0.75) + ); +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn returns_http_error_on_5xx() { + let server = MockServer::start(); + server.mock(|when, then| { + when.method(PUT).path("/v0.5/traces"); + then.status(503).body("overloaded"); + }); + + let err = common::client_for(&server) + .send_traces( + Bytes::from_static(b""), + 0, + TraceFormat::MsgpackV5, + TraceSendOptions::default(), + ) + .await + .unwrap_err(); + + assert!(matches!( + err, + libdd_agent_client::SendError::HttpError { status: 503, .. } + )); +} diff --git a/libdd-http-client/src/backend/hyper_backend.rs b/libdd-http-client/src/backend/hyper_backend.rs index d9aa203aa1..7944737332 100644 --- a/libdd-http-client/src/backend/hyper_backend.rs +++ b/libdd-http-client/src/backend/hyper_backend.rs @@ -147,9 +147,18 @@ impl super::Backend for HyperBackend { fn new( _timeout: std::time::Duration, transport: TransportConfig, + allow_connection_pooling: bool, ) -> Result { - let client = http_common::client_builder().build(Connector::default()); - Ok(Self { client, transport }) + let mut builder = http_common::client_builder(); + + if !allow_connection_pooling { + builder.pool_max_idle_per_host(0); + } + + Ok(Self { + client: builder.build(Connector::default()), + transport, + }) } async fn send( diff --git a/libdd-http-client/src/backend/mod.rs b/libdd-http-client/src/backend/mod.rs index eae180acd4..7db33f8ab4 100644 --- a/libdd-http-client/src/backend/mod.rs +++ b/libdd-http-client/src/backend/mod.rs @@ -16,7 +16,11 @@ pub(crate) mod reqwest_backend; /// Backend`. pub(crate) trait Backend: Sized { /// Construct a new backend with the given timeout and transport. - fn new(timeout: Duration, transport: config::TransportConfig) -> Result; + fn new( + timeout: Duration, + transport: config::TransportConfig, + allow_connection_pooling: bool, + ) -> Result; /// Send an HTTP request and return the response. async fn send( diff --git a/libdd-http-client/src/backend/reqwest_backend.rs b/libdd-http-client/src/backend/reqwest_backend.rs index cb99869fb3..e259252d69 100644 --- a/libdd-http-client/src/backend/reqwest_backend.rs +++ b/libdd-http-client/src/backend/reqwest_backend.rs @@ -20,6 +20,7 @@ impl super::Backend for ReqwestBackend { fn new( timeout: std::time::Duration, transport: TransportConfig, + allow_connection_pooling: bool, ) -> Result { let mut builder = reqwest::Client::builder().timeout(timeout); @@ -35,6 +36,10 @@ impl super::Backend for ReqwestBackend { } } + if !allow_connection_pooling { + builder = builder.pool_max_idle_per_host(0); + } + let client = builder .build() .map_err(|e| HttpClientError::InvalidConfig(e.to_string()))?; diff --git a/libdd-http-client/src/client.rs b/libdd-http-client/src/client.rs index 3ca22dae3c..45a2b06300 100644 --- a/libdd-http-client/src/client.rs +++ b/libdd-http-client/src/client.rs @@ -46,7 +46,11 @@ impl HttpClient { config: HttpClientConfig, transport: TransportConfig, ) -> Result { - let backend = BackendImpl::new(config.timeout(), transport)?; + let backend = BackendImpl::new( + config.timeout(), + transport, + config.allow_connection_pooling(), + )?; Ok(Self { backend, config }) } diff --git a/libdd-http-client/src/config.rs b/libdd-http-client/src/config.rs index c78bf20717..9b89b2d417 100644 --- a/libdd-http-client/src/config.rs +++ b/libdd-http-client/src/config.rs @@ -33,6 +33,7 @@ pub struct HttpClientConfig { timeout: Duration, treat_http_errors_as_errors: bool, retry: Option, + allow_connection_pooling: bool, } impl HttpClientConfig { @@ -44,6 +45,7 @@ impl HttpClientConfig { timeout, treat_http_errors_as_errors: true, retry: None, + allow_connection_pooling: true, } } @@ -66,28 +68,45 @@ impl HttpClientConfig { pub fn retry(&self) -> Option<&RetryConfig> { self.retry.as_ref() } + + /// Whether connection pooling can be used, when available. See + /// [HttpClientBuilder::allow_connection_pooling]. + pub fn allow_connection_pooling(&self) -> bool { + self.allow_connection_pooling + } } /// Builder for [`crate::HttpClient`]. /// /// Obtain via [`crate::HttpClient::builder`]. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct HttpClientBuilder { base_url: Option, timeout: Option, treat_http_errors_as_errors: bool, retry: Option, transport: TransportConfig, + allow_connection_pooling: bool, } -impl HttpClientBuilder { - /// Create a new builder with default settings. - pub fn new() -> Self { +impl Default for HttpClientBuilder { + fn default() -> Self { Self { + base_url: Default::default(), + timeout: Default::default(), treat_http_errors_as_errors: true, - ..Default::default() + retry: Default::default(), + transport: Default::default(), + allow_connection_pooling: true, } } +} + +impl HttpClientBuilder { + /// Create a new builder with default settings. + pub fn new() -> Self { + Self::default() + } /// Set the base URL. pub fn base_url(mut self, url: String) -> Self { @@ -136,6 +155,20 @@ impl HttpClientBuilder { self } + /// Allow connection pooling. Defaults to `true`. + /// + /// Note that whether pooling is actually used depends on the HTTP backend of + /// [libdd_http_client], though both currently available backends (reqwest and hyper) support + /// pooling. This setting should be understood as: if set to `true`, the default behavior of the + /// underlying backend will be selected, which might or might not do connection pooling by + /// default. If set to `false`, we guarantee no connection pooling will happen. + /// + /// This setting is used by the Agent-level HTTP client. + pub fn allow_connection_pooling(mut self, allow: bool) -> Self { + self.allow_connection_pooling = allow; + self + } + /// Build the [`crate::HttpClient`]. /// /// Returns [`crate::HttpClientError::InvalidConfig`] if required fields @@ -152,6 +185,7 @@ impl HttpClientBuilder { timeout, treat_http_errors_as_errors: self.treat_http_errors_as_errors, retry: self.retry, + allow_connection_pooling: self.allow_connection_pooling, }; crate::HttpClient::from_config_and_transport(config, self.transport) } diff --git a/libdd-http-client/src/request.rs b/libdd-http-client/src/request.rs index 5b27cc2f1d..888eaf9c4e 100644 --- a/libdd-http-client/src/request.rs +++ b/libdd-http-client/src/request.rs @@ -134,6 +134,18 @@ impl HttpRequest { self } + /// Append headers to this request. + #[inline] + pub fn with_headers(mut self, it: impl IntoIterator) -> Self + where + K: Into, + V: Into, + { + self.headers + .extend(it.into_iter().map(|(k, v)| (k.into(), v.into()))); + self + } + /// Set the request body. #[inline] pub fn with_body(mut self, body: impl Into) -> Self { diff --git a/tools/docker/Dockerfile.build b/tools/docker/Dockerfile.build index ec81632518..caae0f5a89 100644 --- a/tools/docker/Dockerfile.build +++ b/tools/docker/Dockerfile.build @@ -87,6 +87,7 @@ COPY "libdd-log/Cargo.toml" "libdd-log/" COPY "libdd-log-ffi/Cargo.toml" "libdd-log-ffi/" COPY "libdd-dogstatsd-client/Cargo.toml" "libdd-dogstatsd-client/" COPY "libdd-http-client/Cargo.toml" "libdd-http-client/" +COPY "libdd-agent-client/Cargo.toml" "libdd-agent-client/" COPY "libdd-library-config-ffi/Cargo.toml" "libdd-library-config-ffi/" COPY "libdd-library-config/Cargo.toml" "libdd-library-config/" COPY "datadog-live-debugger/Cargo.toml" "datadog-live-debugger/"