diff --git a/Cargo.lock b/Cargo.lock index dc618d7e..fedb9936 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -543,6 +543,7 @@ dependencies = [ "protobuf", "regex", "reqwest", + "rustls-pemfile", "serde", "serde_json", "thiserror 1.0.69", diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 5b91a911..04e995a3 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -224,6 +224,8 @@ async fn start_dogstatsd( timeout: DOGSTATSD_TIMEOUT_DURATION, retry_strategy: RetryStrategy::LinearBackoff(3, 1), compression_level: CompressionLevel::try_from(6).unwrap_or_default(), + // Not supported yet + ca_cert_path: None, }); Some(metrics_flusher) } diff --git a/crates/dogstatsd/Cargo.toml b/crates/dogstatsd/Cargo.toml index 14f7255e..2b367108 100644 --- a/crates/dogstatsd/Cargo.toml +++ b/crates/dogstatsd/Cargo.toml @@ -25,6 +25,7 @@ tracing = { version = "0.1.40", default-features = false } regex = { version = "1.10.6", default-features = false } zstd = { version = "0.13.3", default-features = false } datadog-fips = { path = "../datadog-fips", default-features = false } +rustls-pemfile = { version = "2.0", default-features = false, features = ["std"] } [dev-dependencies] mockito = { version = "1.5.0", default-features = false } diff --git a/crates/dogstatsd/src/datadog.rs b/crates/dogstatsd/src/datadog.rs index f59d1915..39b12baa 100644 --- a/crates/dogstatsd/src/datadog.rs +++ b/crates/dogstatsd/src/datadog.rs @@ -14,7 +14,8 @@ use reqwest::{Client, Response}; use serde::{Serialize, Serializer}; use serde_json; use std::error::Error; -use std::io::Write; +use std::fs::File; +use std::io::{BufReader, Write}; use std::sync::OnceLock; use std::time::Duration; use tracing::{debug, error}; @@ -148,11 +149,12 @@ impl DdApi { api_key: String, metrics_intake_url_prefix: MetricsIntakeUrlPrefix, https_proxy: Option, + ca_cert_path: Option, timeout: Duration, retry_strategy: RetryStrategy, compression_level: CompressionLevel, ) -> Self { - let client = build_client(https_proxy, timeout) + let client = build_client(https_proxy, ca_cert_path, timeout) .inspect_err(|e| { error!("Unable to create client {:?}", e); }) @@ -290,14 +292,59 @@ pub enum RetryStrategy { LinearBackoff(u64, u64), // attempts, delay } -fn build_client(https_proxy: Option, timeout: Duration) -> Result> { +fn build_client( + https_proxy: Option, + ca_cert_path: Option, + timeout: Duration, +) -> Result> { let mut builder = create_reqwest_client_builder()?.timeout(timeout); + + // Load custom TLS certificate if configured + if let Some(cert_path) = &ca_cert_path { + match load_custom_cert(cert_path) { + Ok(certs) => { + let cert_count = certs.len(); + for cert in certs { + builder = builder.add_root_certificate(cert); + } + debug!( + "DOGSTATSD | Added {} root certificate(s) from {}", + cert_count, cert_path + ); + } + Err(e) => { + error!( + "DOGSTATSD | Failed to load TLS certificate from {}: {}, continuing without custom cert", + cert_path, e + ); + } + } + } + if let Some(proxy) = https_proxy { builder = builder.proxy(reqwest::Proxy::https(proxy)?); } Ok(builder.build()?) } +fn load_custom_cert(cert_path: &str) -> Result, Box> { + let file = File::open(cert_path)?; + let mut reader = BufReader::new(file); + + // Parse PEM certificates + let certs = rustls_pemfile::certs(&mut reader).collect::, _>>()?; + + if certs.is_empty() { + return Err("No certificates found in file".into()); + } + + // Convert all certificates found in the file + certs + .into_iter() + .map(|cert| reqwest::Certificate::from_der(cert.as_ref()).map_err(Into::into)) + .collect() +} + #[derive(Debug, Serialize, Clone, Copy)] /// A single point in time pub(crate) struct Point { diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index ef2d2192..0de79c1c 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -17,6 +17,7 @@ pub struct Flusher { api_key_factory: Arc, metrics_intake_url_prefix: MetricsIntakeUrlPrefix, https_proxy: Option, + ca_cert_path: Option, timeout: Duration, retry_strategy: RetryStrategy, aggregator_handle: AggregatorHandle, @@ -29,6 +30,7 @@ pub struct FlusherConfig { pub aggregator_handle: AggregatorHandle, pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix, pub https_proxy: Option, + pub ca_cert_path: Option, pub timeout: Duration, pub retry_strategy: RetryStrategy, pub compression_level: CompressionLevel, @@ -40,6 +42,7 @@ impl Flusher { api_key_factory: Arc::clone(&config.api_key_factory), metrics_intake_url_prefix: config.metrics_intake_url_prefix, https_proxy: config.https_proxy, + ca_cert_path: config.ca_cert_path, timeout: config.timeout, retry_strategy: config.retry_strategy, aggregator_handle: config.aggregator_handle, @@ -57,6 +60,7 @@ impl Flusher { api_key.to_string(), self.metrics_intake_url_prefix.clone(), self.https_proxy.clone(), + self.ca_cert_path.clone(), self.timeout, self.retry_strategy.clone(), self.compression_level, @@ -283,6 +287,7 @@ mod tests { ) .expect("failed to create URL"), https_proxy: None, + ca_cert_path: None, timeout: Duration::from_secs(5), retry_strategy: RetryStrategy::Immediate(1), compression_level: CompressionLevel::try_from(6) @@ -329,6 +334,7 @@ mod tests { ) .expect("failed to create URL"), https_proxy: None, + ca_cert_path: None, timeout: Duration::from_secs(5), retry_strategy: RetryStrategy::Immediate(1), compression_level: CompressionLevel::try_from(6) @@ -378,6 +384,7 @@ mod tests { ) .expect("failed to create URL"), https_proxy: None, + ca_cert_path: None, timeout: Duration::from_secs(5), retry_strategy: RetryStrategy::Immediate(1), compression_level: CompressionLevel::try_from(6) diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index f9f2b909..65d919c1 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -59,6 +59,7 @@ async fn dogstatsd_server_ships_series() { ) .expect("failed to create URL"), https_proxy: None, + ca_cert_path: None, timeout: std::time::Duration::from_secs(5), retry_strategy: RetryStrategy::Immediate(3), compression_level: CompressionLevel::try_from(6) @@ -139,6 +140,7 @@ async fn test_send_with_retry_immediate_failure() { ) .expect("failed to create URL"), None, + None, Duration::from_secs(1), retry_strategy.clone(), 6, @@ -195,6 +197,7 @@ async fn test_send_with_retry_linear_backoff_success() { ) .expect("failed to create URL"), None, + None, Duration::from_secs(1), retry_strategy.clone(), 6, @@ -250,6 +253,7 @@ async fn test_send_with_retry_immediate_failure_after_one_attempt() { ) .expect("failed to create URL"), None, + None, Duration::from_secs(1), retry_strategy.clone(), 6,