diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 4b01eba54..2c522df3d 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -57,6 +57,7 @@ use bottlecap::{ proxy_aggregator, proxy_flusher::Flusher as ProxyFlusher, stats_aggregator::StatsAggregator, + stats_concentrator_service::StatsConcentratorService, stats_flusher::{self, StatsFlusher}, stats_processor, trace_agent, trace_aggregator::{self, SendDataBuilderInfo}, @@ -522,6 +523,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle.clone(), + false, ) .await; } @@ -537,6 +539,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle.clone(), + false, ) .await; let next_response = @@ -606,6 +609,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle, + false, // force_flush_trace_stats ) .await; } @@ -639,6 +643,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle, + false, // force_flush_trace_stats ) .await; } @@ -697,6 +702,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle, + true, // force_flush_trace_stats ) .await; return Ok(()); @@ -704,6 +710,7 @@ async fn extension_loop_active( } } +#[allow(clippy::too_many_arguments)] async fn blocking_flush_all( logs_flusher: &LogsFlusher, metrics_flushers: &mut [MetricsFlusher], @@ -712,6 +719,7 @@ async fn blocking_flush_all( proxy_flusher: &ProxyFlusher, race_flush_interval: &mut tokio::time::Interval, metrics_aggr_handle: &MetricsAggregatorHandle, + force_flush_trace_stats: bool, ) { let flush_response = metrics_aggr_handle .flush() @@ -731,7 +739,7 @@ async fn blocking_flush_all( logs_flusher.flush(None), futures::future::join_all(metrics_futures), trace_flusher.flush(None), - stats_flusher.flush(), + stats_flusher.flush(force_flush_trace_stats), 
proxy_flusher.flush(None), ); race_flush_interval.reset(); @@ -981,7 +989,12 @@ fn start_trace_agent( tokio_util::sync::CancellationToken, ) { // Stats - let stats_aggregator = Arc::new(TokioMutex::new(StatsAggregator::default())); + let (stats_concentrator_service, stats_concentrator_handle) = + StatsConcentratorService::new(Arc::clone(config)); + tokio::spawn(stats_concentrator_service.run()); + let stats_aggregator: Arc> = Arc::new(TokioMutex::new( + StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()), + )); let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new( api_key_factory.clone(), stats_aggregator.clone(), @@ -1029,6 +1042,7 @@ fn start_trace_agent( invocation_processor, appsec_processor, Arc::clone(tags_provider), + stats_concentrator_handle, ); let trace_agent_channel = trace_agent.get_sender_copy(); let shutdown_token = trace_agent.shutdown_token(); diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index f3363d67d..8c883b796 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -7,12 +7,15 @@ pub mod proxy_aggregator; pub mod proxy_flusher; pub mod span_pointers; pub mod stats_aggregator; +pub mod stats_concentrator; +pub mod stats_concentrator_service; pub mod stats_flusher; pub mod stats_processor; pub mod trace_agent; pub mod trace_aggregator; pub mod trace_flusher; pub mod trace_processor; +pub mod trace_stats_processor; // URL for a call to the Lambda runtime API. The value may be replaced if `AWS_LAMBDA_RUNTIME_API` is set. 
const LAMBDA_RUNTIME_URL_PREFIX: &str = "http://127.0.0.1:9001"; diff --git a/bottlecap/src/traces/stats_aggregator.rs b/bottlecap/src/traces/stats_aggregator.rs index fa23c0b63..36074c6d3 100644 --- a/bottlecap/src/traces/stats_aggregator.rs +++ b/bottlecap/src/traces/stats_aggregator.rs @@ -1,5 +1,7 @@ +use crate::traces::stats_concentrator_service::StatsConcentratorHandle; use datadog_trace_protobuf::pb::ClientStatsPayload; use std::collections::VecDeque; +use tracing::error; #[allow(clippy::empty_line_after_doc_comments)] /// Maximum number of entries in a stat payload. @@ -22,37 +24,44 @@ pub struct StatsAggregator { queue: VecDeque, max_content_size_bytes: usize, buffer: Vec, -} - -impl Default for StatsAggregator { - fn default() -> Self { - StatsAggregator { - queue: VecDeque::new(), - max_content_size_bytes: MAX_CONTENT_SIZE_BYTES, - buffer: Vec::new(), - } - } + concentrator: StatsConcentratorHandle, } /// Takes in individual trace stats payloads and aggregates them into batches to be flushed to Datadog. impl StatsAggregator { #[allow(dead_code)] #[allow(clippy::must_use_candidate)] - pub fn new(max_content_size_bytes: usize) -> Self { + fn new(max_content_size_bytes: usize, concentrator: StatsConcentratorHandle) -> Self { StatsAggregator { queue: VecDeque::new(), max_content_size_bytes, buffer: Vec::new(), + concentrator, } } + #[must_use] + pub fn new_with_concentrator(concentrator: StatsConcentratorHandle) -> Self { + Self::new(MAX_CONTENT_SIZE_BYTES, concentrator) + } + /// Takes in an individual trace stats payload. pub fn add(&mut self, payload: ClientStatsPayload) { self.queue.push_back(payload); } /// Returns a batch of trace stats payloads, subject to the max content size. 
- pub fn get_batch(&mut self) -> Vec { + pub async fn get_batch(&mut self, force_flush: bool) -> Vec { + // Pull stats data from concentrator + match self.concentrator.flush(force_flush).await { + Ok(stats) => { + self.queue.extend(stats); + } + Err(e) => { + error!("Error getting stats from the stats concentrator: {e:?}"); + } + } + let mut batch_size = 0; // Fill the batch @@ -80,10 +89,15 @@ impl StatsAggregator { #[allow(clippy::unwrap_used)] mod tests { use super::*; + use crate::config::Config; + use crate::traces::stats_concentrator_service::StatsConcentratorService; + use std::sync::Arc; #[test] fn test_add() { - let mut aggregator = StatsAggregator::default(); + let config = Arc::new(Config::default()); + let (_, concentrator) = StatsConcentratorService::new(config); + let mut aggregator = StatsAggregator::new_with_concentrator(concentrator); let payload = ClientStatsPayload { hostname: "hostname".to_string(), env: "dev".to_string(), @@ -106,9 +120,11 @@ mod tests { assert_eq!(aggregator.queue[0], payload); } - #[test] - fn test_get_batch() { - let mut aggregator = StatsAggregator::default(); + #[tokio::test] + async fn test_get_batch() { + let config = Arc::new(Config::default()); + let (_, concentrator) = StatsConcentratorService::new(config); + let mut aggregator = StatsAggregator::new_with_concentrator(concentrator); let payload = ClientStatsPayload { hostname: "hostname".to_string(), env: "dev".to_string(), @@ -127,13 +143,15 @@ mod tests { }; aggregator.add(payload.clone()); assert_eq!(aggregator.queue.len(), 1); - let batch = aggregator.get_batch(); + let batch = aggregator.get_batch(false).await; assert_eq!(batch, vec![payload]); } - #[test] - fn test_get_batch_full_entries() { - let mut aggregator = StatsAggregator::new(640); + #[tokio::test] + async fn test_get_batch_full_entries() { + let config = Arc::new(Config::default()); + let (_, concentrator) = StatsConcentratorService::new(config); + let mut aggregator = StatsAggregator::new(640, 
concentrator); // Payload below is 115 bytes let payload = ClientStatsPayload { hostname: "hostname".to_string(), @@ -158,12 +176,12 @@ mod tests { aggregator.add(payload.clone()); // The batch should only contain the first 2 payloads - let first_batch = aggregator.get_batch(); + let first_batch = aggregator.get_batch(false).await; assert_eq!(first_batch, vec![payload.clone(), payload.clone()]); assert_eq!(aggregator.queue.len(), 1); // The second batch should only contain the last log - let second_batch = aggregator.get_batch(); + let second_batch = aggregator.get_batch(false).await; assert_eq!(second_batch, vec![payload]); assert_eq!(aggregator.queue.len(), 0); } diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs new file mode 100644 index 000000000..511e3fbc6 --- /dev/null +++ b/bottlecap/src/traces/stats_concentrator.rs @@ -0,0 +1,38 @@ +use crate::config::Config; +use datadog_trace_protobuf::pb; +use std::sync::Arc; + +// Event sent to the stats concentrator +#[derive(Clone, Copy)] +pub struct StatsEvent { + pub time: u64, + pub aggregation_key: AggregationKey, + pub stats: Stats, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)] +pub struct AggregationKey {} + +#[derive(Clone, Debug, Default, Copy)] +pub struct Stats {} + +pub struct StatsConcentrator { + _config: Arc, +} + +// Aggregates stats into buckets, which are then pulled by the stats aggregator. +impl StatsConcentrator { + #[must_use] + pub fn new(config: Arc) -> Self { + Self { _config: config } + } + + pub fn add(&mut self, _stats_event: StatsEvent) {} + + // force_flush: If true, flush all stats. If false, flush stats except for the few latest + // buckets, which may still be getting data. 
+ #[must_use] + pub fn flush(&mut self, _force_flush: bool) -> Vec<pb::ClientStatsPayload> { + vec![] + } +} diff --git a/bottlecap/src/traces/stats_concentrator_service.rs b/bottlecap/src/traces/stats_concentrator_service.rs new file mode 100644 index 000000000..c740b9db1 --- /dev/null +++ b/bottlecap/src/traces/stats_concentrator_service.rs @@ -0,0 +1,79 @@ +use tokio::sync::{mpsc, oneshot}; + +use crate::config::Config; +use crate::traces::stats_concentrator::StatsConcentrator; +use crate::traces::stats_concentrator::StatsEvent; +use datadog_trace_protobuf::pb; +use std::sync::Arc; +use tracing::error; + +#[derive(Debug, thiserror::Error)] +pub enum StatsError { + #[error("Failed to send command to concentrator: {0}")] + SendError(mpsc::error::SendError<ConcentratorCommand>), + #[error("Failed to receive response from concentrator: {0}")] + RecvError(oneshot::error::RecvError), +} + +pub enum ConcentratorCommand { + Add(StatsEvent), + Flush(bool, oneshot::Sender<Vec<pb::ClientStatsPayload>>), +} + +#[derive(Clone)] +pub struct StatsConcentratorHandle { + tx: mpsc::UnboundedSender<ConcentratorCommand>, +} + +impl StatsConcentratorHandle { + pub fn add( + &self, + stats_event: StatsEvent, + ) -> Result<(), mpsc::error::SendError<ConcentratorCommand>> { + self.tx.send(ConcentratorCommand::Add(stats_event)) + } + + pub async fn flush( + &self, + force_flush: bool, + ) -> Result<Vec<pb::ClientStatsPayload>, StatsError> { + let (response_tx, response_rx) = oneshot::channel(); + self.tx + .send(ConcentratorCommand::Flush(force_flush, response_tx)) + .map_err(StatsError::SendError)?; + let stats = response_rx.await.map_err(StatsError::RecvError)?; + Ok(stats) + } +} + +pub struct StatsConcentratorService { + concentrator: StatsConcentrator, + rx: mpsc::UnboundedReceiver<ConcentratorCommand>, +} + +// A service that handles add() and flush() requests in the same queue, +// to avoid using mutex, which may cause lock contention.
+impl StatsConcentratorService { + #[must_use] + pub fn new(config: Arc<Config>) -> (Self, StatsConcentratorHandle) { + let (tx, rx) = mpsc::unbounded_channel(); + let handle = StatsConcentratorHandle { tx }; + let concentrator = StatsConcentrator::new(config); + let service: StatsConcentratorService = Self { concentrator, rx }; + (service, handle) + } + + pub async fn run(mut self) { + while let Some(command) = self.rx.recv().await { + match command { + ConcentratorCommand::Add(stats_event) => self.concentrator.add(stats_event), + ConcentratorCommand::Flush(force_flush, response_tx) => { + let stats = self.concentrator.flush(force_flush); + if let Err(e) = response_tx.send(stats) { + error!("Failed to return trace stats: {e:?}"); + } + } + } + } + } +} diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index 71a86fe9d..95ba905bb 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -28,7 +28,7 @@ pub trait StatsFlusher { /// Flushes stats to the Datadog trace stats intake.
async fn send(&self, traces: Vec); - async fn flush(&self); + async fn flush(&self, force_flush: bool); } #[allow(clippy::module_name_repetitions)] @@ -116,14 +116,15 @@ impl StatsFlusher for ServerlessStatsFlusher { } }; } - async fn flush(&self) { + + async fn flush(&self, force_flush: bool) { let mut guard = self.aggregator.lock().await; - let mut stats = guard.get_batch(); + let mut stats = guard.get_batch(force_flush).await; while !stats.is_empty() { self.send(stats).await; - stats = guard.get_batch(); + stats = guard.get_batch(force_flush).await; } } } diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index b920c2513..9145a2a2f 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -41,6 +41,9 @@ use datadog_trace_protobuf::pb; use datadog_trace_utils::trace_utils::{self}; use ddcommon::hyper_migration; +use crate::traces::stats_concentrator_service::StatsConcentratorHandle; +use crate::traces::trace_stats_processor::SendingTraceStatsProcessor; + const TRACE_AGENT_PORT: usize = 8126; // Agent endpoints @@ -75,6 +78,7 @@ const LAMBDA_LOAD_SPAN: &str = "aws.lambda.load"; pub struct TraceState { pub config: Arc, pub trace_sender: Arc, + pub stats_sender: Arc, pub invocation_processor: Arc>, pub tags_provider: Arc, } @@ -102,6 +106,7 @@ pub struct TraceAgent { appsec_processor: Option>>, shutdown_token: CancellationToken, tx: Sender, + stats_concentrator: StatsConcentratorHandle, } #[derive(Clone, Copy)] @@ -123,6 +128,7 @@ impl TraceAgent { invocation_processor: Arc>, appsec_processor: Option>>, tags_provider: Arc, + stats_concentrator: StatsConcentratorHandle, ) -> TraceAgent { // Set up a channel to send processed traces to our trace aggregator. 
tx is passed through each // endpoint_handler to the trace processor, which uses it to send de-serialized @@ -149,9 +155,11 @@ impl TraceAgent { tags_provider, tx: trace_tx, shutdown_token: CancellationToken::new(), + stats_concentrator, } } + #[allow(clippy::cast_possible_truncation)] pub async fn start(&self) -> Result<(), Box> { let now = Instant::now(); @@ -198,6 +206,9 @@ impl TraceAgent { processor: Arc::clone(&self.trace_processor), trace_tx: self.tx.clone(), }), + stats_sender: Arc::new(SendingTraceStatsProcessor::new( + self.stats_concentrator.clone(), + )), invocation_processor: Arc::clone(&self.invocation_processor), tags_provider: Arc::clone(&self.tags_provider), }; @@ -266,6 +277,7 @@ impl TraceAgent { state.config, request, state.trace_sender, + state.stats_sender, state.invocation_processor, state.tags_provider, ApiVersion::V04, @@ -278,6 +290,7 @@ impl TraceAgent { state.config, request, state.trace_sender, + state.stats_sender, state.invocation_processor, state.tags_provider, ApiVersion::V05, @@ -417,6 +430,7 @@ impl TraceAgent { config: Arc, request: Request, trace_sender: Arc, + stats_sender: Arc, invocation_processor: Arc>, tags_provider: Arc, version: ApiVersion, @@ -509,7 +523,16 @@ impl TraceAgent { } } - match trace_sender + if config.compute_trace_stats { + if let Err(err) = stats_sender.send(&traces) { + return error_response( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Error sending stats to the stats aggregator: {err}"), + ); + } + } + + if let Err(err) = trace_sender .send_processed_traces( config, tags_provider, @@ -520,12 +543,13 @@ impl TraceAgent { ) .await { - Ok(()) => success_response("Successfully buffered traces to be aggregated."), - Err(err) => error_response( + return error_response( StatusCode::INTERNAL_SERVER_ERROR, format!("Error sending traces to the trace aggregator: {err}"), - ), + ); } + + success_response("Successfully buffered traces to be aggregated.") } #[allow(clippy::too_many_arguments)] diff --git 
a/bottlecap/src/traces/trace_stats_processor.rs b/bottlecap/src/traces/trace_stats_processor.rs new file mode 100644 index 000000000..9d637253d --- /dev/null +++ b/bottlecap/src/traces/trace_stats_processor.rs @@ -0,0 +1,34 @@ +use tokio::sync::mpsc::error::SendError; + +use tracing::debug; + +use crate::traces::stats_concentrator::{AggregationKey, Stats, StatsEvent}; +use crate::traces::stats_concentrator_service::{ConcentratorCommand, StatsConcentratorHandle}; +use datadog_trace_protobuf::pb; + +pub struct SendingTraceStatsProcessor { + stats_concentrator: StatsConcentratorHandle, +} + +// Extracts information from traces related to stats and sends it to the stats concentrator +impl SendingTraceStatsProcessor { + #[must_use] + pub fn new(stats_concentrator: StatsConcentratorHandle) -> Self { + Self { stats_concentrator } + } + + pub fn send(&self, traces: &[Vec<pb::Span>]) -> Result<(), SendError<ConcentratorCommand>> { + debug!("Sending trace stats to the concentrator"); + for trace in traces { + for span in trace { + let stats = StatsEvent { + time: span.start.try_into().unwrap_or_default(), + aggregation_key: AggregationKey {}, + stats: Stats {}, + }; + self.stats_concentrator.add(stats)?; + } + } + Ok(()) + } +}