diff --git a/crates/dogstatsd/src/metric.rs b/crates/dogstatsd/src/metric.rs index a94715bf..72fe630e 100644 --- a/crates/dogstatsd/src/metric.rs +++ b/crates/dogstatsd/src/metric.rs @@ -6,23 +6,21 @@ use crate::{constants, datadog}; use ddsketch_agent::DDSketch; use fnv::FnvHasher; use protobuf::Chars; -use regex::Regex; use std::hash::{Hash, Hasher}; -use std::sync::OnceLock; use ustr::Ustr; pub const EMPTY_TAGS: SortedTags = SortedTags { values: Vec::new() }; // https://docs.datadoghq.com/developers/dogstatsd/datagram_shell?tab=metrics#dogstatsd-protocol-v13 -static METRIC_REGEX: OnceLock = OnceLock::new(); -fn get_metric_regex() -> &'static Regex { - #[allow(clippy::expect_used)] - METRIC_REGEX.get_or_init(|| { - Regex::new( - r"^(?P[^:]+):(?P[^|]+)\|(?P[a-zA-Z]+)(?:\|@(?P[\d.]+))?(?:\|#(?P[^|]+))?(?:\|c:(?P[^|]+))?(?:\|T(?P[^|]+))?$", - ) - .expect("Failed to create metric regex") - }) + +/// Split input by the next pipe delimiter, returning the field and remaining input. +/// Returns (field, Some(remaining)) if pipe found, or (field, None) if no pipe. +fn next_field(input: &str) -> (&str, Option<&str>) { + if let Some(idx) = input.find('|') { + (&input[..idx], Some(&input[idx + 1..])) + } else { + (input, None) + } } #[derive(Clone, Debug)] @@ -248,66 +246,112 @@ pub fn timestamp_to_bucket(timestamp: i64) -> i64 { /// limits in [`constants`]. Any non-viable input will be discarded. /// example aj-test.increment:1|c|#user:aj-test from 127.0.0.1:50983 pub fn parse(input: &str) -> Result { - // TODO must enforce / exploit constraints given in `constants`. - if let Some(caps) = get_metric_regex().captures(input) { - // unused for now - // let sample_rate = caps.name("sample_rate").map(|m| m.as_str()); - - let tags; - if let Some(tags_section) = caps.name("tags") { - tags = Some(SortedTags::parse(tags_section.as_str())?); - } else { - tags = None; + // Parse mandatory fields: :| + let (name_and_value, remaining) = next_field(input); + + // Split name and value by ':' + let (name, values) = name_and_value + .split_once(':') + .ok_or_else(|| ParseError::Raw(format!("Invalid metric format {input}")))?; + + if name.is_empty() || values.is_empty() { + return Err(ParseError::Raw(format!("Invalid metric format {input}"))); + } + + // Extract type (next field is mandatory) + let remaining = + remaining.ok_or_else(|| ParseError::Raw(format!("Invalid metric format {input}")))?; + let (metric_type, remaining) = next_field(remaining); + + if metric_type.is_empty() { + return Err(ParseError::Raw(format!("Invalid metric format {input}"))); + } + + // Parse the first value + let val = first_value(values)?; + + // Parse metric type and create value + // Open question: do we allow dup tags? + let metric_value = match metric_type { + "c" => MetricValue::Count(val), + "g" => MetricValue::Gauge(val), + "d" => { + let sketch = &mut DDSketch::default(); + sketch.insert(val); + MetricValue::Distribution(sketch.to_owned()) + } + "h" | "s" | "ms" => { + return Err(ParseError::UnsupportedType(metric_type.to_string())); } + _ => { + return Err(ParseError::Raw(format!( + "Invalid metric type: {metric_type}" + ))); + } + }; - #[allow(clippy::unwrap_used)] - let val = first_value(caps.name("values").unwrap().as_str())?; + // Get current time once for use as fallback + #[allow(clippy::expect_used)] + let now: i64 = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); - #[allow(clippy::unwrap_used)] - let t = caps.name("type").unwrap().as_str(); + // Parse optional fields in any order + let mut tags: Option = None; + let mut timestamp: Option = None; + // sample_rate and container_id are parsed but currently unused + let mut _sample_rate: Option = None; + let mut _container_id: Option<&str> = None; - #[allow(clippy::expect_used)] - let now = std::time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(); - // let Metric::new() handle bucketing the timestamp - let parsed_timestamp: i64 = match caps.name("timestamp") { - Some(ts) => { - let sec = ts.as_str().parse::().unwrap_or(now as f64).round() as i64; - timestamp_to_bucket(sec) - } - None => timestamp_to_bucket(now), - }; - let metric_value = match t { - "c" => MetricValue::Count(val), - "g" => MetricValue::Gauge(val), - "d" => { - let sketch = &mut DDSketch::default(); - sketch.insert(val); - MetricValue::Distribution(sketch.to_owned()) - } - "h" | "s" | "ms" => { - return Err(ParseError::UnsupportedType(t.to_string())); - } - _ => { - return Err(ParseError::Raw(format!("Invalid metric type: {t}"))); + let mut current = remaining; + while let Some(field_remaining) = current { + let (field, next) = next_field(field_remaining); + + if field.is_empty() { + current = next; + continue; + } + + // Match by prefix for position-flexible parsing + if let Some(tags_str) = field.strip_prefix('#') { + tags = Some(SortedTags::parse(tags_str)?); + } else if let Some(ts_str) = field.strip_prefix('T') { + if ts_str.is_empty() { + return Err(ParseError::Raw("Empty timestamp field: T".to_string())); } - }; - #[allow(clippy::unwrap_used)] - let name = Ustr::from(caps.name("name").unwrap().as_str()); - let id = id(name, &tags, parsed_timestamp); - return Ok(Metric { - name, - value: metric_value, - tags, - id, - timestamp: parsed_timestamp, - }); + let sec = ts_str + .parse::() + .map_err(|_| ParseError::Raw(format!("Invalid timestamp: T{}", ts_str)))? + .round() as i64; + timestamp = Some(sec); + } else if let Some(sr_str) = field.strip_prefix('@') { + _sample_rate = Some( + sr_str + .parse::() + .map_err(|_| ParseError::Raw(format!("Invalid sample rate: @{}", sr_str)))?, + ); + } else if let Some(cid_str) = field.strip_prefix("c:") { + _container_id = Some(cid_str); + } + // Ignore unknown fields for forward compatibility + + current = next; } - Err(ParseError::Raw(format!("Invalid metric format {input}"))) + + let parsed_timestamp = timestamp_to_bucket(timestamp.unwrap_or(now)); + let name_ustr = Ustr::from(name); + let metric_id = id(name_ustr, &tags, parsed_timestamp); + + Ok(Metric { + name: name_ustr, + value: metric_value, + tags, + id: metric_id, + timestamp: parsed_timestamp, + }) } fn first_value(values: &str) -> Result { @@ -670,4 +714,412 @@ mod tests { assert_eq!(tags.find_all("c"), vec![&Ustr::from("3")]); assert_eq!(tags.find_all("d"), Vec::<&Ustr>::new()); } + + #[test] + fn parse_timestamp_before_tags() { + // Test that timestamp can come before tags (position-flexible) + let input = "ab.myVisitor:1|c|T1656581409|#env:dev,service:test"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "ab.myVisitor"); + if let MetricValue::Count(v) = metric.value { + assert_eq!(v, 1.0); + } else { + panic!("Expected count metric"); + } + assert_eq!(metric.timestamp, 1656581400); // rounded to 10s bucket + assert!(metric.tags.is_some()); + let tags = metric.tags.unwrap(); + assert_eq!(tags.values.len(), 2); + } + + #[test] + fn parse_tags_before_timestamp() { + // Test traditional order still works + let input = "ab.myVisitor:1|c|#env:dev,service:test|T1656581409"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "ab.myVisitor"); + if let MetricValue::Count(v) = metric.value { + assert_eq!(v, 1.0); + } else { + panic!("Expected count metric"); + } + assert_eq!(metric.timestamp, 1656581400); + assert!(metric.tags.is_some()); + let tags = metric.tags.unwrap(); + assert_eq!(tags.values.len(), 2); + } + + #[test] + fn parse_mixed_order_optional_fields() { + // Test all optional fields in various orders + let input = "test.metric:5|g|T1656581409|@0.5|#tag1:val1|c:container123"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + if let MetricValue::Gauge(v) = metric.value { + assert_eq!(v, 5.0); + } else { + panic!("Expected gauge metric"); + } + assert_eq!(metric.timestamp, 1656581400); + } + + #[test] + fn parse_sample_rate() { + // Test that sample rate is parsed without error (even though unused) + let input = "test.metric:10|c|@0.5"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + if let MetricValue::Count(v) = metric.value { + assert_eq!(v, 10.0); + } else { + panic!("Expected count metric"); + } + } + + #[test] + fn parse_sample_rate_with_tags() { + // Test sample rate with tags in mixed order + let input = "test.metric:10|c|@0.1|#env:prod"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + if let MetricValue::Count(v) = metric.value { + assert_eq!(v, 10.0); + } else { + panic!("Expected count metric"); + } + assert!(metric.tags.is_some()); + } + + #[test] + fn parse_invalid_sample_rate_letters() { + // Sample rate with letters should be rejected + let input = "test.metric:10|c|@abc"; + let result = parse(input); + assert!(result.is_err()); + match result { + Err(ParseError::Raw(msg)) => { + assert!(msg.contains("Invalid sample rate")); + } + _ => panic!("Expected ParseError::Raw for invalid sample rate"), + } + } + + #[test] + fn parse_invalid_sample_rate_empty() { + // Empty sample rate should be rejected + let input = "test.metric:10|c|@"; + let result = parse(input); + assert!(result.is_err()); + match result { + Err(ParseError::Raw(msg)) => { + assert!(msg.contains("Invalid sample rate")); + } + _ => panic!("Expected ParseError::Raw for empty sample rate"), + } + } + + #[test] + fn parse_invalid_sample_rate_mixed() { + // Sample rate with mixed letters and numbers should be rejected + let input = "test.metric:10|c|@0.5abc"; + let result = parse(input); + assert!(result.is_err()); + match result { + Err(ParseError::Raw(msg)) => { + assert!(msg.contains("Invalid sample rate")); + } + _ => panic!("Expected ParseError::Raw for invalid sample rate"), + } + } + + #[test] + fn parse_unknown_field_ignored() { + // Test that unknown fields are silently ignored (forward compatibility) + let input = "test.metric:5|g|#tag:val|X123|Yunknown"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + if let MetricValue::Gauge(v) = metric.value { + assert_eq!(v, 5.0); + } else { + panic!("Expected gauge metric"); + } + assert!(metric.tags.is_some()); + } + + #[test] + fn parse_empty_field_between_pipes() { + // Test that empty fields between pipes are skipped + let input = "test.metric:5|g||#tag:val||"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + if let MetricValue::Gauge(v) = metric.value { + assert_eq!(v, 5.0); + } else { + panic!("Expected gauge metric"); + } + assert!(metric.tags.is_some()); + } + + #[test] + fn parse_empty_name() { + // Test that empty metric name is rejected + let input = ":5|g|#tag:val"; + assert!(parse(input).is_err()); + } + + #[test] + fn parse_empty_value() { + // Test that empty metric value is rejected + let input = "test.metric:|g|#tag:val"; + assert!(parse(input).is_err()); + } + + #[test] + fn parse_empty_type() { + // Test that empty metric type is rejected + let input = "test.metric:5||#tag:val"; + assert!(parse(input).is_err()); + } + + #[test] + fn parse_only_mandatory_fields() { + // Test parsing with only mandatory fields (no optional fields) + let input = "test.metric:42|c"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + if let MetricValue::Count(v) = metric.value { + assert_eq!(v, 42.0); + } else { + panic!("Expected count metric"); + } + assert!(metric.tags.is_none()); + } + + #[test] + fn parse_all_optional_fields_present() { + // Test parsing with all optional fields in canonical order + let input = "test.metric:7|g|@0.5|#env:prod,region:us|c:container123|T1656581409"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + if let MetricValue::Gauge(v) = metric.value { + assert_eq!(v, 7.0); + } else { + panic!("Expected gauge metric"); + } + assert_eq!(metric.timestamp, 1656581400); + let tags = metric.tags.unwrap(); + assert_eq!(tags.values.len(), 2); + } + + #[test] + fn parse_reverse_order_all_fields() { + // Test all fields in reverse order + let input = "test.metric:7|g|c:container123|T1656581409|#env:prod|@0.5"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + if let MetricValue::Gauge(v) = metric.value { + assert_eq!(v, 7.0); + } else { + panic!("Expected gauge metric"); + } + assert_eq!(metric.timestamp, 1656581400); + assert!(metric.tags.is_some()); + } + + #[test] + fn parse_multiple_colons_in_value() { + // Test metric value with multiple colons (first_value extracts first segment) + // Format: name:value1:value2:value3|type + let input = "my.metric:5:10:15|c"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "my.metric"); + if let MetricValue::Count(v) = metric.value { + assert_eq!(v, 5.0); // first_value() takes the first value + } else { + panic!("Expected count metric"); + } + } + + #[test] + fn parse_invalid_timestamp_fallback() { + // Test that invalid timestamp is rejected (not silently accepted) + let input = "test.metric:5|g|Tinvalid"; + let result = parse(input); + assert!(result.is_err()); + match result { + Err(ParseError::Raw(msg)) => { + assert!(msg.contains("Invalid timestamp")); + } + _ => panic!("Expected ParseError::Raw for invalid timestamp"), + } + } + + #[test] + fn parse_invalid_timestamp_empty() { + // Empty timestamp should be rejected + let input = "test.metric:10|c|T"; + let result = parse(input); + assert!(result.is_err()); + match result { + Err(ParseError::Raw(msg)) => { + assert!(msg.contains("Empty timestamp")); + } + _ => panic!("Expected ParseError::Raw for empty timestamp"), + } + } + + #[test] + fn parse_invalid_timestamp_letters() { + // Timestamp with letters should be rejected + let input = "test.metric:10|c|Tabc"; + let result = parse(input); + assert!(result.is_err()); + match result { + Err(ParseError::Raw(msg)) => { + assert!(msg.contains("Invalid timestamp")); + } + _ => panic!("Expected ParseError::Raw for invalid timestamp"), + } + } + + #[test] + fn parse_invalid_timestamp_mixed() { + // Timestamp with mixed letters and numbers should be rejected + let input = "test.metric:10|c|T123abc"; + let result = parse(input); + assert!(result.is_err()); + match result { + Err(ParseError::Raw(msg)) => { + assert!(msg.contains("Invalid timestamp")); + } + _ => panic!("Expected ParseError::Raw for invalid timestamp"), + } + } + + #[test] + fn parse_sample_rate_negative() { + // Negative sample rates should be accepted by parser (validation is application-level) + let input = "test.metric:10|c|@-0.5"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + if let MetricValue::Count(v) = metric.value { + assert_eq!(v, 10.0); + } else { + panic!("Expected count metric"); + } + } + + #[test] + fn parse_sample_rate_infinity() { + // Infinity as sample rate is accepted by f64 parser (semantic validation is application-level) + let input = "test.metric:10|c|@inf"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + if let MetricValue::Count(v) = metric.value { + assert_eq!(v, 10.0); + } else { + panic!("Expected count metric"); + } + } + + #[test] + fn parse_timestamp_negative() { + // Negative timestamps should be accepted (historical data) + let input = "test.metric:10|c|T-100"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + // Timestamp should be bucketed to nearest 10s + let expected = (-100 / 10) * 10; + assert_eq!(metric.timestamp, expected); + } + + #[test] + fn parse_timestamp_zero() { + // Zero timestamp (epoch) should be valid + let input = "test.metric:10|c|T0"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + assert_eq!(metric.timestamp, 0); + } + + #[test] + fn parse_timestamp_infinity() { + // Infinity as timestamp is accepted by f64 parser and rounds to i64::MAX + let input = "test.metric:10|c|Tinf"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + // f64::INFINITY.round() as i64 results in i64::MAX + // Bucketing should still work + assert!(metric.timestamp > 0); + } + + #[test] + fn parse_invalid_sample_rate_with_valid_timestamp() { + // Invalid sample rate should fail even with valid timestamp + let input = "test.metric:10|c|@abc|T1234567890"; + let result = parse(input); + assert!(result.is_err()); + match result { + Err(ParseError::Raw(msg)) => { + assert!(msg.contains("Invalid sample rate")); + } + _ => panic!("Expected ParseError::Raw for invalid sample rate"), + } + } + + #[test] + fn parse_valid_sample_rate_with_invalid_timestamp() { + // Invalid timestamp should fail even with valid sample rate + let input = "test.metric:10|c|@0.5|Tabc"; + let result = parse(input); + assert!(result.is_err()); + match result { + Err(ParseError::Raw(msg)) => { + assert!(msg.contains("Invalid timestamp")); + } + _ => panic!("Expected ParseError::Raw for invalid timestamp"), + } + } + + #[test] + fn parse_sample_rate_before_timestamp() { + // Sample rate before timestamp should work (order independent) + let input = "test.metric:10|c|@0.5|T1234567890"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + assert_eq!(metric.timestamp, 1234567890); + } + + #[test] + fn parse_timestamp_before_sample_rate() { + // Timestamp before sample rate should work (order independent) + let input = "test.metric:10|c|T1234567890|@0.5"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + assert_eq!(metric.timestamp, 1234567890); + } + + #[test] + fn parse_sample_rate_with_tags_and_timestamp() { + // All optional fields together in different orders + let input = "test.metric:10|c|@0.1|#env:prod|T1234567890"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + assert_eq!(metric.timestamp, 1234567890); + assert!(metric.tags.is_some()); + } + + #[test] + fn parse_distribution_type() { + // Test distribution metric type + let input = "test.metric:99.5|d|#tag:val"; + let metric = parse(input).unwrap(); + assert_eq!(metric.name.as_str(), "test.metric"); + match metric.value { + MetricValue::Distribution(sketch) => { + assert!(sketch.min().is_some()); + } + _ => panic!("Expected distribution metric"), + } + } }