diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 0e87473f0..a338cf00b 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -54,6 +54,7 @@ use bottlecap::{ proxy_aggregator, proxy_flusher::Flusher as ProxyFlusher, stats_aggregator::StatsAggregator, + stats_concentrator_service::StatsConcentratorService, stats_flusher::{self, StatsFlusher}, stats_processor, trace_agent, trace_aggregator::{self, SendDataBuilderInfo}, @@ -629,6 +630,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle.clone(), + false, ) .await; } @@ -644,6 +646,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle.clone(), + false, ) .await; let next_response = next_event(client, &r.extension_id).await; @@ -721,6 +724,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle, + false, // force_flush_trace_stats ) .await; last_continuous_flush_error = false; @@ -761,6 +765,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle, + false, // force_flush_trace_stats ) .await; } @@ -818,6 +823,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle, + true, // force_flush_trace_stats ) .await; return Ok(()); @@ -825,6 +831,7 @@ async fn extension_loop_active( } } +#[allow(clippy::too_many_arguments)] async fn blocking_flush_all( logs_flusher: &LogsFlusher, metrics_flushers: &mut [MetricsFlusher], @@ -833,6 +840,7 @@ async fn blocking_flush_all( proxy_flusher: &ProxyFlusher, race_flush_interval: &mut tokio::time::Interval, metrics_aggr_handle: &MetricsAggregatorHandle, + force_flush_trace_stats: bool, ) { let flush_response = metrics_aggr_handle .flush() @@ -852,7 +860,7 @@ async fn blocking_flush_all( logs_flusher.flush(None), futures::future::join_all(metrics_futures), trace_flusher.flush(None), - 
stats_flusher.flush(),
+        stats_flusher.flush(force_flush_trace_stats),
         proxy_flusher.flush(None),
     );
     race_flush_interval.reset();
@@ -1100,7 +1108,12 @@ fn start_trace_agent(
     tokio_util::sync::CancellationToken,
 ) {
     // Stats
-    let stats_aggregator = Arc::new(TokioMutex::new(StatsAggregator::default()));
+    let (stats_concentrator_service, stats_concentrator_handle) =
+        StatsConcentratorService::new(Arc::clone(config));
+    tokio::spawn(stats_concentrator_service.run());
+    let stats_aggregator: Arc<TokioMutex<StatsAggregator>> = Arc::new(TokioMutex::new(
+        StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()),
+    ));
     let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
         api_key_factory.clone(),
         stats_aggregator.clone(),
@@ -1142,12 +1155,13 @@ fn start_trace_agent(
         Arc::clone(config),
         trace_aggregator,
         trace_processor.clone(),
-        stats_aggregator,
+        stats_aggregator.clone(),
         stats_processor,
         proxy_aggregator,
         invocation_processor,
         appsec_processor,
         Arc::clone(tags_provider),
+        stats_concentrator_handle.clone(),
     );
     let trace_agent_channel = trace_agent.get_sender_copy();
     let shutdown_token = trace_agent.shutdown_token();
diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs
index b6d444fc5..fdcd0f400 100644
--- a/bottlecap/src/config/env.rs
+++ b/bottlecap/src/config/env.rs
@@ -337,6 +337,11 @@ pub struct EnvConfig {
     /// The maximum depth of the Lambda payload to capture.
     /// Default is `10`. Requires `capture_lambda_payload` to be `true`.
     pub capture_lambda_payload_max_depth: Option<u32>,
+    /// @env `DD_COMPUTE_TRACE_STATS`
+    ///
+    /// Enable computation of trace stats for AWS Lambda.
+    #[serde(deserialize_with = "deserialize_optional_bool_from_anything")]
+    pub compute_trace_stats: Option<bool>,
     /// @env `DD_SERVERLESS_APPSEC_ENABLED`
     ///
     /// Enable Application and API Protection (AAP), previously known as AppSec/ASM, for AWS Lambda.
@@ -513,6 +518,7 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) {
     merge_option_to_value!(config, env_config, lambda_proc_enhanced_metrics);
     merge_option_to_value!(config, env_config, capture_lambda_payload);
     merge_option_to_value!(config, env_config, capture_lambda_payload_max_depth);
+    merge_option_to_value!(config, env_config, compute_trace_stats);
     merge_option_to_value!(config, env_config, serverless_appsec_enabled);
     merge_option!(config, env_config, appsec_rules);
     merge_option_to_value!(config, env_config, appsec_waf_timeout);
diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs
index 0dc7b734f..8fc3ae9d4 100644
--- a/bottlecap/src/config/mod.rs
+++ b/bottlecap/src/config/mod.rs
@@ -334,6 +334,7 @@ pub struct Config {
     pub lambda_proc_enhanced_metrics: bool,
     pub capture_lambda_payload: bool,
     pub capture_lambda_payload_max_depth: u32,
+    pub compute_trace_stats: bool,
     pub serverless_appsec_enabled: bool,
     pub appsec_rules: Option<String>,
 
@@ -429,6 +430,7 @@ impl Default for Config {
             lambda_proc_enhanced_metrics: true,
             capture_lambda_payload: false,
             capture_lambda_payload_max_depth: 10,
+            compute_trace_stats: false,
             serverless_appsec_enabled: false,
             appsec_rules: None,
 
diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs
index b8dca19b3..a52521a47 100644
--- a/bottlecap/src/config/yaml.rs
+++ b/bottlecap/src/config/yaml.rs
@@ -94,6 +94,8 @@ pub struct YamlConfig {
     pub capture_lambda_payload: Option<bool>,
     pub capture_lambda_payload_max_depth: Option<u32>,
     #[serde(deserialize_with = "deserialize_optional_bool_from_anything")]
+    pub compute_trace_stats: Option<bool>,
+    #[serde(deserialize_with = "deserialize_optional_bool_from_anything")]
     pub serverless_appsec_enabled: Option<bool>,
     pub appsec_rules: Option<String>,
     #[serde(deserialize_with = "deserialize_optional_duration_from_microseconds")]
@@ -613,6 +615,7 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) {
     merge_option_to_value!(config, yaml_config, lambda_proc_enhanced_metrics);
merge_option_to_value!(config, yaml_config, capture_lambda_payload); merge_option_to_value!(config, yaml_config, capture_lambda_payload_max_depth); + merge_option_to_value!(config, yaml_config, compute_trace_stats); merge_option_to_value!(config, yaml_config, serverless_appsec_enabled); merge_option!(config, yaml_config, appsec_rules); merge_option_to_value!(config, yaml_config, appsec_waf_timeout); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index a1d0d94b6..0a40a8559 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -44,6 +44,7 @@ use crate::lifecycle::invocation::triggers::get_default_service_name; pub const MS_TO_NS: f64 = 1_000_000.0; pub const S_TO_MS: u64 = 1_000; pub const S_TO_NS: f64 = 1_000_000_000.0; +pub const S_TO_NS_U64: u64 = 1_000_000_000; pub const PROACTIVE_INITIALIZATION_THRESHOLD_MS: u64 = 10_000; pub const DATADOG_INVOCATION_ERROR_MESSAGE_KEY: &str = "x-datadog-invocation-error-msg"; @@ -127,12 +128,10 @@ impl Processor { self.context_buffer .start_context(&request_id, invocation_span); - let timestamp = std::time::UNIX_EPOCH + let timestamp_secs = std::time::UNIX_EPOCH .elapsed() .expect("can't poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(); + .as_secs(); if self.config.lambda_proc_enhanced_metrics { // Collect offsets for network and cpu metrics @@ -161,14 +160,19 @@ impl Processor { } // Increment the invocation metric - self.enhanced_metrics.increment_invocation_metric(timestamp); + self.enhanced_metrics + .increment_invocation_metric(timestamp_secs.try_into().unwrap_or_default()); self.enhanced_metrics.set_invoked_received(); // If `UniversalInstrumentationStart` event happened first, process it if let Some((headers, payload_value)) = self.context_buffer.pair_invoke_event(&request_id) { // Infer span self.inferrer.infer_span(&payload_value, &self.aws_config); - 
self.process_on_universal_instrumentation_start(request_id, headers, payload_value); + self.process_on_universal_instrumentation_start( + request_id.clone(), + headers, + payload_value, + ); } } diff --git a/bottlecap/src/tags/lambda/tags.rs b/bottlecap/src/tags/lambda/tags.rs index c20f468a5..2ae4a8acd 100644 --- a/bottlecap/src/tags/lambda/tags.rs +++ b/bottlecap/src/tags/lambda/tags.rs @@ -38,8 +38,6 @@ const SERVICE_KEY: &str = "service"; // ComputeStatsKey is the tag key indicating whether trace stats should be computed const COMPUTE_STATS_KEY: &str = "_dd.compute_stats"; -// ComputeStatsValue is the tag value indicating trace stats should be computed -const COMPUTE_STATS_VALUE: &str = "1"; // FunctionTagsKey is the tag key for a function's tags to be set on the top level tracepayload const FUNCTION_TAGS_KEY: &str = "_dd.tags.function"; // TODO(astuyve) decide what to do with the version @@ -122,10 +120,11 @@ fn tags_from_env( tags_map.extend(config.tags.clone()); } - tags_map.insert( - COMPUTE_STATS_KEY.to_string(), - COMPUTE_STATS_VALUE.to_string(), - ); + // "config.compute_trace_stats == true" means computing stats on the extension side, + // so we set _dd.compute_stats to 0 so stats won't be computed on the backend side. 
+    let compute_stats = i32::from(!config.compute_trace_stats);
+    tags_map.insert(COMPUTE_STATS_KEY.to_string(), compute_stats.to_string());
+
     tags_map
 }
diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs
index f3363d67d..420b06207 100644
--- a/bottlecap/src/traces/mod.rs
+++ b/bottlecap/src/traces/mod.rs
@@ -6,13 +6,17 @@ pub mod propagation;
 pub mod proxy_aggregator;
 pub mod proxy_flusher;
 pub mod span_pointers;
+pub mod stats_agent;
 pub mod stats_aggregator;
+pub mod stats_concentrator;
+pub mod stats_concentrator_service;
 pub mod stats_flusher;
 pub mod stats_processor;
 pub mod trace_agent;
 pub mod trace_aggregator;
 pub mod trace_flusher;
 pub mod trace_processor;
+pub mod trace_stats_processor;
 
 // URL for a call to the Lambda runtime API. The value may be replaced if `AWS_LAMBDA_RUNTIME_API` is set.
 const LAMBDA_RUNTIME_URL_PREFIX: &str = "http://127.0.0.1:9001";
diff --git a/bottlecap/src/traces/stats_agent.rs b/bottlecap/src/traces/stats_agent.rs
new file mode 100644
index 000000000..23f7487bc
--- /dev/null
+++ b/bottlecap/src/traces/stats_agent.rs
@@ -0,0 +1,46 @@
+use tokio::sync::mpsc::{self, Receiver, Sender};
+use tracing::error;
+
+use super::stats_concentrator_service::StatsConcentratorHandle;
+
+use super::stats_concentrator::AggregationKey;
+use super::stats_concentrator::Stats;
+
+#[derive(Clone)]
+pub struct StatsEvent {
+    pub time: u64,
+    pub aggregation_key: AggregationKey,
+    pub stats: Stats,
+}
+
+#[allow(clippy::module_name_repetitions)]
+pub struct StatsAgent {
+    tx: Sender<StatsEvent>,
+    rx: Receiver<StatsEvent>,
+    concentrator: StatsConcentratorHandle,
+}
+
+impl StatsAgent {
+    #[must_use]
+    pub fn new(concentrator: StatsConcentratorHandle) -> StatsAgent {
+        let (tx, rx) = mpsc::channel::<StatsEvent>(1000);
+        StatsAgent {
+            tx,
+            rx,
+            concentrator,
+        }
+    }
+
+    pub async fn spin(&mut self) {
+        while let Some(event) = self.rx.recv().await {
+            if let Err(e) = self.concentrator.add(event) {
+                error!("Error adding stats event to the stats concentrator: {e}");
+            }
+        }
+    }
+
+    #[must_use]
+    pub fn get_sender_copy(&self) -> Sender<StatsEvent> {
+        self.tx.clone()
+    }
+}
diff --git a/bottlecap/src/traces/stats_aggregator.rs b/bottlecap/src/traces/stats_aggregator.rs
index fa23c0b63..913d981cd 100644
--- a/bottlecap/src/traces/stats_aggregator.rs
+++ b/bottlecap/src/traces/stats_aggregator.rs
@@ -1,5 +1,7 @@
+use crate::traces::stats_concentrator_service::StatsConcentratorHandle;
 use datadog_trace_protobuf::pb::ClientStatsPayload;
 use std::collections::VecDeque;
+use tracing::error;
 
 #[allow(clippy::empty_line_after_doc_comments)]
 /// Maximum number of entries in a stat payload.
@@ -22,37 +24,44 @@ pub struct StatsAggregator {
     queue: VecDeque<ClientStatsPayload>,
     max_content_size_bytes: usize,
     buffer: Vec<ClientStatsPayload>,
-}
-
-impl Default for StatsAggregator {
-    fn default() -> Self {
-        StatsAggregator {
-            queue: VecDeque::new(),
-            max_content_size_bytes: MAX_CONTENT_SIZE_BYTES,
-            buffer: Vec::new(),
-        }
-    }
+    concentrator: StatsConcentratorHandle,
 }
 
 /// Takes in individual trace stats payloads and aggregates them into batches to be flushed to Datadog.
 impl StatsAggregator {
     #[allow(dead_code)]
     #[allow(clippy::must_use_candidate)]
-    pub fn new(max_content_size_bytes: usize) -> Self {
+    fn new(max_content_size_bytes: usize, concentrator: StatsConcentratorHandle) -> Self {
         StatsAggregator {
             queue: VecDeque::new(),
             max_content_size_bytes,
             buffer: Vec::new(),
+            concentrator,
         }
     }
 
+    #[must_use]
+    pub fn new_with_concentrator(concentrator: StatsConcentratorHandle) -> Self {
+        Self::new(MAX_CONTENT_SIZE_BYTES, concentrator)
+    }
+
     /// Takes in an individual trace stats payload.
     pub fn add(&mut self, payload: ClientStatsPayload) {
         self.queue.push_back(payload);
     }
 
     /// Returns a batch of trace stats payloads, subject to the max content size.
-    pub fn get_batch(&mut self) -> Vec<ClientStatsPayload> {
+    pub async fn get_batch(&mut self, force_flush: bool) -> Vec<ClientStatsPayload> {
+        // Pull stats data from concentrator
+        match self.concentrator.get_stats(force_flush).await {
+            Ok(stats) => {
+                self.queue.extend(stats);
+            }
+            Err(e) => {
+                error!("Error getting stats from the stats concentrator: {e:?}");
+            }
+        }
+
         let mut batch_size = 0;
 
         // Fill the batch
@@ -158,12 +167,12 @@ mod tests {
         aggregator.add(payload.clone());
 
         // The batch should only contain the first 2 payloads
-        let first_batch = aggregator.get_batch();
+        let first_batch = aggregator.get_batch(false).await;
         assert_eq!(first_batch, vec![payload.clone(), payload.clone()]);
         assert_eq!(aggregator.queue.len(), 1);
 
         // The second batch should only contain the last log
-        let second_batch = aggregator.get_batch();
+        let second_batch = aggregator.get_batch(false).await;
         assert_eq!(second_batch, vec![payload]);
         assert_eq!(aggregator.queue.len(), 0);
     }
diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs
new file mode 100644
index 000000000..ae9a0f207
--- /dev/null
+++ b/bottlecap/src/traces/stats_concentrator.rs
@@ -0,0 +1,195 @@
+use crate::config::Config;
+use crate::lifecycle::invocation::processor::S_TO_NS_U64;
+use crate::traces::stats_agent::StatsEvent;
+use datadog_trace_protobuf::pb;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+use tracing::error;
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub struct AggregationKey {
+    pub env: String,
+    pub service: String,
+    // e.g. "aws.lambda.load", "aws.lambda.import"
+    pub name: String,
+    // e.g. "my-lambda-function-name", "datadog_lambda.handler", "urllib.request"
+    pub resource: String,
+    // e.g. "aws.lambda.load", "aws.lambda.import"
+    pub r#type: String,
+}
+
+// Aggregated stats for a time interval across all the aggregation keys.
+#[derive(Default)]
+struct Bucket {
+    data: HashMap<AggregationKey, Stats>,
+}
+
+#[derive(Clone, Debug, Default, Copy)]
+pub struct Stats {
+    pub hits: i32,
+    pub duration: i64, // in nanoseconds
+    pub error: i32,
+    pub top_level_hits: f64,
+}
+
+pub struct StatsConcentrator {
+    config: Arc<Config>,
+    buckets: HashMap<u64, Bucket>,
+}
+
+// The duration of a bucket in nanoseconds.
+const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS_U64; // 10 seconds
+
+// The number of latest buckets to not flush when force_flush is false.
+// For example, if we have buckets with timestamps 10, 20, 40, the current timestamp is 45,
+// and NO_FLUSH_BUCKET_COUNT is 3, then we will flush bucket 10 but not bucket 20 or 40.
+// Note that the bucket 30 is included in the 3 latest buckets even if it has no data.
+// This is to reduce the chance of flushing stats that are still being collected to save some cost.
+const NO_FLUSH_BUCKET_COUNT: u64 = 2;
+
+// Aggregates stats into buckets, which are then pulled by the stats aggregator.
+impl StatsConcentrator {
+    #[must_use]
+    pub fn new(config: Arc<Config>) -> Self {
+        Self {
+            buckets: HashMap::new(),
+            config,
+        }
+    }
+
+    pub fn add(&mut self, stats_event: StatsEvent) {
+        let bucket_timestamp = Self::get_bucket_timestamp(stats_event.time);
+        let bucket = self.buckets.entry(bucket_timestamp).or_default();
+
+        let stats = bucket.data.entry(stats_event.aggregation_key).or_default();
+
+        stats.hits += stats_event.stats.hits;
+        stats.error += stats_event.stats.error;
+        stats.duration += stats_event.stats.duration;
+        stats.top_level_hits += stats_event.stats.top_level_hits;
+    }
+
+    fn get_bucket_timestamp(timestamp: u64) -> u64 {
+        timestamp - timestamp % BUCKET_DURATION_NS
+    }
+
+    // force_flush: If true, flush all stats. If false, flush stats except for the few latest
+    // buckets, which may still be getting data.
+    #[must_use]
+    pub fn get_stats(&mut self, force_flush: bool) -> Vec<pb::ClientStatsPayload> {
+        let current_timestamp: u64 = match SystemTime::now().duration_since(UNIX_EPOCH) {
+            Ok(duration) => {
+                if let Ok(ts) = duration.as_nanos().try_into() {
+                    ts
+                } else {
+                    error!("Timestamp overflow, skipping stats flush");
+                    return Vec::new();
+                }
+            }
+            Err(e) => {
+                error!("Failed to get current timestamp: {e}, skipping stats flush");
+                return Vec::new();
+            }
+        };
+
+        let mut ret = Vec::new();
+        self.buckets.retain(|&timestamp, bucket| {
+            if force_flush || Self::should_flush_bucket(current_timestamp, timestamp) {
+                // Flush and remove this bucket
+                for (aggregation_key, stats) in &bucket.data {
+                    ret.push(Self::construct_stats_payload(
+                        &self.config,
+                        timestamp,
+                        aggregation_key,
+                        *stats,
+                    ));
+                }
+                false
+            } else {
+                // Keep this bucket
+                true
+            }
+        });
+
+        ret
+    }
+
+    // Whether a bucket should be flushed based on the current timestamp and the bucket timestamp.
+    fn should_flush_bucket(current_timestamp: u64, bucket_timestamp: u64) -> bool {
+        current_timestamp - bucket_timestamp >= BUCKET_DURATION_NS * NO_FLUSH_BUCKET_COUNT
+    }
+
+    #[allow(clippy::cast_possible_truncation)]
+    #[allow(clippy::cast_sign_loss)]
+    fn construct_stats_payload(
+        config: &Config,
+        timestamp: u64,
+        aggregation_key: &AggregationKey,
+        stats: Stats,
+    ) -> pb::ClientStatsPayload {
+        pb::ClientStatsPayload {
+            hostname: String::new(),
+            env: aggregation_key.env.clone(),
+            version: config.version.clone().unwrap_or_default(),
+            lang: "rust".to_string(),
+            tracer_version: String::new(),
+            runtime_id: String::new(),
+            sequence: 0,
+            agent_aggregation: String::new(),
+            service: aggregation_key.service.clone(),
+            container_id: String::new(),
+            tags: vec![],
+            git_commit_sha: String::new(),
+            image_tag: String::new(),
+            stats: vec![pb::ClientStatsBucket {
+                start: timestamp,
+                duration: BUCKET_DURATION_NS,
+                stats: vec![pb::ClientGroupedStats {
+                    service: aggregation_key.service.clone(),
+                    name:
aggregation_key.name.clone(),
+                    resource: aggregation_key.resource.clone(),
+                    http_status_code: 0,
+                    r#type: aggregation_key.r#type.clone(),
+                    db_type: String::new(),
+                    hits: stats.hits.try_into().unwrap_or_default(),
+                    errors: stats.error.try_into().unwrap_or_default(),
+                    duration: stats.duration.try_into().unwrap_or_default(),
+                    ok_summary: vec![],
+                    error_summary: vec![],
+                    synthetics: false,
+                    top_level_hits: stats.top_level_hits.round() as u64,
+                    span_kind: String::new(),
+                    peer_tags: vec![],
+                    is_trace_root: 1,
+                }],
+                agent_time_shift: 0,
+            }],
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_should_flush_bucket_false_when_not_enough_time_passed() {
+        let bucket_timestamp = 1_000_000_000;
+        let current_timestamp = bucket_timestamp + BUCKET_DURATION_NS * (NO_FLUSH_BUCKET_COUNT - 1);
+        assert!(
+            !StatsConcentrator::should_flush_bucket(current_timestamp, bucket_timestamp),
+            "Should not flush when current_timestamp is less than threshold ahead"
+        );
+    }
+
+    #[test]
+    fn test_should_flush_bucket_true_when_much_later() {
+        let bucket_timestamp = 1_000_000_000;
+        let current_timestamp = bucket_timestamp + BUCKET_DURATION_NS * (NO_FLUSH_BUCKET_COUNT + 5);
+        assert!(
+            StatsConcentrator::should_flush_bucket(current_timestamp, bucket_timestamp),
+            "Should flush when current_timestamp is much greater than threshold"
+        );
+    }
+}
diff --git a/bottlecap/src/traces/stats_concentrator_service.rs b/bottlecap/src/traces/stats_concentrator_service.rs
new file mode 100644
index 000000000..b3f745219
--- /dev/null
+++ b/bottlecap/src/traces/stats_concentrator_service.rs
@@ -0,0 +1,86 @@
+use tokio::sync::{mpsc, oneshot};
+
+use super::stats_agent::StatsEvent;
+use super::stats_concentrator::StatsConcentrator;
+use crate::config::Config;
+use datadog_trace_protobuf::pb;
+use std::sync::Arc;
+use tracing::error;
+
+#[derive(Debug)]
+pub enum StatsError {
+    SendError(mpsc::error::SendError<ConcentratorCommand>),
+    RecvError(oneshot::error::RecvError),
+}
+
+impl From<mpsc::error::SendError<ConcentratorCommand>> for StatsError {
+    fn from(err: mpsc::error::SendError<ConcentratorCommand>) -> Self {
+        StatsError::SendError(err)
+    }
+}
+
+impl From<oneshot::error::RecvError> for StatsError {
+    fn from(err: oneshot::error::RecvError) -> Self {
+        StatsError::RecvError(err)
+    }
+}
+
+pub enum ConcentratorCommand {
+    Add(StatsEvent),
+    GetStats(bool, oneshot::Sender<Vec<pb::ClientStatsPayload>>),
+}
+
+#[derive(Clone)]
+pub struct StatsConcentratorHandle {
+    tx: mpsc::UnboundedSender<ConcentratorCommand>,
+}
+
+impl StatsConcentratorHandle {
+    pub fn add(
+        &self,
+        stats_event: StatsEvent,
+    ) -> Result<(), mpsc::error::SendError<ConcentratorCommand>> {
+        self.tx.send(ConcentratorCommand::Add(stats_event))
+    }
+
+    pub async fn get_stats(
+        &self,
+        force_flush: bool,
+    ) -> Result<Vec<pb::ClientStatsPayload>, StatsError> {
+        let (response_tx, response_rx) = oneshot::channel();
+        self.tx
+            .send(ConcentratorCommand::GetStats(force_flush, response_tx))?;
+        let stats = response_rx.await?;
+        Ok(stats)
+    }
+}
+
+pub struct StatsConcentratorService {
+    concentrator: StatsConcentrator,
+    rx: mpsc::UnboundedReceiver<ConcentratorCommand>,
+}
+
+impl StatsConcentratorService {
+    #[must_use]
+    pub fn new(config: Arc<Config>) -> (Self, StatsConcentratorHandle) {
+        let (tx, rx) = mpsc::unbounded_channel();
+        let handle = StatsConcentratorHandle { tx };
+        let concentrator = StatsConcentrator::new(config);
+        let service: StatsConcentratorService = Self { concentrator, rx };
+        (service, handle)
+    }
+
+    pub async fn run(mut self) {
+        while let Some(command) = self.rx.recv().await {
+            match command {
+                ConcentratorCommand::Add(stats_event) => self.concentrator.add(stats_event),
+                ConcentratorCommand::GetStats(force_flush, response_tx) => {
+                    let stats = self.concentrator.get_stats(force_flush);
+                    if let Err(e) = response_tx.send(stats) {
+                        error!("Failed to return trace stats: {e:?}");
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs
index 71a86fe9d..95ba905bb 100644
--- a/bottlecap/src/traces/stats_flusher.rs
+++ b/bottlecap/src/traces/stats_flusher.rs
@@ -28,7 +28,7 @@ pub trait StatsFlusher {
     /// Flushes
stats to the Datadog trace stats intake. async fn send(&self, traces: Vec); - async fn flush(&self); + async fn flush(&self, force_flush: bool); } #[allow(clippy::module_name_repetitions)] @@ -116,14 +116,15 @@ impl StatsFlusher for ServerlessStatsFlusher { } }; } - async fn flush(&self) { + + async fn flush(&self, force_flush: bool) { let mut guard = self.aggregator.lock().await; - let mut stats = guard.get_batch(); + let mut stats = guard.get_batch(force_flush).await; while !stats.is_empty() { self.send(stats).await; - stats = guard.get_batch(); + stats = guard.get_batch(force_flush).await; } } } diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index b920c2513..5acd21d0c 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -41,6 +41,10 @@ use datadog_trace_protobuf::pb; use datadog_trace_utils::trace_utils::{self}; use ddcommon::hyper_migration; +use crate::traces::stats_agent::StatsAgent; +use crate::traces::stats_concentrator_service::StatsConcentratorHandle; +use crate::traces::trace_stats_processor::SendingTraceStatsProcessor; + const TRACE_AGENT_PORT: usize = 8126; // Agent endpoints @@ -75,6 +79,7 @@ const LAMBDA_LOAD_SPAN: &str = "aws.lambda.load"; pub struct TraceState { pub config: Arc, pub trace_sender: Arc, + pub stats_sender: Arc, pub invocation_processor: Arc>, pub tags_provider: Arc, } @@ -102,6 +107,7 @@ pub struct TraceAgent { appsec_processor: Option>>, shutdown_token: CancellationToken, tx: Sender, + stats_agent: Arc>, } #[derive(Clone, Copy)] @@ -123,6 +129,7 @@ impl TraceAgent { invocation_processor: Arc>, appsec_processor: Option>>, tags_provider: Arc, + stats_concentrator: StatsConcentratorHandle, ) -> TraceAgent { // Set up a channel to send processed traces to our trace aggregator. 
tx is passed through each // endpoint_handler to the trace processor, which uses it to send de-serialized @@ -138,6 +145,8 @@ impl TraceAgent { } }); + let stats_agent = StatsAgent::new(stats_concentrator.clone()); + TraceAgent { config: config.clone(), trace_processor, @@ -149,9 +158,11 @@ impl TraceAgent { tags_provider, tx: trace_tx, shutdown_token: CancellationToken::new(), + stats_agent: Arc::new(Mutex::new(stats_agent)), } } + #[allow(clippy::cast_possible_truncation)] pub async fn start(&self) -> Result<(), Box> { let now = Instant::now(); @@ -170,7 +181,13 @@ impl TraceAgent { } }); - let router = self.make_router(stats_tx); + // Start the stats agent, which receives stats events and sends them to the stats concentrator + let stats_agent = self.stats_agent.clone(); + tokio::spawn(async move { + stats_agent.lock().await.spin().await; + }); + + let router = self.make_router(stats_tx).await; let port = u16::try_from(TRACE_AGENT_PORT).expect("TRACE_AGENT_PORT is too large"); let socket = SocketAddr::from(([127, 0, 0, 1], port)); @@ -190,7 +207,8 @@ impl TraceAgent { Ok(()) } - fn make_router(&self, stats_tx: Sender) -> Router { + async fn make_router(&self, stats_tx: Sender) -> Router { + let stats_agent_tx = self.stats_agent.lock().await.get_sender_copy(); let trace_state = TraceState { config: Arc::clone(&self.config), trace_sender: Arc::new(SendingTraceProcessor { @@ -198,6 +216,7 @@ impl TraceAgent { processor: Arc::clone(&self.trace_processor), trace_tx: self.tx.clone(), }), + stats_sender: Arc::new(SendingTraceStatsProcessor::new(stats_agent_tx)), invocation_processor: Arc::clone(&self.invocation_processor), tags_provider: Arc::clone(&self.tags_provider), }; @@ -266,6 +285,7 @@ impl TraceAgent { state.config, request, state.trace_sender, + state.stats_sender, state.invocation_processor, state.tags_provider, ApiVersion::V04, @@ -278,6 +298,7 @@ impl TraceAgent { state.config, request, state.trace_sender, + state.stats_sender, state.invocation_processor, 
state.tags_provider, ApiVersion::V05, @@ -417,6 +438,7 @@ impl TraceAgent { config: Arc, request: Request, trace_sender: Arc, + stats_sender: Arc, invocation_processor: Arc>, tags_provider: Arc, version: ApiVersion, @@ -509,7 +531,16 @@ impl TraceAgent { } } - match trace_sender + if config.compute_trace_stats { + if let Err(err) = stats_sender.send(&traces).await { + return error_response( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Error sending stats to the stats aggregator: {err}"), + ); + } + } + + if let Err(err) = trace_sender .send_processed_traces( config, tags_provider, @@ -520,12 +551,13 @@ impl TraceAgent { ) .await { - Ok(()) => success_response("Successfully buffered traces to be aggregated."), - Err(err) => error_response( + return error_response( StatusCode::INTERNAL_SERVER_ERROR, format!("Error sending traces to the trace aggregator: {err}"), - ), + ); } + + success_response("Successfully buffered traces to be aggregated.") } #[allow(clippy::too_many_arguments)] diff --git a/bottlecap/src/traces/trace_stats_processor.rs b/bottlecap/src/traces/trace_stats_processor.rs new file mode 100644 index 000000000..a96fb3c3b --- /dev/null +++ b/bottlecap/src/traces/trace_stats_processor.rs @@ -0,0 +1,46 @@ +use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::error::SendError; +use tracing::debug; + +use super::stats_agent::StatsEvent; +use super::stats_concentrator::AggregationKey; +use super::stats_concentrator::Stats; + +use datadog_trace_protobuf::pb; + +pub struct SendingTraceStatsProcessor { + stats_tx: Sender, +} + +impl SendingTraceStatsProcessor { + #[must_use] + pub fn new(stats_tx: Sender) -> Self { + Self { stats_tx } + } + + pub async fn send(&self, traces: &[Vec]) -> Result<(), SendError> { + debug!("Sending trace stats to the concentrator"); + for trace in traces { + for span in trace { + let stats = StatsEvent { + time: span.start.try_into().unwrap_or_default(), + aggregation_key: AggregationKey { + env: 
span.meta.get("env").cloned().unwrap_or_default(), + service: span.service.clone(), + name: span.name.clone(), + resource: span.resource.clone(), + r#type: span.r#type.clone(), + }, + stats: Stats { + hits: 1, + error: span.error, + duration: span.duration, + top_level_hits: span.metrics.get("_dd.top_level").map_or(0.0, |v| *v), + }, + }; + self.stats_tx.send(stats).await?; + } + } + Ok(()) + } +}