Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
64c60bc
init test origins
DylanLovesCoffee Feb 3, 2025
237eebb
test more than one check
DylanLovesCoffee Feb 5, 2025
0084b32
sanity check
DylanLovesCoffee Feb 5, 2025
83fcbd2
apply to sketches
DylanLovesCoffee Feb 5, 2025
0b62b66
add origins list
DylanLovesCoffee Feb 5, 2025
e086e35
metrics origins
DylanLovesCoffee Feb 5, 2025
c7c588c
other serverless types
DylanLovesCoffee Feb 6, 2025
472b077
cleanup
DylanLovesCoffee Feb 6, 2025
8667213
add tests
DylanLovesCoffee Feb 10, 2025
4c3a355
test
DylanLovesCoffee Feb 10, 2025
4d6e311
test again
DylanLovesCoffee Feb 10, 2025
8a48e8b
test again
DylanLovesCoffee Feb 11, 2025
0e5e492
check tags
DylanLovesCoffee Feb 11, 2025
0e73ade
fix tags
DylanLovesCoffee Feb 11, 2025
803b412
only custom metrics
DylanLovesCoffee Feb 11, 2025
f35aa56
test for series
DylanLovesCoffee Feb 12, 2025
f02d604
exclude DD
DylanLovesCoffee Feb 12, 2025
ffd31df
testing
DylanLovesCoffee Feb 12, 2025
04a8055
rename
DylanLovesCoffee Feb 12, 2025
5407fc5
Merge branch 'main' into dylan/metric-origins
DylanLovesCoffee Feb 12, 2025
b0a8dd8
update tests
DylanLovesCoffee Feb 12, 2025
7f2f889
cleanup
DylanLovesCoffee Feb 12, 2025
5c19950
update
DylanLovesCoffee Feb 12, 2025
ffd91cb
todo
DylanLovesCoffee Feb 12, 2025
932f15e
update aggregator
DylanLovesCoffee Feb 18, 2025
68543b5
refactor origin
DylanLovesCoffee Feb 18, 2025
384a5b7
clippy
DylanLovesCoffee Feb 18, 2025
e0ebca2
refactor tag checking
DylanLovesCoffee Feb 20, 2025
1e36f6f
update with azure functions
DylanLovesCoffee Feb 20, 2025
2868e8a
Merge branch 'main' into dylan/metric-origins
hghotra Mar 12, 2025
7db1305
Add timestamp to metric constructor in test
hghotra Mar 12, 2025
50faf7c
Refactor based on alex's comment
hghotra Mar 13, 2025
68c4e83
Factor out reduntant functions
hghotra Mar 13, 2025
c0853de
Update vars & add comments
hghotra Mar 13, 2025
147216e
Better naming
hghotra Mar 13, 2025
e45ec69
Remove ambiguity in tag search
hghotra Mar 13, 2025
95b94ed
Pacify clippy
hghotra Mar 13, 2025
d491ddd
Fix logic when tag value not available
hghotra Mar 13, 2025
927967c
elide the lifetimes
hghotra Mar 13, 2025
d2b8751
Merge branch 'main' into dylan/metric-origins
hghotra Mar 16, 2025
d832457
Step function metrics are not sent using dogstatsd
hghotra Mar 17, 2025
dd2fcaf
Merge branch 'main' into dylan/metric-origins
hghotra Mar 18, 2025
d720dd8
Merge branch 'main' into dylan/metric-origins
ivoanjo Mar 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions dogstatsd/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
//! The aggregation of metrics.

use crate::constants;
use crate::datadog::{self, Metric as MetricToShip, Series};
use crate::datadog::{
self, Metadata as MetadataToShip, Metric as MetricToShip, Origin as OriginToShip, Series,
};
use crate::errors;
use crate::metric::{self, Metric, MetricValue, SortedTags};
use crate::origin::find_metric_origin;

