Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ pub async fn main() {
let dd_use_dogstatsd = env::var("DD_USE_DOGSTATSD")
.map(|val| val.to_lowercase() != "false")
.unwrap_or(true);
let dd_statsd_metric_namespace: Option<String> = env::var("DD_STATSD_METRIC_NAMESPACE")
.ok()
.and_then(|val| validate_metric_namespace(&val));

let https_proxy = env::var("DD_PROXY_HTTPS")
.or_else(|_| env::var("HTTPS_PROXY"))
Expand Down Expand Up @@ -144,6 +147,7 @@ pub async fn main() {
dd_site,
https_proxy,
dogstatsd_tags,
dd_statsd_metric_namespace,
)
.await;
info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}");
Expand Down Expand Up @@ -172,6 +176,7 @@ async fn start_dogstatsd(
dd_site: String,
https_proxy: Option<String>,
dogstatsd_tags: &str,
metric_namespace: Option<String>,
) -> (CancellationToken, Option<Flusher>, AggregatorHandle) {
// 1. Create the aggregator service
#[allow(clippy::expect_used)]
Expand All @@ -187,6 +192,7 @@ async fn start_dogstatsd(
let dogstatsd_config = DogStatsDConfig {
host: AGENT_HOST.to_string(),
port,
metric_namespace,
};
let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new();

Expand Down Expand Up @@ -228,3 +234,104 @@ async fn start_dogstatsd(

(dogstatsd_cancel_token, metrics_flusher, handle)
}

fn validate_metric_namespace(namespace: &str) -> Option<String> {
let trimmed = namespace.trim();
if trimmed.is_empty() {
return None;
}

let mut chars = trimmed.chars();

// Check first character is a letter
if let Some(first_char) = chars.next() {
if !first_char.is_ascii_alphabetic() {
error!(
"DD_STATSD_METRIC_NAMESPACE must start with a letter, got: '{}'. Ignoring namespace.",
trimmed
);
return None;
}
} else {
return None;
}

// Check remaining characters are valid (alphanumeric, underscore, or period)
if let Some(invalid_char) =
chars.find(|&ch| !ch.is_ascii_alphanumeric() && ch != '_' && ch != '.')
{
error!(
"DD_STATSD_METRIC_NAMESPACE contains invalid character '{}' in '{}'. Only ASCII alphanumerics, underscores, and periods are allowed. Ignoring namespace.",
invalid_char, trimmed
);
return None;
}

Some(trimmed.to_string())
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_validate_metric_namespace_valid() {
assert_eq!(
validate_metric_namespace("myapp"),
Some("myapp".to_string())
);
assert_eq!(
validate_metric_namespace("my_app"),
Some("my_app".to_string())
);
assert_eq!(
validate_metric_namespace("my.app"),
Some("my.app".to_string())
);
assert_eq!(
validate_metric_namespace("myApp123"),
Some("myApp123".to_string())
);
assert_eq!(
validate_metric_namespace("a1.b2_c3"),
Some("a1.b2_c3".to_string())
);
}

#[test]
fn test_validate_metric_namespace_with_whitespace() {
assert_eq!(
validate_metric_namespace(" myapp "),
Some("myapp".to_string())
);
assert_eq!(
validate_metric_namespace("\tmyapp\n"),
Some("myapp".to_string())
);
}

#[test]
fn test_validate_metric_namespace_empty() {
assert_eq!(validate_metric_namespace(""), None);
assert_eq!(validate_metric_namespace(" "), None);
assert_eq!(validate_metric_namespace("\t\n"), None);
}

#[test]
fn test_validate_metric_namespace_invalid_start() {
assert_eq!(validate_metric_namespace("1myapp"), None);
assert_eq!(validate_metric_namespace("_myapp"), None);
assert_eq!(validate_metric_namespace(".myapp"), None);
assert_eq!(validate_metric_namespace("-myapp"), None);
}

#[test]
fn test_validate_metric_namespace_invalid_characters() {
assert_eq!(validate_metric_namespace("my-app"), None);
assert_eq!(validate_metric_namespace("my app"), None);
assert_eq!(validate_metric_namespace("my@app"), None);
assert_eq!(validate_metric_namespace("my#app"), None);
assert_eq!(validate_metric_namespace("my$app"), None);
assert_eq!(validate_metric_namespace("my!app"), None);
}
}
44 changes: 38 additions & 6 deletions crates/dogstatsd/src/dogstatsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ use std::str::Split;

use crate::aggregator_service::AggregatorHandle;
use crate::errors::ParseError::UnsupportedType;
use crate::metric::{parse, Metric};
use crate::metric::{id, parse, Metric};
use tracing::{debug, error};

pub struct DogStatsD {
cancel_token: tokio_util::sync::CancellationToken,
aggregator_handle: AggregatorHandle,
buffer_reader: BufferReader,
metric_namespace: Option<String>,
}

pub struct DogStatsDConfig {
pub host: String,
pub port: u16,
pub metric_namespace: Option<String>,
}

enum BufferReader {
Expand Down Expand Up @@ -65,6 +67,7 @@ impl DogStatsD {
cancel_token,
aggregator_handle,
buffer_reader: BufferReader::UdpSocketReader(socket),
metric_namespace: config.metric_namespace.clone(),
}
}

Expand All @@ -91,7 +94,14 @@ impl DogStatsD {
self.insert_metrics(statsd_metric_strings);
}

fn prepend_namespace(namespace: &str, metric: &mut Metric) {
let new_name = format!("{}.{}", namespace, metric.name);
metric.name = ustr::Ustr::from(&new_name);
metric.id = id(metric.name, &metric.tags, metric.timestamp);
}

fn insert_metrics(&self, msg: Split<char>) {
let namespace = self.metric_namespace.as_deref();
let all_valid_metrics: Vec<Metric> = msg
.filter(|m| {
!m.is_empty()
Expand All @@ -116,6 +126,12 @@ impl DogStatsD {
None
}
})
.map(|mut metric| {
if let Some(ns) = namespace {
Self::prepend_namespace(ns, &mut metric);
}
metric
})
.collect();
if !all_valid_metrics.is_empty() {
// Send metrics through the channel - no lock needed!
Expand All @@ -141,7 +157,7 @@ mod tests {
"single_machine_performance.rouster.api.series_v2.payload_size_bytes:269942|d|T1656581409
single_machine_performance.rouster.metrics_min_timestamp_latency:1426.90870216|d|T1656581409
single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d|T1656581409
",
", None
)
.await;

Expand All @@ -163,9 +179,10 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
let response = setup_and_consume_dogstatsd(
format!(
"metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2|T{:}\n",
now
now,
)
.as_str(),
None,
)
.await;

Expand All @@ -176,7 +193,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d

#[tokio::test]
async fn test_dogstatsd_single_metric() {
let response = setup_and_consume_dogstatsd("metric123:99123|c|T1656581409").await;
let response = setup_and_consume_dogstatsd("metric123:99123|c|T1656581409", None).await;

assert_eq!(response.series.len(), 1);
assert_eq!(response.series[0].series.len(), 1);
Expand All @@ -186,7 +203,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
#[tokio::test]
#[traced_test]
async fn test_dogstatsd_filter_service_check() {
let response = setup_and_consume_dogstatsd("_sc|servicecheck|0").await;
let response = setup_and_consume_dogstatsd("_sc|servicecheck|0", None).await;

assert!(!logs_contain("Failed to parse metric"));
assert_eq!(response.series.len(), 0);
Expand All @@ -196,15 +213,29 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
#[tokio::test]
#[traced_test]
async fn test_dogstatsd_filter_event() {
let response = setup_and_consume_dogstatsd("_e{5,10}:event|test event").await;
let response = setup_and_consume_dogstatsd("_e{5,10}:event|test event", None).await;

assert!(!logs_contain("Failed to parse metric"));
assert_eq!(response.series.len(), 0);
assert_eq!(response.distributions.len(), 0);
}

#[tokio::test]
async fn test_dogstatsd_with_namespace() {
let response =
setup_and_consume_dogstatsd("my.metric:42|c", Some("custom.namespace".to_string()))
.await;

assert_eq!(response.series.len(), 1);
assert_eq!(response.series[0].series.len(), 1);
assert!(response.series[0].series[0]
.metric
.starts_with("custom.namespace.my.metric"));
}

async fn setup_and_consume_dogstatsd(
statsd_string: &str,
metric_namespace: Option<String>,
) -> crate::aggregator_service::FlushResponse {
// Create the aggregator service
let (service, handle) =
Expand All @@ -222,6 +253,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
statsd_string.as_bytes().to_vec(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(111, 112, 113, 114)), 0),
),
metric_namespace,
};
dogstatsd.consume_statsd().await;

Expand Down
1 change: 1 addition & 0 deletions crates/dogstatsd/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ async fn start_dogstatsd(aggregator_handle: AggregatorHandle) -> CancellationTok
let dogstatsd_config = DogStatsDConfig {
host: "127.0.0.1".to_string(),
port: 18125,
metric_namespace: None,
};
let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new();
let dogstatsd_client = DogStatsD::new(
Expand Down
Loading