From 8b39c5e42eb75f03580ac8b6b5b3ced9db8a0e7e Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 15:19:48 -0400 Subject: [PATCH 1/9] Implement stats concentrator --- bottlecap/src/traces/stats_concentrator.rs | 157 +++++++++++++++++++-- bottlecap/src/traces/stats_generator.rs | 15 +- 2 files changed, 160 insertions(+), 12 deletions(-) diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs index 511e3fbc6..685bd923e 100644 --- a/bottlecap/src/traces/stats_concentrator.rs +++ b/bottlecap/src/traces/stats_concentrator.rs @@ -1,38 +1,175 @@ use crate::config::Config; use datadog_trace_protobuf::pb; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc, time::{SystemTime, UNIX_EPOCH}}; +use tracing::error; // Event sent to the stats concentrator -#[derive(Clone, Copy)] +#[derive(Clone)] pub struct StatsEvent { pub time: u64, pub aggregation_key: AggregationKey, pub stats: Stats, } -#[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)] -pub struct AggregationKey {} +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct AggregationKey { + pub env: String, + pub service: String, + // e.g. "aws.lambda.load", "aws.lambda.import" + pub name: String, + // e.g. "my-lambda-function-name", "datadog_lambda.handler", "urllib.request" + pub resource: String, + // e.g. "aws.lambda.load", "aws.lambda.import" + pub r#type: String, +} + +// Aggregated stats for a time interval across all the aggregation keys. +#[derive(Default)] +struct Bucket { + data: HashMap, +} #[derive(Clone, Debug, Default, Copy)] -pub struct Stats {} +pub struct Stats { + pub hits: i32, + pub duration: i64, // in nanoseconds + pub error: i32, + pub top_level_hits: f64, +} pub struct StatsConcentrator { - _config: Arc, + config: Arc, + buckets: HashMap, } + +// The number of latest buckets to not flush when force_flush is false. 
+// For example, if we have buckets with timestamps 10, 20, 40, the current timestamp is 45, +// and NO_FLUSH_BUCKET_COUNT is 3, then we will flush bucket 10 but not bucket 20 or 40. +// Note that the bucket 30 is included in the 3 latest buckets even if it has no data. +// This is to reduce the chance of flushing stats that are still being collected to save some cost. +const NO_FLUSH_BUCKET_COUNT: u64 = 2; + +const S_TO_NS_U64: u64 = 1_000_000_000; + +// The duration of a bucket in nanoseconds. +const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS_U64; // 10 seconds + + // Aggregates stats into buckets, which are then pulled by the stats aggregator. impl StatsConcentrator { #[must_use] pub fn new(config: Arc) -> Self { - Self { _config: config } + Self { config, buckets: HashMap::new() } + } + + pub fn add(&mut self, stats_event: StatsEvent) { + let bucket_timestamp = Self::get_bucket_timestamp(stats_event.time); + let bucket = self.buckets.entry(bucket_timestamp).or_default(); + + let stats = bucket.data.entry(stats_event.aggregation_key).or_default(); + + stats.hits += stats_event.stats.hits; + stats.error += stats_event.stats.error; + stats.duration += stats_event.stats.duration; + stats.top_level_hits += stats_event.stats.top_level_hits; } - pub fn add(&mut self, _stats_event: StatsEvent) {} + fn get_bucket_timestamp(timestamp: u64) -> u64 { + timestamp - timestamp % BUCKET_DURATION_NS + } // force_flush: If true, flush all stats. If false, flush stats except for the few latest // buckets, which may still be getting data. 
#[must_use] - pub fn flush(&mut self, _force_flush: bool) -> Vec { - vec![] + pub fn flush(&mut self, force_flush: bool) -> Vec { + let current_timestamp: u64 = match SystemTime::now().duration_since(UNIX_EPOCH) { + Ok(duration) => { + if let Ok(ts) = duration.as_nanos().try_into() { + ts + } else { + error!("Timestamp overflow, skipping stats flush"); + return Vec::new(); + } + } + Err(e) => { + error!("Failed to get current timestamp: {e}, skipping stats flush"); + return Vec::new(); + } + }; + + let mut ret = Vec::new(); + self.buckets.retain(|&timestamp, bucket| { + if force_flush || Self::should_flush_bucket(current_timestamp, timestamp) { + // Flush and remove this bucket + for (aggregation_key, stats) in &bucket.data { + ret.push(Self::construct_stats_payload( + &self.config, + timestamp, + aggregation_key, + *stats, + )); + } + false + } else { + // Keep this bucket + true + } + }); + + ret + } + + // Whether a bucket should be flushed based on the current timestamp and the bucket timestamp. 
+ fn should_flush_bucket(current_timestamp: u64, bucket_timestamp: u64) -> bool { + current_timestamp - bucket_timestamp >= BUCKET_DURATION_NS * NO_FLUSH_BUCKET_COUNT + } + + #[allow(clippy::cast_possible_truncation)] + #[allow(clippy::cast_sign_loss)] + fn construct_stats_payload( + config: &Config, + timestamp: u64, + aggregation_key: &AggregationKey, + stats: Stats, + ) -> pb::ClientStatsPayload { + pb::ClientStatsPayload { + hostname: String::new(), + env: aggregation_key.env.clone(), + version: config.version.clone().unwrap_or_default(), + lang: "rust".to_string(), + tracer_version: String::new(), + runtime_id: String::new(), + sequence: 0, + agent_aggregation: String::new(), + service: aggregation_key.service.clone(), + container_id: String::new(), + tags: vec![], + git_commit_sha: String::new(), + image_tag: String::new(), + stats: vec![pb::ClientStatsBucket { + start: timestamp, + duration: BUCKET_DURATION_NS, + stats: vec![pb::ClientGroupedStats { + service: aggregation_key.service.clone(), + name: aggregation_key.name.clone(), + resource: aggregation_key.resource.clone(), + http_status_code: 0, + r#type: aggregation_key.r#type.clone(), + db_type: String::new(), + hits: stats.hits.try_into().unwrap_or_default(), + errors: stats.error.try_into().unwrap_or_default(), + duration: stats.duration.try_into().unwrap_or_default(), + ok_summary: vec![], + error_summary: vec![], + synthetics: false, + top_level_hits: stats.top_level_hits.round() as u64, + span_kind: String::new(), + peer_tags: vec![], + is_trace_root: 1, + }], + agent_time_shift: 0, + }], + } } } diff --git a/bottlecap/src/traces/stats_generator.rs b/bottlecap/src/traces/stats_generator.rs index b5a0dbd44..0c0a3462d 100644 --- a/bottlecap/src/traces/stats_generator.rs +++ b/bottlecap/src/traces/stats_generator.rs @@ -33,8 +33,19 @@ impl StatsGenerator { for span in &chunk.spans { let stats = StatsEvent { time: span.start.try_into().unwrap_or_default(), - aggregation_key: AggregationKey {}, - stats: 
Stats {}, + aggregation_key: AggregationKey { + env: span.meta.get("env").cloned().unwrap_or_default(), + service: span.service.clone(), + name: span.name.clone(), + resource: span.resource.clone(), + r#type: span.r#type.clone(), + }, + stats: Stats { + hits: 1, + error: span.error, + duration: span.duration, + top_level_hits: span.metrics.get("_dd.top_level").map_or(0.0, |v| *v), + }, }; if let Err(err) = self.stats_concentrator.add(stats) { error!("Failed to send trace stats: {err}"); From c323875ebfd1d771736592b8a7a3328d99480b8c Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 15:20:00 -0400 Subject: [PATCH 2/9] fmt --- bottlecap/src/traces/stats_concentrator.rs | 13 +++++++++---- bottlecap/src/traces/stats_generator.rs | 5 ++++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs index 685bd923e..f93f8db03 100644 --- a/bottlecap/src/traces/stats_concentrator.rs +++ b/bottlecap/src/traces/stats_concentrator.rs @@ -1,6 +1,10 @@ use crate::config::Config; use datadog_trace_protobuf::pb; -use std::{collections::HashMap, sync::Arc, time::{SystemTime, UNIX_EPOCH}}; +use std::{ + collections::HashMap, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; use tracing::error; // Event sent to the stats concentrator @@ -42,7 +46,6 @@ pub struct StatsConcentrator { buckets: HashMap, } - // The number of latest buckets to not flush when force_flush is false. // For example, if we have buckets with timestamps 10, 20, 40, the current timestamp is 45, // and NO_FLUSH_BUCKET_COUNT is 3, then we will flush bucket 10 but not bucket 20 or 40. @@ -55,12 +58,14 @@ const S_TO_NS_U64: u64 = 1_000_000_000; // The duration of a bucket in nanoseconds. const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS_U64; // 10 seconds - // Aggregates stats into buckets, which are then pulled by the stats aggregator. 
impl StatsConcentrator { #[must_use] pub fn new(config: Arc) -> Self { - Self { config, buckets: HashMap::new() } + Self { + config, + buckets: HashMap::new(), + } } pub fn add(&mut self, stats_event: StatsEvent) { diff --git a/bottlecap/src/traces/stats_generator.rs b/bottlecap/src/traces/stats_generator.rs index 0c0a3462d..286807677 100644 --- a/bottlecap/src/traces/stats_generator.rs +++ b/bottlecap/src/traces/stats_generator.rs @@ -44,7 +44,10 @@ impl StatsGenerator { hits: 1, error: span.error, duration: span.duration, - top_level_hits: span.metrics.get("_dd.top_level").map_or(0.0, |v| *v), + top_level_hits: span + .metrics + .get("_dd.top_level") + .map_or(0.0, |v| *v), }, }; if let Err(err) = self.stats_concentrator.add(stats) { From dbfd983c9fd546196af81daff7ebe0a4400efa16 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Tue, 23 Sep 2025 14:34:20 -0400 Subject: [PATCH 3/9] Add lots of TODOs --- bottlecap/src/traces/stats_concentrator.rs | 63 +++++++++++++++++----- bottlecap/src/traces/stats_generator.rs | 2 +- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs index f93f8db03..f5e47e5eb 100644 --- a/bottlecap/src/traces/stats_concentrator.rs +++ b/bottlecap/src/traces/stats_concentrator.rs @@ -53,10 +53,10 @@ pub struct StatsConcentrator { // This is to reduce the chance of flushing stats that are still being collected to save some cost. const NO_FLUSH_BUCKET_COUNT: u64 = 2; -const S_TO_NS_U64: u64 = 1_000_000_000; +const S_TO_NS: u64 = 1_000_000_000; // The duration of a bucket in nanoseconds. -const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS_U64; // 10 seconds +const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds // Aggregates stats into buckets, which are then pulled by the stats aggregator. impl StatsConcentrator { @@ -84,18 +84,13 @@ impl StatsConcentrator { timestamp - timestamp % BUCKET_DURATION_NS } - // force_flush: If true, flush all stats. 
If false, flush stats except for the few latest + // force: If true, flush all stats. If false, flush stats except for the few latest // buckets, which may still be getting data. #[must_use] - pub fn flush(&mut self, force_flush: bool) -> Vec { + pub fn flush(&mut self, force: bool) -> Vec { let current_timestamp: u64 = match SystemTime::now().duration_since(UNIX_EPOCH) { Ok(duration) => { - if let Ok(ts) = duration.as_nanos().try_into() { - ts - } else { - error!("Timestamp overflow, skipping stats flush"); - return Vec::new(); - } + u64::try_from(duration.as_nanos()).unwrap_or_default() } Err(e) => { error!("Failed to get current timestamp: {e}, skipping stats flush"); @@ -105,7 +100,7 @@ impl StatsConcentrator { let mut ret = Vec::new(); self.buckets.retain(|&timestamp, bucket| { - if force_flush || Self::should_flush_bucket(current_timestamp, timestamp) { + if force || Self::should_flush_bucket(current_timestamp, timestamp) { // Flush and remove this bucket for (aggregation_key, stats) in &bucket.data { ret.push(Self::construct_stats_payload( @@ -125,7 +120,6 @@ impl StatsConcentrator { ret } - // Whether a bucket should be flushed based on the current timestamp and the bucket timestamp. fn should_flush_bucket(current_timestamp: u64, bucket_timestamp: u64) -> bool { current_timestamp - bucket_timestamp >= BUCKET_DURATION_NS * NO_FLUSH_BUCKET_COUNT } @@ -139,18 +133,29 @@ impl StatsConcentrator { stats: Stats, ) -> pb::ClientStatsPayload { pb::ClientStatsPayload { + // TODO: handle this hostname: String::new(), env: aggregation_key.env.clone(), + // Version is not in the trace payload. Need to read it from config. 
version: config.version.clone().unwrap_or_default(), + // TODO: handle this lang: "rust".to_string(), + // TODO: handle this tracer_version: String::new(), + // TODO: handle this runtime_id: String::new(), + // TODO: handle this sequence: 0, + // TODO: handle this agent_aggregation: String::new(), service: aggregation_key.service.clone(), + // TODO: handle this container_id: String::new(), + // TODO: handle this tags: vec![], + // TODO: handle this git_commit_sha: String::new(), + // TODO: handle this image_tag: String::new(), stats: vec![pb::ClientStatsBucket { start: timestamp, @@ -159,22 +164,56 @@ impl StatsConcentrator { service: aggregation_key.service.clone(), name: aggregation_key.name.clone(), resource: aggregation_key.resource.clone(), + // TODO: handle this http_status_code: 0, r#type: aggregation_key.r#type.clone(), + // TODO: handle this db_type: String::new(), hits: stats.hits.try_into().unwrap_or_default(), errors: stats.error.try_into().unwrap_or_default(), duration: stats.duration.try_into().unwrap_or_default(), + // TODO: handle this ok_summary: vec![], + // TODO: handle this error_summary: vec![], + // TODO: handle this synthetics: false, top_level_hits: stats.top_level_hits.round() as u64, + // TODO: handle this span_kind: String::new(), + // TODO: handle this peer_tags: vec![], + // TODO: handle this is_trace_root: 1, }], + // TODO: handle this agent_time_shift: 0, }], } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_should_flush_bucket_false_when_not_enough_time_passed() { + let bucket_timestamp = 1_000_000_000; + let current_timestamp = bucket_timestamp + BUCKET_DURATION_NS * NO_FLUSH_BUCKET_COUNT - 1; + assert!( + !StatsConcentrator::should_flush_bucket(current_timestamp, bucket_timestamp), + "Should not flush when current_timestamp is less than threshold ahead" + ); + } + + #[test] + fn test_should_flush_bucket_true_when_much_later() { + let bucket_timestamp = 1_000_000_000; + let current_timestamp = bucket_timestamp 
+ BUCKET_DURATION_NS * NO_FLUSH_BUCKET_COUNT + 1; + assert!( + StatsConcentrator::should_flush_bucket(current_timestamp, bucket_timestamp), + "Should flush when current_timestamp is greater than threshold" + ); + } +} \ No newline at end of file diff --git a/bottlecap/src/traces/stats_generator.rs b/bottlecap/src/traces/stats_generator.rs index 286807677..912f80bdd 100644 --- a/bottlecap/src/traces/stats_generator.rs +++ b/bottlecap/src/traces/stats_generator.rs @@ -1,7 +1,7 @@ use crate::traces::stats_concentrator::{AggregationKey, Stats, StatsEvent}; use crate::traces::stats_concentrator_service::StatsConcentratorHandle; use datadog_trace_utils::tracer_payload::TracerPayloadCollection; -use tracing::error; +use tracing::{debug, error}; use tokio::sync::mpsc::error::SendError; From 4be028d641b53e44d1ea432e7debbcf633f3cd5a Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Tue, 23 Sep 2025 14:46:20 -0400 Subject: [PATCH 4/9] Rename test --- bottlecap/src/traces/stats_concentrator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs index f5e47e5eb..5f73b78ff 100644 --- a/bottlecap/src/traces/stats_concentrator.rs +++ b/bottlecap/src/traces/stats_concentrator.rs @@ -208,7 +208,7 @@ mod tests { } #[test] - fn test_should_flush_bucket_true_when_much_later() { + fn test_should_flush_bucket_true_when_later() { let bucket_timestamp = 1_000_000_000; let current_timestamp = bucket_timestamp + BUCKET_DURATION_NS * NO_FLUSH_BUCKET_COUNT + 1; assert!( From a8e9255fbdcf15ad9bbc3b0991d14008eee95785 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Tue, 23 Sep 2025 14:48:29 -0400 Subject: [PATCH 5/9] fmt --- bottlecap/src/traces/stats_concentrator.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs index 5f73b78ff..2d2e8faef 100644 --- 
a/bottlecap/src/traces/stats_concentrator.rs +++ b/bottlecap/src/traces/stats_concentrator.rs @@ -89,9 +89,7 @@ impl StatsConcentrator { #[must_use] pub fn flush(&mut self, force: bool) -> Vec { let current_timestamp: u64 = match SystemTime::now().duration_since(UNIX_EPOCH) { - Ok(duration) => { - u64::try_from(duration.as_nanos()).unwrap_or_default() - } + Ok(duration) => u64::try_from(duration.as_nanos()).unwrap_or_default(), Err(e) => { error!("Failed to get current timestamp: {e}, skipping stats flush"); return Vec::new(); @@ -216,4 +214,4 @@ mod tests { "Should flush when current_timestamp is greater than threshold" ); } -} \ No newline at end of file +} From 4d2fe96acf3673dc6d2f03f2530c0cea9a3bf805 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Tue, 23 Sep 2025 14:56:43 -0400 Subject: [PATCH 6/9] Remove unused import --- bottlecap/src/traces/stats_generator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/traces/stats_generator.rs b/bottlecap/src/traces/stats_generator.rs index 912f80bdd..286807677 100644 --- a/bottlecap/src/traces/stats_generator.rs +++ b/bottlecap/src/traces/stats_generator.rs @@ -1,7 +1,7 @@ use crate::traces::stats_concentrator::{AggregationKey, Stats, StatsEvent}; use crate::traces::stats_concentrator_service::StatsConcentratorHandle; use datadog_trace_utils::tracer_payload::TracerPayloadCollection; -use tracing::{debug, error}; +use tracing::error; use tokio::sync::mpsc::error::SendError; From 12d25e55500c7513deaefc4c03f2017fba3ae938 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 24 Sep 2025 11:55:02 -0400 Subject: [PATCH 7/9] Rename: error -> errors --- bottlecap/src/traces/stats_concentrator.rs | 17 ++++++++--------- bottlecap/src/traces/stats_generator.rs | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs index 2d2e8faef..64cbc40f1 100644 --- a/bottlecap/src/traces/stats_concentrator.rs 
+++ b/bottlecap/src/traces/stats_concentrator.rs @@ -1,10 +1,7 @@ use crate::config::Config; use datadog_trace_protobuf::pb; -use std::{ - collections::HashMap, - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, -}; +use std::{collections::HashMap, sync::Arc}; +use std::time::{SystemTime, UNIX_EPOCH}; use tracing::error; // Event sent to the stats concentrator @@ -36,8 +33,10 @@ struct Bucket { #[derive(Clone, Debug, Default, Copy)] pub struct Stats { pub hits: i32, - pub duration: i64, // in nanoseconds - pub error: i32, + // in nanoseconds + pub duration: i64, + // error count + pub errors: i32, pub top_level_hits: f64, } @@ -75,7 +74,7 @@ impl StatsConcentrator { let stats = bucket.data.entry(stats_event.aggregation_key).or_default(); stats.hits += stats_event.stats.hits; - stats.error += stats_event.stats.error; + stats.errors += stats_event.stats.errors; stats.duration += stats_event.stats.duration; stats.top_level_hits += stats_event.stats.top_level_hits; } @@ -168,7 +167,7 @@ impl StatsConcentrator { // TODO: handle this db_type: String::new(), hits: stats.hits.try_into().unwrap_or_default(), - errors: stats.error.try_into().unwrap_or_default(), + errors: stats.errors.try_into().unwrap_or_default(), duration: stats.duration.try_into().unwrap_or_default(), // TODO: handle this ok_summary: vec![], diff --git a/bottlecap/src/traces/stats_generator.rs b/bottlecap/src/traces/stats_generator.rs index 286807677..8492a80aa 100644 --- a/bottlecap/src/traces/stats_generator.rs +++ b/bottlecap/src/traces/stats_generator.rs @@ -42,7 +42,7 @@ impl StatsGenerator { }, stats: Stats { hits: 1, - error: span.error, + errors: span.error, duration: span.duration, top_level_hits: span .metrics From cb6be378c4dccf9347f4fd7e1970d622e3523dae Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 24 Sep 2025 12:01:37 -0400 Subject: [PATCH 8/9] fmt --- bottlecap/src/traces/stats_concentrator.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs index 64cbc40f1..2269ee419 100644 --- a/bottlecap/src/traces/stats_concentrator.rs +++ b/bottlecap/src/traces/stats_concentrator.rs @@ -1,7 +1,7 @@ use crate::config::Config; use datadog_trace_protobuf::pb; -use std::{collections::HashMap, sync::Arc}; use std::time::{SystemTime, UNIX_EPOCH}; +use std::{collections::HashMap, sync::Arc}; use tracing::error; // Event sent to the stats concentrator @@ -33,7 +33,7 @@ struct Bucket { #[derive(Clone, Debug, Default, Copy)] pub struct Stats { pub hits: i32, - // in nanoseconds + // in nanoseconds pub duration: i64, // error count pub errors: i32, From c099094ff564bbbb89b82200303cf653864d7651 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Thu, 25 Sep 2025 13:26:03 -0400 Subject: [PATCH 9/9] Fix comment for r#type --- bottlecap/src/traces/stats_concentrator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs index 2269ee419..de4e16b04 100644 --- a/bottlecap/src/traces/stats_concentrator.rs +++ b/bottlecap/src/traces/stats_concentrator.rs @@ -20,7 +20,7 @@ pub struct AggregationKey { pub name: String, // e.g. "my-lambda-function-name", "datadog_lambda.handler", "urllib.request" pub resource: String, - // e.g. "aws.lambda.load", "aws.lambda.import" + // e.g. "serverless" pub r#type: String, }