diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 7f4681e9..e62305b8 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -72,6 +72,9 @@ pub async fn main() { let dd_use_dogstatsd = env::var("DD_USE_DOGSTATSD") .map(|val| val.to_lowercase() != "false") .unwrap_or(true); + let dd_statsd_metric_namespace: Option = env::var("DD_STATSD_METRIC_NAMESPACE") + .ok() + .and_then(|val| validate_metric_namespace(&val)); let https_proxy = env::var("DD_PROXY_HTTPS") .or_else(|_| env::var("HTTPS_PROXY")) @@ -144,6 +147,7 @@ pub async fn main() { dd_site, https_proxy, dogstatsd_tags, + dd_statsd_metric_namespace, ) .await; info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); @@ -172,6 +176,7 @@ async fn start_dogstatsd( dd_site: String, https_proxy: Option, dogstatsd_tags: &str, + metric_namespace: Option, ) -> (CancellationToken, Option, AggregatorHandle) { // 1. Create the aggregator service #[allow(clippy::expect_used)] @@ -187,6 +192,7 @@ async fn start_dogstatsd( let dogstatsd_config = DogStatsDConfig { host: AGENT_HOST.to_string(), port, + metric_namespace, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); @@ -228,3 +234,104 @@ async fn start_dogstatsd( (dogstatsd_cancel_token, metrics_flusher, handle) } + +fn validate_metric_namespace(namespace: &str) -> Option { + let trimmed = namespace.trim(); + if trimmed.is_empty() { + return None; + } + + let mut chars = trimmed.chars(); + + // Check first character is a letter + if let Some(first_char) = chars.next() { + if !first_char.is_ascii_alphabetic() { + error!( + "DD_STATSD_METRIC_NAMESPACE must start with a letter, got: '{}'. Ignoring namespace.", + trimmed + ); + return None; + } + } else { + return None; + } + + // Check remaining characters are valid (alphanumeric, underscore, or period) + if let Some(invalid_char) = + chars.find(|&ch| !ch.is_ascii_alphanumeric() && ch != '_' && ch != '.') + { + error!( + "DD_STATSD_METRIC_NAMESPACE contains invalid character '{}' in '{}'. Only ASCII alphanumerics, underscores, and periods are allowed. Ignoring namespace.", + invalid_char, trimmed + ); + return None; + } + + Some(trimmed.to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_validate_metric_namespace_valid() { + assert_eq!( + validate_metric_namespace("myapp"), + Some("myapp".to_string()) + ); + assert_eq!( + validate_metric_namespace("my_app"), + Some("my_app".to_string()) + ); + assert_eq!( + validate_metric_namespace("my.app"), + Some("my.app".to_string()) + ); + assert_eq!( + validate_metric_namespace("myApp123"), + Some("myApp123".to_string()) + ); + assert_eq!( + validate_metric_namespace("a1.b2_c3"), + Some("a1.b2_c3".to_string()) + ); + } + + #[test] + fn test_validate_metric_namespace_with_whitespace() { + assert_eq!( + validate_metric_namespace(" myapp "), + Some("myapp".to_string()) + ); + assert_eq!( + validate_metric_namespace("\tmyapp\n"), + Some("myapp".to_string()) + ); + } + + #[test] + fn test_validate_metric_namespace_empty() { + assert_eq!(validate_metric_namespace(""), None); + assert_eq!(validate_metric_namespace(" "), None); + assert_eq!(validate_metric_namespace("\t\n"), None); + } + + #[test] + fn test_validate_metric_namespace_invalid_start() { + assert_eq!(validate_metric_namespace("1myapp"), None); + assert_eq!(validate_metric_namespace("_myapp"), None); + assert_eq!(validate_metric_namespace(".myapp"), None); + assert_eq!(validate_metric_namespace("-myapp"), None); + } + + #[test] + fn test_validate_metric_namespace_invalid_characters() { + assert_eq!(validate_metric_namespace("my-app"), None); + assert_eq!(validate_metric_namespace("my app"), None); + assert_eq!(validate_metric_namespace("my@app"), None); + assert_eq!(validate_metric_namespace("my#app"), None); + assert_eq!(validate_metric_namespace("my$app"), None); + assert_eq!(validate_metric_namespace("my!app"), None); + } +} diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index f16ff0b1..7db0288a 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -6,18 +6,20 @@ use std::str::Split; use crate::aggregator_service::AggregatorHandle; use crate::errors::ParseError::UnsupportedType; -use crate::metric::{parse, Metric}; +use crate::metric::{id, parse, Metric}; use tracing::{debug, error}; pub struct DogStatsD { cancel_token: tokio_util::sync::CancellationToken, aggregator_handle: AggregatorHandle, buffer_reader: BufferReader, + metric_namespace: Option, } pub struct DogStatsDConfig { pub host: String, pub port: u16, + pub metric_namespace: Option, } enum BufferReader { @@ -65,6 +67,7 @@ impl DogStatsD { cancel_token, aggregator_handle, buffer_reader: BufferReader::UdpSocketReader(socket), + metric_namespace: config.metric_namespace.clone(), } } @@ -91,7 +94,14 @@ impl DogStatsD { self.insert_metrics(statsd_metric_strings); } + fn prepend_namespace(namespace: &str, metric: &mut Metric) { + let new_name = format!("{}.{}", namespace, metric.name); + metric.name = ustr::Ustr::from(&new_name); + metric.id = id(metric.name, &metric.tags, metric.timestamp); + } + fn insert_metrics(&self, msg: Split) { + let namespace = self.metric_namespace.as_deref(); let all_valid_metrics: Vec = msg .filter(|m| { !m.is_empty() @@ -116,6 +126,12 @@ impl DogStatsD { None } }) + .map(|mut metric| { + if let Some(ns) = namespace { + Self::prepend_namespace(ns, &mut metric); + } + metric + }) .collect(); if !all_valid_metrics.is_empty() { // Send metrics through the channel - no lock needed! @@ -141,7 +157,7 @@ mod tests { "single_machine_performance.rouster.api.series_v2.payload_size_bytes:269942|d|T1656581409 single_machine_performance.rouster.metrics_min_timestamp_latency:1426.90870216|d|T1656581409 single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d|T1656581409 -", +", None ) .await; @@ -163,9 +179,10 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d let response = setup_and_consume_dogstatsd( format!( "metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2|T{:}\n", - now + now, ) .as_str(), + None, ) .await; @@ -176,7 +193,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d #[tokio::test] async fn test_dogstatsd_single_metric() { - let response = setup_and_consume_dogstatsd("metric123:99123|c|T1656581409").await; + let response = setup_and_consume_dogstatsd("metric123:99123|c|T1656581409", None).await; assert_eq!(response.series.len(), 1); assert_eq!(response.series[0].series.len(), 1); @@ -186,7 +203,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d #[tokio::test] #[traced_test] async fn test_dogstatsd_filter_service_check() { - let response = setup_and_consume_dogstatsd("_sc|servicecheck|0").await; + let response = setup_and_consume_dogstatsd("_sc|servicecheck|0", None).await; assert!(!logs_contain("Failed to parse metric")); assert_eq!(response.series.len(), 0); @@ -196,15 +213,29 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d #[tokio::test] #[traced_test] async fn test_dogstatsd_filter_event() { - let response = setup_and_consume_dogstatsd("_e{5,10}:event|test event").await; + let response = setup_and_consume_dogstatsd("_e{5,10}:event|test event", None).await; assert!(!logs_contain("Failed to parse metric")); assert_eq!(response.series.len(), 0); assert_eq!(response.distributions.len(), 0); } + #[tokio::test] + async fn test_dogstatsd_with_namespace() { + let response = + setup_and_consume_dogstatsd("my.metric:42|c", Some("custom.namespace".to_string())) + .await; + + assert_eq!(response.series.len(), 1); + assert_eq!(response.series[0].series.len(), 1); + assert!(response.series[0].series[0] + .metric + .starts_with("custom.namespace.my.metric")); + } + async fn setup_and_consume_dogstatsd( statsd_string: &str, + metric_namespace: Option, ) -> crate::aggregator_service::FlushResponse { // Create the aggregator service let (service, handle) = @@ -222,6 +253,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d statsd_string.as_bytes().to_vec(), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(111, 112, 113, 114)), 0), ), + metric_namespace, }; dogstatsd.consume_statsd().await; diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 4be84bf8..f9f2b909 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -95,6 +95,7 @@ async fn start_dogstatsd(aggregator_handle: AggregatorHandle) -> CancellationTok let dogstatsd_config = DogStatsDConfig { host: "127.0.0.1".to_string(), port: 18125, + metric_namespace: None, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); let dogstatsd_client = DogStatsD::new(