use datadog_protos::metrics::{Dogsketch, Sketch, SketchPayload};
use datadog_protos::metrics::{Dogsketch, Metadata, Sketch, SketchPayload};
use ddsketch_agent::DDSketch;
use hashbrown::hash_table;
use protobuf::Message;
use protobuf::{Message, MessageField, SpecialFields};
use tracing::{error, warn};
use ustr::Ustr;

Expand Down Expand Up @@ -261,6 +264,14 @@ fn build_sketch(entry: &Metric, mut base_tag_vec: SortedTags) -> Option<Sketch>
base_tag_vec.extend(&tags);
}
sketch.set_tags(base_tag_vec.to_chars());

if let Some(origin) = find_metric_origin(entry, base_tag_vec) {
sketch.set_metadata(Metadata {
origin: MessageField::some(origin),
special_fields: SpecialFields::default(),
});
}

Some(sketch)
}

Expand All @@ -286,12 +297,22 @@ fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option<MetricTo
base_tag_vec.extend(&tags);
}

let origin = find_metric_origin(entry, base_tag_vec.clone());
let metadata = origin.map(|o| MetadataToShip {
origin: Some(OriginToShip {
origin_product: o.origin_product,
origin_sub_product: o.origin_category,
origin_product_detail: o.origin_service,
}),
});

Some(MetricToShip {
metric: entry.name.as_str(),
resources,
kind,
points: [point; 1],
tags: base_tag_vec.to_strings(),
metadata,
})
}

