diff --git a/dogstatsd/src/aggregator.rs b/dogstatsd/src/aggregator.rs index f31a3b0fc3..67b9daebbc 100644 --- a/dogstatsd/src/aggregator.rs +++ b/dogstatsd/src/aggregator.rs @@ -4,14 +4,17 @@ //! The aggregation of metrics. use crate::constants; -use crate::datadog::{self, Metric as MetricToShip, Series}; +use crate::datadog::{ + self, Metadata as MetadataToShip, Metric as MetricToShip, Origin as OriginToShip, Series, +}; use crate::errors; use crate::metric::{self, Metric, MetricValue, SortedTags}; +use crate::origin::find_metric_origin; -use datadog_protos::metrics::{Dogsketch, Sketch, SketchPayload}; +use datadog_protos::metrics::{Dogsketch, Metadata, Sketch, SketchPayload}; use ddsketch_agent::DDSketch; use hashbrown::hash_table; -use protobuf::Message; +use protobuf::{Message, MessageField, SpecialFields}; use tracing::{error, warn}; use ustr::Ustr; @@ -261,6 +264,14 @@ fn build_sketch(entry: &Metric, mut base_tag_vec: SortedTags) -> Option base_tag_vec.extend(&tags); } sketch.set_tags(base_tag_vec.to_chars()); + + if let Some(origin) = find_metric_origin(entry, base_tag_vec) { + sketch.set_metadata(Metadata { + origin: MessageField::some(origin), + special_fields: SpecialFields::default(), + }); + } + Some(sketch) } @@ -286,12 +297,22 @@ fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option, + /// Optional metadata associated with the metric + pub(crate) metadata: Option, +} + +#[derive(Debug, Serialize)] +pub struct Metadata { + pub(crate) origin: Option, +} + +#[derive(Debug, Serialize)] +pub struct Origin { + pub(crate) origin_product: u32, + pub(crate) origin_sub_product: u32, + pub(crate) origin_product_detail: u32, } #[derive(Debug, Serialize)] diff --git a/dogstatsd/src/lib.rs b/dogstatsd/src/lib.rs index 4009db1478..968ee54917 100644 --- a/dogstatsd/src/lib.rs +++ b/dogstatsd/src/lib.rs @@ -14,3 +14,4 @@ pub mod dogstatsd; pub mod errors; pub mod flusher; pub mod metric; +pub mod origin; diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs index 35ebe1628f..ce4a685acf 100644 --- a/dogstatsd/src/metric.rs +++ b/dogstatsd/src/metric.rs @@ -130,6 +130,13 @@ impl SortedTags { tags_as_vec } + pub fn find_all(&self, tag_key: &str) -> Vec<&Ustr> { + self.values + .iter() + .filter_map(|(k, v)| if k == tag_key { Some(v) } else { None }) + .collect() + } + pub(crate) fn to_resources(&self) -> Vec { let mut resources = Vec::with_capacity(constants::MAX_TAGS); for (key, val) in &self.values { @@ -658,4 +665,13 @@ mod tests { assert_eq!(first_element.0, Ustr::from("a")); assert_eq!(first_element.1, Ustr::from("a1")); } + + #[test] + fn sorted_tags_find_all() { + let tags = SortedTags::parse("a,a:1,b:2,c:3").unwrap(); + assert_eq!(tags.find_all("a"), vec![&Ustr::from(""), &Ustr::from("1")]); + assert_eq!(tags.find_all("b"), vec![&Ustr::from("2")]); + assert_eq!(tags.find_all("c"), vec![&Ustr::from("3")]); + assert_eq!(tags.find_all("d"), Vec::<&Ustr>::new()); + } } diff --git a/dogstatsd/src/origin.rs b/dogstatsd/src/origin.rs new file mode 100644 index 0000000000..227dd6df32 --- /dev/null +++ b/dogstatsd/src/origin.rs @@ -0,0 +1,246 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::metric::{Metric, SortedTags}; +use datadog_protos::metrics::Origin; + +// Metric tag keys +const DD_ORIGIN_TAG_KEY: &str = "origin"; +const AWS_LAMBDA_TAG_KEY: &str = "function_arn"; + +// Metric tag values +const GOOGLE_CLOUD_RUN_TAG_VALUE: &str = "cloudrun"; +const AZURE_APP_SERVICES_TAG_VALUE: &str = "appservice"; +const AZURE_CONTAINER_APP_TAG_VALUE: &str = "containerapp"; +const AZURE_FUNCTIONS_TAG_VALUE: &str = "azurefunction"; + +// Metric prefixes +const DATADOG_PREFIX: &str = "datadog"; +const GOOGLE_CLOUD_RUN_PREFIX: &str = "gcp.run"; +const AZURE_APP_SERVICES_PREFIX: &str = "azure.app_services"; +const AZURE_CONTAINER_APP_PREFIX: &str = "azure.app_containerapps"; +const AZURE_FUNCTIONS_PREFIX: &str = "azure.functions"; +const AWS_LAMBDA_PREFIX: &str = "aws.lambda"; + +/// Represents the product origin of a metric. +/// The full enum is exhaustive so we only include what we need. Please reference the corresponding +/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L161 +pub enum OriginProduct { + Other = 0, + Serverless = 1, +} + +impl From for u32 { + fn from(product: OriginProduct) -> u32 { + product as u32 + } +} + +/// Represents the category origin of a metric. +/// The full enum is exhaustive so we only include what we need. Please reference the corresponding +/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L276 +pub enum OriginCategory { + Other = 0, + AppServicesMetrics = 35, + CloudRunMetrics = 36, + ContainerAppMetrics = 37, + LambdaMetrics = 38, + AzureFunctionsMetrics = 71, +} + +impl From for u32 { + fn from(category: OriginCategory) -> u32 { + category as u32 + } +} + +/// Represents the service origin of a metric. +/// The full enum is exhaustive so we only include what we need. Please reference the corresponding +/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L417 +pub enum OriginService { + Other = 0, +} + +impl From for u32 { + fn from(service: OriginService) -> u32 { + service as u32 + } +} + +/// Struct to hold tag key, tag value, and prefix for matching. +struct MetricOriginCheck { + tag_key: &'static str, + tag_value: &'static str, + prefix: &'static str, +} + +impl MetricOriginCheck { + /// Checks if the tag matches the given key, value, and prefix. + fn matches(&self, tags: &SortedTags, metric_prefix: &str) -> bool { + has_tag_value(tags, self.tag_key, self.tag_value) && metric_prefix != self.prefix + } +} + +const METRIC_ORIGIN_CHECKS: &[MetricOriginCheck] = &[ + MetricOriginCheck { + tag_key: DD_ORIGIN_TAG_KEY, + tag_value: GOOGLE_CLOUD_RUN_TAG_VALUE, + prefix: GOOGLE_CLOUD_RUN_PREFIX, + }, + MetricOriginCheck { + tag_key: DD_ORIGIN_TAG_KEY, + tag_value: AZURE_APP_SERVICES_TAG_VALUE, + prefix: AZURE_APP_SERVICES_PREFIX, + }, + MetricOriginCheck { + tag_key: DD_ORIGIN_TAG_KEY, + tag_value: AZURE_CONTAINER_APP_TAG_VALUE, + prefix: AZURE_CONTAINER_APP_PREFIX, + }, + MetricOriginCheck { + tag_key: DD_ORIGIN_TAG_KEY, + tag_value: AZURE_FUNCTIONS_TAG_VALUE, + prefix: AZURE_FUNCTIONS_PREFIX, + }, + MetricOriginCheck { + tag_key: AWS_LAMBDA_TAG_KEY, + tag_value: "", + prefix: AWS_LAMBDA_PREFIX, + }, +]; + +/// Creates an Origin for serverless metrics. +fn serverless_origin(category: OriginCategory) -> Origin { + Origin { + origin_product: OriginProduct::Serverless.into(), + origin_service: OriginService::Other.into(), + origin_category: category.into(), + ..Default::default() + } +} + +/// Finds the origin of a metric based on its tags and name prefix. +pub fn find_metric_origin(metric: &Metric, tags: SortedTags) -> Option { + let metric_name = metric.name.to_string(); + let metric_prefix = metric_name + .split('.') + .take(2) + .collect::>() + .join("."); + + if is_datadog_metric(&metric_prefix) { + return None; + } + + for (index, origin_check) in METRIC_ORIGIN_CHECKS.iter().enumerate() { + if origin_check.matches(&tags, &metric_prefix) { + let category = match index { + 0 => OriginCategory::CloudRunMetrics, + 1 => OriginCategory::AppServicesMetrics, + 2 => OriginCategory::ContainerAppMetrics, + 3 => OriginCategory::AzureFunctionsMetrics, + 4 => OriginCategory::LambdaMetrics, + _ => OriginCategory::Other, + }; + return Some(serverless_origin(category)); + } + } + + None +} + +/// Checks if the given key-value pair exists in the tags. +fn has_tag_value(tags: &SortedTags, key: &str, value: &str) -> bool { + if value.is_empty() { + return !tags.find_all(key).is_empty(); + } + tags.find_all(key) + .iter() + .any(|tag_value| tag_value.as_str() == value) +} + +/// Checks if the metric is a Datadog metric. +fn is_datadog_metric(prefix: &str) -> bool { + prefix == DATADOG_PREFIX +} + +#[cfg(test)] +mod tests { + use crate::metric::MetricValue; + + use super::*; + + #[test] + fn test_origin_product() { + let origin_product: u32 = OriginProduct::Serverless.into(); + assert_eq!(origin_product, 1); + } + + #[test] + fn test_origin_category() { + let origin_category: u32 = OriginCategory::LambdaMetrics.into(); + assert_eq!(origin_category, 38); + } + + #[test] + fn test_origin_service() { + let origin_service: u32 = OriginService::Other.into(); + assert_eq!(origin_service, 0); + } + + #[test] + fn test_find_metric_origin_aws_lambda_standard_metric() { + let tags = SortedTags::parse("function_arn:hello123").unwrap(); + let mut now = 1656581409; + now = (now / 10) * 10; + + let metric = Metric { + id: 0, + name: "aws.lambda.enhanced.invocations".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: now, + }; + let origin = find_metric_origin(&metric, tags); + assert_eq!(origin, None); + } + + #[test] + fn test_find_metric_origin_aws_lambda_custom_metric() { + let tags = SortedTags::parse("function_arn:hello123").unwrap(); + let mut now = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + now = (now / 10) * 10; + + let metric = Metric { + id: 0, + name: "my.custom.aws.lambda.invocations".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: now, + }; + let origin = find_metric_origin(&metric, tags); + assert_eq!( + origin, + Some(Origin { + origin_product: OriginProduct::Serverless.into(), + origin_category: OriginCategory::LambdaMetrics.into(), + origin_service: OriginService::Other.into(), + ..Default::default() + }) + ); + } + + #[test] + fn test_has_tag_value() { + let tags = SortedTags::parse("a,a:1,b:2,c:3").unwrap(); + assert!(has_tag_value(&tags, "a", "1")); + assert!(has_tag_value(&tags, "b", "2")); + assert!(has_tag_value(&tags, "c", "3")); + assert!(!has_tag_value(&tags, "d", "4")); + } +}