Expand All @@ -308,7 +329,7 @@ pub mod tests {

const PRECISION: f64 = 0.000_000_01;

const SINGLE_METRIC_SIZE: usize = 193; // taken from the test, size of a serialized metric with one tag and 1 digit counter value
const SINGLE_METRIC_SIZE: usize = 209; // taken from the test, size of a serialized metric with one tag and 1 digit counter value
const SINGLE_DISTRIBUTION_SIZE: u64 = 140;
const DEFAULT_TAGS: &str =
"dd_extension_version:63-next,architecture:x86_64,_dd.compute_stats:1";
Expand Down Expand Up @@ -673,7 +694,7 @@ pub mod tests {
fn consume_metrics_batch_bytes() {
let expected_metrics_per_batch = 2;
let total_number_of_metrics = 5;
let two_metrics_size = 374;
let two_metrics_size = 406;
let max_bytes = SINGLE_METRIC_SIZE * expected_metrics_per_batch + 13;
let mut aggregator = Aggregator {
tags: to_sorted_tags(),
Expand Down
14 changes: 14 additions & 0 deletions dogstatsd/src/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,20 @@ pub(crate) struct Metric {
/// The kind of metric
pub(crate) kind: DdMetricKind,
pub(crate) tags: Vec<String>,
/// Optional metadata associated with the metric
pub(crate) metadata: Option<Metadata>,
}

#[derive(Debug, Serialize)]
pub struct Metadata {
pub(crate) origin: Option<Origin>,
}

#[derive(Debug, Serialize)]
pub struct Origin {
pub(crate) origin_product: u32,
pub(crate) origin_sub_product: u32,
pub(crate) origin_product_detail: u32,
}

#[derive(Debug, Serialize)]
Expand Down
1 change: 1 addition & 0 deletions dogstatsd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ pub mod dogstatsd;
pub mod errors;
pub mod flusher;
pub mod metric;
pub mod origin;
16 changes: 16 additions & 0 deletions dogstatsd/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ impl SortedTags {
tags_as_vec
}

pub fn find_all(&self, tag_key: &str) -> Vec<&Ustr> {
self.values
.iter()
.filter_map(|(k, v)| if k == tag_key { Some(v) } else { None })
.collect()
}

pub(crate) fn to_resources(&self) -> Vec<datadog::Resource> {
let mut resources = Vec::with_capacity(constants::MAX_TAGS);
for (key, val) in &self.values {
Expand Down Expand Up @@ -658,4 +665,13 @@ mod tests {
assert_eq!(first_element.0, Ustr::from("a"));
assert_eq!(first_element.1, Ustr::from("a1"));
}

#[test]
fn sorted_tags_find_all() {
let tags = SortedTags::parse("a,a:1,b:2,c:3").unwrap();
assert_eq!(tags.find_all("a"), vec![&Ustr::from(""), &Ustr::from("1")]);
assert_eq!(tags.find_all("b"), vec![&Ustr::from("2")]);
assert_eq!(tags.find_all("c"), vec![&Ustr::from("3")]);
assert_eq!(tags.find_all("d"), Vec::<&Ustr>::new());
}
}
246 changes: 246 additions & 0 deletions dogstatsd/src/origin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::metric::{Metric, SortedTags};
use datadog_protos::metrics::Origin;

// Metric tag keys
const DD_ORIGIN_TAG_KEY: &str = "origin";
const AWS_LAMBDA_TAG_KEY: &str = "function_arn";

// Metric tag values
const GOOGLE_CLOUD_RUN_TAG_VALUE: &str = "cloudrun";
const AZURE_APP_SERVICES_TAG_VALUE: &str = "appservice";
const AZURE_CONTAINER_APP_TAG_VALUE: &str = "containerapp";
const AZURE_FUNCTIONS_TAG_VALUE: &str = "azurefunction";

// Metric prefixes
const DATADOG_PREFIX: &str = "datadog";
const GOOGLE_CLOUD_RUN_PREFIX: &str = "gcp.run";
const AZURE_APP_SERVICES_PREFIX: &str = "azure.app_services";
const AZURE_CONTAINER_APP_PREFIX: &str = "azure.app_containerapps";
const AZURE_FUNCTIONS_PREFIX: &str = "azure.functions";
const AWS_LAMBDA_PREFIX: &str = "aws.lambda";

/// Represents the product origin of a metric.
/// The full enum is exhaustive so we only include what we need. Please reference the corresponding
/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L161
pub enum OriginProduct {
Other = 0,
Serverless = 1,
}

impl From<OriginProduct> for u32 {
fn from(product: OriginProduct) -> u32 {
product as u32
}
}

/// Represents the category origin of a metric.
/// The full enum is exhaustive so we only include what we need. Please reference the corresponding
/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L276
pub enum OriginCategory {
Other = 0,
AppServicesMetrics = 35,
CloudRunMetrics = 36,
ContainerAppMetrics = 37,
LambdaMetrics = 38,
AzureFunctionsMetrics = 71,
}

impl From<OriginCategory> for u32 {
fn from(category: OriginCategory) -> u32 {
category as u32
}
}

/// Represents the service origin of a metric.
/// The full enum is exhaustive so we only include what we need. Please reference the corresponding
/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L417
pub enum OriginService {
Other = 0,
}

impl From<OriginService> for u32 {
fn from(service: OriginService) -> u32 {
service as u32
}
}

/// Struct to hold tag key, tag value, and prefix for matching.
struct MetricOriginCheck {
tag_key: &'static str,
tag_value: &'static str,
prefix: &'static str,
}

impl MetricOriginCheck {
/// Checks if the tag matches the given key, value, and prefix.
fn matches(&self, tags: &SortedTags, metric_prefix: &str) -> bool {
has_tag_value(tags, self.tag_key, self.tag_value) && metric_prefix != self.prefix
}
}

const METRIC_ORIGIN_CHECKS: &[MetricOriginCheck] = &[
MetricOriginCheck {
tag_key: DD_ORIGIN_TAG_KEY,
tag_value: GOOGLE_CLOUD_RUN_TAG_VALUE,
prefix: GOOGLE_CLOUD_RUN_PREFIX,
},
MetricOriginCheck {
tag_key: DD_ORIGIN_TAG_KEY,
tag_value: AZURE_APP_SERVICES_TAG_VALUE,
prefix: AZURE_APP_SERVICES_PREFIX,
},
MetricOriginCheck {
tag_key: DD_ORIGIN_TAG_KEY,
tag_value: AZURE_CONTAINER_APP_TAG_VALUE,
prefix: AZURE_CONTAINER_APP_PREFIX,
},
MetricOriginCheck {
tag_key: DD_ORIGIN_TAG_KEY,
tag_value: AZURE_FUNCTIONS_TAG_VALUE,
prefix: AZURE_FUNCTIONS_PREFIX,
},
MetricOriginCheck {
tag_key: AWS_LAMBDA_TAG_KEY,
tag_value: "",
prefix: AWS_LAMBDA_PREFIX,
},
];

/// Creates an Origin for serverless metrics.
fn serverless_origin(category: OriginCategory) -> Origin {
Origin {
origin_product: OriginProduct::Serverless.into(),
origin_service: OriginService::Other.into(),
origin_category: category.into(),
..Default::default()
}
}

/// Finds the origin of a metric based on its tags and name prefix.
pub fn find_metric_origin(metric: &Metric, tags: SortedTags) -> Option<Origin> {
let metric_name = metric.name.to_string();
let metric_prefix = metric_name
.split('.')
.take(2)
.collect::<Vec<&str>>()
.join(".");

if is_datadog_metric(&metric_prefix) {
return None;
}

for (index, origin_check) in METRIC_ORIGIN_CHECKS.iter().enumerate() {
if origin_check.matches(&tags, &metric_prefix) {
let category = match index {
0 => OriginCategory::CloudRunMetrics,
1 => OriginCategory::AppServicesMetrics,
2 => OriginCategory::ContainerAppMetrics,
3 => OriginCategory::AzureFunctionsMetrics,
4 => OriginCategory::LambdaMetrics,
_ => OriginCategory::Other,
};
return Some(serverless_origin(category));
}
}

None
}

/// Checks if the given key-value pair exists in the tags.
fn has_tag_value(tags: &SortedTags, key: &str, value: &str) -> bool {
if value.is_empty() {
return !tags.find_all(key).is_empty();
}
tags.find_all(key)
.iter()
.any(|tag_value| tag_value.as_str() == value)
}

/// Checks if the metric is a Datadog metric.
fn is_datadog_metric(prefix: &str) -> bool {
prefix == DATADOG_PREFIX
}

#[cfg(test)]
mod tests {
use crate::metric::MetricValue;

use super::*;

#[test]
fn test_origin_product() {
let origin_product: u32 = OriginProduct::Serverless.into();
assert_eq!(origin_product, 1);
}

#[test]
fn test_origin_category() {
let origin_category: u32 = OriginCategory::LambdaMetrics.into();
assert_eq!(origin_category, 38);
}

#[test]
fn test_origin_service() {
let origin_service: u32 = OriginService::Other.into();
assert_eq!(origin_service, 0);
}

#[test]
fn test_find_metric_origin_aws_lambda_standard_metric() {
let tags = SortedTags::parse("function_arn:hello123").unwrap();
let mut now = 1656581409;
now = (now / 10) * 10;

let metric = Metric {
id: 0,
name: "aws.lambda.enhanced.invocations".into(),
value: MetricValue::Gauge(1.0),
tags: Some(tags.clone()),
timestamp: now,
};
let origin = find_metric_origin(&metric, tags);
assert_eq!(origin, None);
}

#[test]
fn test_find_metric_origin_aws_lambda_custom_metric() {
let tags = SortedTags::parse("function_arn:hello123").unwrap();
let mut now = std::time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
now = (now / 10) * 10;

let metric = Metric {
id: 0,
name: "my.custom.aws.lambda.invocations".into(),
value: MetricValue::Gauge(1.0),
tags: Some(tags.clone()),
timestamp: now,
};
let origin = find_metric_origin(&metric, tags);
assert_eq!(
origin,
Some(Origin {
origin_product: OriginProduct::Serverless.into(),
origin_category: OriginCategory::LambdaMetrics.into(),
origin_service: OriginService::Other.into(),
..Default::default()
})
);
}

#[test]
fn test_has_tag_value() {
let tags = SortedTags::parse("a,a:1,b:2,c:3").unwrap();
assert!(has_tag_value(&tags, "a", "1"));
assert!(has_tag_value(&tags, "b", "2"));
assert!(has_tag_value(&tags, "c", "3"));
assert!(!has_tag_value(&tags, "d", "4"));
}
}
Loading