From f2848b3dc46f7e3340bc657f7e08688dec5e19b7 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Tue, 19 Aug 2025 15:02:07 -0400 Subject: [PATCH 1/9] feat: [Trace Stats] Add skeleton of concentrator --- bottlecap/src/bin/bottlecap/main.rs | 20 ++++- .../src/lifecycle/invocation/processor.rs | 16 ++-- bottlecap/src/traces/mod.rs | 4 + bottlecap/src/traces/stats_agent.rs | 46 ++++++++++ bottlecap/src/traces/stats_aggregator.rs | 37 +++++--- bottlecap/src/traces/stats_concentrator.rs | 37 ++++++++ .../src/traces/stats_concentrator_service.rs | 86 +++++++++++++++++++ bottlecap/src/traces/stats_flusher.rs | 9 +- bottlecap/src/traces/trace_agent.rs | 44 ++++++++-- bottlecap/src/traces/trace_stats_processor.rs | 35 ++++++++ 10 files changed, 301 insertions(+), 33 deletions(-) create mode 100644 bottlecap/src/traces/stats_agent.rs create mode 100644 bottlecap/src/traces/stats_concentrator.rs create mode 100644 bottlecap/src/traces/stats_concentrator_service.rs create mode 100644 bottlecap/src/traces/trace_stats_processor.rs diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 4b01eba54..bb3a4fa85 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -57,6 +57,7 @@ use bottlecap::{ proxy_aggregator, proxy_flusher::Flusher as ProxyFlusher, stats_aggregator::StatsAggregator, + stats_concentrator_service::StatsConcentratorService, stats_flusher::{self, StatsFlusher}, stats_processor, trace_agent, trace_aggregator::{self, SendDataBuilderInfo}, @@ -522,6 +523,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle.clone(), + false, ) .await; } @@ -537,6 +539,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle.clone(), + false, ) .await; let next_response = @@ -606,6 +609,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle, + false, // force_flush_trace_stats ) .await; } @@ -639,6 +643,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle, + false, // force_flush_trace_stats ) .await; } @@ -697,6 +702,7 @@ async fn extension_loop_active( &proxy_flusher, &mut race_flush_interval, &metrics_aggr_handle, + true, // force_flush_trace_stats ) .await; return Ok(()); @@ -704,6 +710,7 @@ async fn extension_loop_active( } } +#[allow(clippy::too_many_arguments)] async fn blocking_flush_all( logs_flusher: &LogsFlusher, metrics_flushers: &mut [MetricsFlusher], @@ -712,6 +719,7 @@ async fn blocking_flush_all( proxy_flusher: &ProxyFlusher, race_flush_interval: &mut tokio::time::Interval, metrics_aggr_handle: &MetricsAggregatorHandle, + force_flush_trace_stats: bool, ) { let flush_response = metrics_aggr_handle .flush() @@ -731,7 +739,7 @@ async fn blocking_flush_all( logs_flusher.flush(None), futures::future::join_all(metrics_futures), trace_flusher.flush(None), - stats_flusher.flush(), + stats_flusher.flush(force_flush_trace_stats), proxy_flusher.flush(None), ); race_flush_interval.reset(); @@ -981,7 +989,12 @@ fn start_trace_agent( tokio_util::sync::CancellationToken, ) { // Stats - let stats_aggregator = Arc::new(TokioMutex::new(StatsAggregator::default())); + let (stats_concentrator_service, stats_concentrator_handle) = + StatsConcentratorService::new(Arc::clone(config)); + tokio::spawn(stats_concentrator_service.run()); + let stats_aggregator: Arc> = Arc::new(TokioMutex::new( + StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()), + )); let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new( api_key_factory.clone(), stats_aggregator.clone(), @@ -1023,12 +1036,13 @@ fn start_trace_agent( Arc::clone(config), trace_aggregator, trace_processor.clone(), - stats_aggregator, + stats_aggregator.clone(), stats_processor, proxy_aggregator, invocation_processor, appsec_processor, Arc::clone(tags_provider), + stats_concentrator_handle.clone(), ); let trace_agent_channel = trace_agent.get_sender_copy(); let shutdown_token = trace_agent.shutdown_token(); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 28d9fd37f..1784a224a 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -45,6 +45,7 @@ use crate::lifecycle::invocation::triggers::get_default_service_name; pub const MS_TO_NS: f64 = 1_000_000.0; pub const S_TO_MS: u64 = 1_000; pub const S_TO_NS: f64 = 1_000_000_000.0; +pub const S_TO_NS_U64: u64 = 1_000_000_000; pub const PROACTIVE_INITIALIZATION_THRESHOLD_MS: u64 = 10_000; pub const DATADOG_INVOCATION_ERROR_MESSAGE_KEY: &str = "x-datadog-invocation-error-msg"; @@ -128,12 +129,10 @@ impl Processor { self.context_buffer .start_context(&request_id, invocation_span); - let timestamp = std::time::UNIX_EPOCH + let timestamp_secs = std::time::UNIX_EPOCH .elapsed() .expect("can't poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(); + .as_secs(); if self.config.lambda_proc_enhanced_metrics { // Collect offsets for network and cpu metrics @@ -162,14 +161,19 @@ impl Processor { } // Increment the invocation metric - self.enhanced_metrics.increment_invocation_metric(timestamp); + self.enhanced_metrics + .increment_invocation_metric(timestamp_secs.try_into().unwrap_or_default()); self.enhanced_metrics.set_invoked_received(); // If `UniversalInstrumentationStart` event happened first, process it if let Some((headers, payload_value)) = self.context_buffer.pair_invoke_event(&request_id) { // Infer span self.inferrer.infer_span(&payload_value, &self.aws_config); - self.process_on_universal_instrumentation_start(request_id, headers, payload_value); + self.process_on_universal_instrumentation_start( + request_id.clone(), + headers, + payload_value, + ); } } diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index f3363d67d..420b06207 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -6,13 +6,17 @@ pub mod propagation; pub mod proxy_aggregator; pub mod proxy_flusher; pub mod span_pointers; +pub mod stats_agent; pub mod stats_aggregator; +pub mod stats_concentrator; +pub mod stats_concentrator_service; pub mod stats_flusher; pub mod stats_processor; pub mod trace_agent; pub mod trace_aggregator; pub mod trace_flusher; pub mod trace_processor; +pub mod trace_stats_processor; // URL for a call to the Lambda runtime API. The value may be replaced if `AWS_LAMBDA_RUNTIME_API` is set. const LAMBDA_RUNTIME_URL_PREFIX: &str = "http://127.0.0.1:9001"; diff --git a/bottlecap/src/traces/stats_agent.rs b/bottlecap/src/traces/stats_agent.rs new file mode 100644 index 000000000..fd0f513e4 --- /dev/null +++ b/bottlecap/src/traces/stats_agent.rs @@ -0,0 +1,46 @@ +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tracing::error; + +use super::stats_concentrator_service::StatsConcentratorHandle; + +use super::stats_concentrator::AggregationKey; +use super::stats_concentrator::Stats; + +#[derive(Clone, Copy)] +pub struct StatsEvent { + pub time: u64, + pub aggregation_key: AggregationKey, + pub stats: Stats, +} + +#[allow(clippy::module_name_repetitions)] +pub struct StatsAgent { + tx: Sender, + rx: Receiver, + concentrator: StatsConcentratorHandle, +} + +impl StatsAgent { + #[must_use] + pub fn new(concentrator: StatsConcentratorHandle) -> StatsAgent { + let (tx, rx) = mpsc::channel::(1000); + StatsAgent { + tx, + rx, + concentrator, + } + } + + pub async fn spin(&mut self) { + while let Some(event) = self.rx.recv().await { + if let Err(e) = self.concentrator.add(event) { + error!("Error adding stats event to the stats concentrator: {e}"); + } + } + } + + #[must_use] + pub fn get_sender_copy(&self) -> Sender { + self.tx.clone() + } +} diff --git a/bottlecap/src/traces/stats_aggregator.rs b/bottlecap/src/traces/stats_aggregator.rs index fa23c0b63..913d981cd 100644 --- a/bottlecap/src/traces/stats_aggregator.rs +++ b/bottlecap/src/traces/stats_aggregator.rs @@ -1,5 +1,7 @@ +use crate::traces::stats_concentrator_service::StatsConcentratorHandle; use datadog_trace_protobuf::pb::ClientStatsPayload; use std::collections::VecDeque; +use tracing::error; #[allow(clippy::empty_line_after_doc_comments)] /// Maximum number of entries in a stat payload. @@ -22,37 +24,44 @@ pub struct StatsAggregator { queue: VecDeque, max_content_size_bytes: usize, buffer: Vec, -} - -impl Default for StatsAggregator { - fn default() -> Self { - StatsAggregator { - queue: VecDeque::new(), - max_content_size_bytes: MAX_CONTENT_SIZE_BYTES, - buffer: Vec::new(), - } - } + concentrator: StatsConcentratorHandle, } /// Takes in individual trace stats payloads and aggregates them into batches to be flushed to Datadog. impl StatsAggregator { #[allow(dead_code)] #[allow(clippy::must_use_candidate)] - pub fn new(max_content_size_bytes: usize) -> Self { + fn new(max_content_size_bytes: usize, concentrator: StatsConcentratorHandle) -> Self { StatsAggregator { queue: VecDeque::new(), max_content_size_bytes, buffer: Vec::new(), + concentrator, } } + #[must_use] + pub fn new_with_concentrator(concentrator: StatsConcentratorHandle) -> Self { + Self::new(MAX_CONTENT_SIZE_BYTES, concentrator) + } + /// Takes in an individual trace stats payload. pub fn add(&mut self, payload: ClientStatsPayload) { self.queue.push_back(payload); } /// Returns a batch of trace stats payloads, subject to the max content size. - pub fn get_batch(&mut self) -> Vec { + pub async fn get_batch(&mut self, force_flush: bool) -> Vec { + // Pull stats data from concentrator + match self.concentrator.get_stats(force_flush).await { + Ok(stats) => { + self.queue.extend(stats); + } + Err(e) => { + error!("Error getting stats from the stats concentrator: {e:?}"); + } + } + let mut batch_size = 0; // Fill the batch @@ -158,12 +167,12 @@ mod tests { aggregator.add(payload.clone()); // The batch should only contain the first 2 payloads - let first_batch = aggregator.get_batch(); + let first_batch = aggregator.get_batch(false); assert_eq!(first_batch, vec![payload.clone(), payload.clone()]); assert_eq!(aggregator.queue.len(), 1); // The second batch should only contain the last log - let second_batch = aggregator.get_batch(); + let second_batch = aggregator.get_batch(false); assert_eq!(second_batch, vec![payload]); assert_eq!(aggregator.queue.len(), 0); } diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs new file mode 100644 index 000000000..95b134497 --- /dev/null +++ b/bottlecap/src/traces/stats_concentrator.rs @@ -0,0 +1,37 @@ +use crate::config::Config; +use crate::traces::stats_agent::StatsEvent; +use datadog_trace_protobuf::pb; +use std::sync::Arc; + +#[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)] +pub struct AggregationKey { +} + + +#[derive(Clone, Debug, Default, Copy)] +pub struct Stats { +} + +pub struct StatsConcentrator { + _config: Arc, +} + +// Aggregates stats into buckets, which are then pulled by the stats aggregator. +impl StatsConcentrator { + #[must_use] + pub fn new(config: Arc) -> Self { + Self { + _config: config, + } + } + + pub fn add(&mut self, _stats_event: StatsEvent) { + } + + // force_flush: If true, flush all stats. If false, flush stats except for the few latest + // buckets, which may still be getting data. + #[must_use] + pub fn get_stats(&mut self, _force_flush: bool) -> Vec { + vec![] + } +} \ No newline at end of file diff --git a/bottlecap/src/traces/stats_concentrator_service.rs b/bottlecap/src/traces/stats_concentrator_service.rs new file mode 100644 index 000000000..b3f745219 --- /dev/null +++ b/bottlecap/src/traces/stats_concentrator_service.rs @@ -0,0 +1,86 @@ +use tokio::sync::{mpsc, oneshot}; + +use super::stats_agent::StatsEvent; +use super::stats_concentrator::StatsConcentrator; +use crate::config::Config; +use datadog_trace_protobuf::pb; +use std::sync::Arc; +use tracing::error; + +#[derive(Debug)] +pub enum StatsError { + SendError(mpsc::error::SendError), + RecvError(oneshot::error::RecvError), +} + +impl From> for StatsError { + fn from(err: mpsc::error::SendError) -> Self { + StatsError::SendError(err) + } +} + +impl From for StatsError { + fn from(err: oneshot::error::RecvError) -> Self { + StatsError::RecvError(err) + } +} + +pub enum ConcentratorCommand { + Add(StatsEvent), + GetStats(bool, oneshot::Sender>), +} + +#[derive(Clone)] +pub struct StatsConcentratorHandle { + tx: mpsc::UnboundedSender, +} + +impl StatsConcentratorHandle { + pub fn add( + &self, + stats_event: StatsEvent, + ) -> Result<(), mpsc::error::SendError> { + self.tx.send(ConcentratorCommand::Add(stats_event)) + } + + pub async fn get_stats( + &self, + force_flush: bool, + ) -> Result, StatsError> { + let (response_tx, response_rx) = oneshot::channel(); + self.tx + .send(ConcentratorCommand::GetStats(force_flush, response_tx))?; + let stats = response_rx.await?; + Ok(stats) + } +} + +pub struct StatsConcentratorService { + concentrator: StatsConcentrator, + rx: mpsc::UnboundedReceiver, +} + +impl StatsConcentratorService { + #[must_use] + pub fn new(config: Arc) -> (Self, StatsConcentratorHandle) { + let (tx, rx) = mpsc::unbounded_channel(); + let handle = StatsConcentratorHandle { tx }; + let concentrator = StatsConcentrator::new(config); + let service: StatsConcentratorService = Self { concentrator, rx }; + (service, handle) + } + + pub async fn run(mut self) { + while let Some(command) = self.rx.recv().await { + match command { + ConcentratorCommand::Add(stats_event) => self.concentrator.add(stats_event), + ConcentratorCommand::GetStats(force_flush, response_tx) => { + let stats = self.concentrator.get_stats(force_flush); + if let Err(e) = response_tx.send(stats) { + error!("Failed to return trace stats: {e:?}"); + } + } + } + } + } +} diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index 71a86fe9d..95ba905bb 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -28,7 +28,7 @@ pub trait StatsFlusher { /// Flushes stats to the Datadog trace stats intake. async fn send(&self, traces: Vec); - async fn flush(&self); + async fn flush(&self, force_flush: bool); } #[allow(clippy::module_name_repetitions)] @@ -116,14 +116,15 @@ impl StatsFlusher for ServerlessStatsFlusher { } }; } - async fn flush(&self) { + + async fn flush(&self, force_flush: bool) { let mut guard = self.aggregator.lock().await; - let mut stats = guard.get_batch(); + let mut stats = guard.get_batch(force_flush).await; while !stats.is_empty() { self.send(stats).await; - stats = guard.get_batch(); + stats = guard.get_batch(force_flush).await; } } } diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index b920c2513..5acd21d0c 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -41,6 +41,10 @@ use datadog_trace_protobuf::pb; use datadog_trace_utils::trace_utils::{self}; use ddcommon::hyper_migration; +use crate::traces::stats_agent::StatsAgent; +use crate::traces::stats_concentrator_service::StatsConcentratorHandle; +use crate::traces::trace_stats_processor::SendingTraceStatsProcessor; + const TRACE_AGENT_PORT: usize = 8126; // Agent endpoints @@ -75,6 +79,7 @@ const LAMBDA_LOAD_SPAN: &str = "aws.lambda.load"; pub struct TraceState { pub config: Arc, pub trace_sender: Arc, + pub stats_sender: Arc, pub invocation_processor: Arc>, pub tags_provider: Arc, } @@ -102,6 +107,7 @@ pub struct TraceAgent { appsec_processor: Option>>, shutdown_token: CancellationToken, tx: Sender, + stats_agent: Arc>, } #[derive(Clone, Copy)] @@ -123,6 +129,7 @@ impl TraceAgent { invocation_processor: Arc>, appsec_processor: Option>>, tags_provider: Arc, + stats_concentrator: StatsConcentratorHandle, ) -> TraceAgent { // Set up a channel to send processed traces to our trace aggregator. tx is passed through each // endpoint_handler to the trace processor, which uses it to send de-serialized @@ -138,6 +145,8 @@ impl TraceAgent { } }); + let stats_agent = StatsAgent::new(stats_concentrator.clone()); + TraceAgent { config: config.clone(), trace_processor, @@ -149,9 +158,11 @@ impl TraceAgent { tags_provider, tx: trace_tx, shutdown_token: CancellationToken::new(), + stats_agent: Arc::new(Mutex::new(stats_agent)), } } + #[allow(clippy::cast_possible_truncation)] pub async fn start(&self) -> Result<(), Box> { let now = Instant::now(); @@ -170,7 +181,13 @@ impl TraceAgent { } }); - let router = self.make_router(stats_tx); + // Start the stats agent, which receives stats events and sends them to the stats concentrator + let stats_agent = self.stats_agent.clone(); + tokio::spawn(async move { + stats_agent.lock().await.spin().await; + }); + + let router = self.make_router(stats_tx).await; let port = u16::try_from(TRACE_AGENT_PORT).expect("TRACE_AGENT_PORT is too large"); let socket = SocketAddr::from(([127, 0, 0, 1], port)); @@ -190,7 +207,8 @@ impl TraceAgent { Ok(()) } - fn make_router(&self, stats_tx: Sender) -> Router { + async fn make_router(&self, stats_tx: Sender) -> Router { + let stats_agent_tx = self.stats_agent.lock().await.get_sender_copy(); let trace_state = TraceState { config: Arc::clone(&self.config), trace_sender: Arc::new(SendingTraceProcessor { @@ -198,6 +216,7 @@ impl TraceAgent { processor: Arc::clone(&self.trace_processor), trace_tx: self.tx.clone(), }), + stats_sender: Arc::new(SendingTraceStatsProcessor::new(stats_agent_tx)), invocation_processor: Arc::clone(&self.invocation_processor), tags_provider: Arc::clone(&self.tags_provider), }; @@ -266,6 +285,7 @@ impl TraceAgent { state.config, request, state.trace_sender, + state.stats_sender, state.invocation_processor, state.tags_provider, ApiVersion::V04, @@ -278,6 +298,7 @@ impl TraceAgent { state.config, request, state.trace_sender, + state.stats_sender, state.invocation_processor, state.tags_provider, ApiVersion::V05, @@ -417,6 +438,7 @@ impl TraceAgent { config: Arc, request: Request, trace_sender: Arc, + stats_sender: Arc, invocation_processor: Arc>, tags_provider: Arc, version: ApiVersion, @@ -509,7 +531,16 @@ impl TraceAgent { } } - match trace_sender + if config.compute_trace_stats { + if let Err(err) = stats_sender.send(&traces).await { + return error_response( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Error sending stats to the stats aggregator: {err}"), + ); + } + } + + if let Err(err) = trace_sender .send_processed_traces( config, tags_provider, @@ -520,12 +551,13 @@ impl TraceAgent { ) .await { - Ok(()) => success_response("Successfully buffered traces to be aggregated."), - Err(err) => error_response( + return error_response( StatusCode::INTERNAL_SERVER_ERROR, format!("Error sending traces to the trace aggregator: {err}"), - ), + ); } + + success_response("Successfully buffered traces to be aggregated.") } #[allow(clippy::too_many_arguments)] diff --git a/bottlecap/src/traces/trace_stats_processor.rs b/bottlecap/src/traces/trace_stats_processor.rs new file mode 100644 index 000000000..d1db5e038 --- /dev/null +++ b/bottlecap/src/traces/trace_stats_processor.rs @@ -0,0 +1,35 @@ +use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::error::SendError; +use tracing::debug; + +use super::stats_agent::StatsEvent; +use super::stats_concentrator::AggregationKey; +use super::stats_concentrator::Stats; + +use datadog_trace_protobuf::pb; + +pub struct SendingTraceStatsProcessor { + stats_tx: Sender, +} + +impl SendingTraceStatsProcessor { + #[must_use] + pub fn new(stats_tx: Sender) -> Self { + Self { stats_tx } + } + + pub async fn send(&self, traces: &[Vec]) -> Result<(), SendError> { + debug!("Sending trace stats to the concentrator"); + for trace in traces { + for span in trace { + let stats = StatsEvent { + time: span.start.try_into().unwrap_or_default(), + aggregation_key: AggregationKey {}, + stats: Stats {}, + }; + self.stats_tx.send(stats).await?; + } + } + Ok(()) + } +} From 426bbb2d3cfe49e250921ea8ad36fe0f86e3c054 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 17 Sep 2025 12:22:46 -0400 Subject: [PATCH 2/9] Fix tests --- bottlecap/src/traces/stats_aggregator.rs | 29 ++++++++++++++++-------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/bottlecap/src/traces/stats_aggregator.rs b/bottlecap/src/traces/stats_aggregator.rs index 913d981cd..9b7c31341 100644 --- a/bottlecap/src/traces/stats_aggregator.rs +++ b/bottlecap/src/traces/stats_aggregator.rs @@ -89,10 +89,15 @@ impl StatsAggregator { #[allow(clippy::unwrap_used)] mod tests { use super::*; + use crate::config::Config; + use std::sync::Arc; + use crate::traces::stats_concentrator_service::StatsConcentratorService; #[test] fn test_add() { - let mut aggregator = StatsAggregator::default(); + let config = Arc::new(Config::default()); + let (_, concentrator) = StatsConcentratorService::new(config); + let mut aggregator = StatsAggregator::new_with_concentrator(concentrator); let payload = ClientStatsPayload { hostname: "hostname".to_string(), env: "dev".to_string(), @@ -115,9 +120,11 @@ mod tests { assert_eq!(aggregator.queue[0], payload); } - #[test] - fn test_get_batch() { - let mut aggregator = StatsAggregator::default(); + #[tokio::test] + async fn test_get_batch() { + let config = Arc::new(Config::default()); + let (_, concentrator) = StatsConcentratorService::new(config); + let mut aggregator = StatsAggregator::new_with_concentrator(concentrator); let payload = ClientStatsPayload { hostname: "hostname".to_string(), env: "dev".to_string(), @@ -136,13 +143,15 @@ mod tests { }; aggregator.add(payload.clone()); assert_eq!(aggregator.queue.len(), 1); - let batch = aggregator.get_batch(); + let batch = aggregator.get_batch(false).await; assert_eq!(batch, vec![payload]); } - #[test] - fn test_get_batch_full_entries() { - let mut aggregator = StatsAggregator::new(640); + #[tokio::test] + async fn test_get_batch_full_entries() { + let config = Arc::new(Config::default()); + let (_, concentrator) = StatsConcentratorService::new(config); + let mut aggregator = StatsAggregator::new(640, concentrator); // Payload below is 115 bytes let payload = ClientStatsPayload { hostname: "hostname".to_string(), @@ -167,12 +176,12 @@ mod tests { aggregator.add(payload.clone()); // The batch should only contain the first 2 payloads - let first_batch = aggregator.get_batch(false); + let first_batch = aggregator.get_batch(false).await; assert_eq!(first_batch, vec![payload.clone(), payload.clone()]); assert_eq!(aggregator.queue.len(), 1); // The second batch should only contain the last log - let second_batch = aggregator.get_batch(false); + let second_batch = aggregator.get_batch(false).await; assert_eq!(second_batch, vec![payload]); assert_eq!(aggregator.queue.len(), 0); } From 59247e82058d9860cf1c926e2b1d87b256d1b3bc Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 17 Sep 2025 12:23:37 -0400 Subject: [PATCH 3/9] fmt --- bottlecap/src/traces/stats_aggregator.rs | 2 +- bottlecap/src/traces/stats_concentrator.rs | 16 +++++----------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/bottlecap/src/traces/stats_aggregator.rs b/bottlecap/src/traces/stats_aggregator.rs index 9b7c31341..1a91b7d47 100644 --- a/bottlecap/src/traces/stats_aggregator.rs +++ b/bottlecap/src/traces/stats_aggregator.rs @@ -90,8 +90,8 @@ impl StatsAggregator { mod tests { use super::*; use crate::config::Config; - use std::sync::Arc; use crate::traces::stats_concentrator_service::StatsConcentratorService; + use std::sync::Arc; #[test] fn test_add() { diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs index 95b134497..1dc63ac47 100644 --- a/bottlecap/src/traces/stats_concentrator.rs +++ b/bottlecap/src/traces/stats_concentrator.rs @@ -4,13 +4,10 @@ use datadog_trace_protobuf::pb; use std::sync::Arc; #[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)] -pub struct AggregationKey { -} - +pub struct AggregationKey {} #[derive(Clone, Debug, Default, Copy)] -pub struct Stats { -} +pub struct Stats {} pub struct StatsConcentrator { _config: Arc, @@ -20,13 +17,10 @@ pub struct StatsConcentrator { impl StatsConcentrator { #[must_use] pub fn new(config: Arc) -> Self { - Self { - _config: config, - } + Self { _config: config } } - pub fn add(&mut self, _stats_event: StatsEvent) { - } + pub fn add(&mut self, _stats_event: StatsEvent) {} // force_flush: If true, flush all stats. If false, flush stats except for the few latest // buckets, which may still be getting data. @@ -34,4 +28,4 @@ impl StatsConcentrator { pub fn get_stats(&mut self, _force_flush: bool) -> Vec { vec![] } -} \ No newline at end of file +} From a5226092420133609558f6506ade9e5d64602b2c Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 17 Sep 2025 13:41:59 -0400 Subject: [PATCH 4/9] Revert unnecessary changes --- bottlecap/src/lifecycle/invocation/processor.rs | 16 ++++++---------- bottlecap/src/traces/stats_agent.rs | 5 +---- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 1784a224a..28d9fd37f 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -45,7 +45,6 @@ use crate::lifecycle::invocation::triggers::get_default_service_name; pub const MS_TO_NS: f64 = 1_000_000.0; pub const S_TO_MS: u64 = 1_000; pub const S_TO_NS: f64 = 1_000_000_000.0; -pub const S_TO_NS_U64: u64 = 1_000_000_000; pub const PROACTIVE_INITIALIZATION_THRESHOLD_MS: u64 = 10_000; pub const DATADOG_INVOCATION_ERROR_MESSAGE_KEY: &str = "x-datadog-invocation-error-msg"; @@ -129,10 +128,12 @@ impl Processor { self.context_buffer .start_context(&request_id, invocation_span); - let timestamp_secs = std::time::UNIX_EPOCH + let timestamp = std::time::UNIX_EPOCH .elapsed() .expect("can't poll clock, unrecoverable") - .as_secs(); + .as_secs() + .try_into() + .unwrap_or_default(); if self.config.lambda_proc_enhanced_metrics { // Collect offsets for network and cpu metrics @@ -161,19 +162,14 @@ impl Processor { } // Increment the invocation metric - self.enhanced_metrics - .increment_invocation_metric(timestamp_secs.try_into().unwrap_or_default()); + self.enhanced_metrics.increment_invocation_metric(timestamp); self.enhanced_metrics.set_invoked_received(); // If `UniversalInstrumentationStart` event happened first, process it if let Some((headers, payload_value)) = self.context_buffer.pair_invoke_event(&request_id) { // Infer span self.inferrer.infer_span(&payload_value, &self.aws_config); - self.process_on_universal_instrumentation_start( - request_id.clone(), - headers, - payload_value, - ); + self.process_on_universal_instrumentation_start(request_id, headers, payload_value); } } diff --git a/bottlecap/src/traces/stats_agent.rs b/bottlecap/src/traces/stats_agent.rs index fd0f513e4..818612af0 100644 --- a/bottlecap/src/traces/stats_agent.rs +++ b/bottlecap/src/traces/stats_agent.rs @@ -1,10 +1,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use tracing::error; -use super::stats_concentrator_service::StatsConcentratorHandle; - -use super::stats_concentrator::AggregationKey; -use super::stats_concentrator::Stats; +use crate::traces::{stats_concentrator_service::StatsConcentratorHandle, stats_concentrator::{AggregationKey, Stats}}; #[derive(Clone, Copy)] pub struct StatsEvent { From da8134cd1e55dcf9f05eb1a465cbfa84be195a2a Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 17 Sep 2025 13:50:38 -0400 Subject: [PATCH 5/9] Remove stats agent --- bottlecap/src/traces/mod.rs | 1 - bottlecap/src/traces/stats_agent.rs | 43 ------------------- bottlecap/src/traces/stats_concentrator.rs | 8 +++- .../src/traces/stats_concentrator_service.rs | 4 +- bottlecap/src/traces/trace_agent.rs | 22 +++------- bottlecap/src/traces/trace_stats_processor.rs | 18 ++++---- 6 files changed, 23 insertions(+), 73 deletions(-) delete mode 100644 bottlecap/src/traces/stats_agent.rs diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index 420b06207..8c883b796 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -6,7 +6,6 @@ pub mod propagation; pub mod proxy_aggregator; pub mod proxy_flusher; pub mod span_pointers; -pub mod stats_agent; pub mod stats_aggregator; pub mod stats_concentrator; pub mod stats_concentrator_service; diff --git a/bottlecap/src/traces/stats_agent.rs b/bottlecap/src/traces/stats_agent.rs deleted file mode 100644 index 818612af0..000000000 --- a/bottlecap/src/traces/stats_agent.rs +++ /dev/null @@ -1,43 +0,0 @@ -use tokio::sync::mpsc::{self, Receiver, Sender}; -use tracing::error; - -use crate::traces::{stats_concentrator_service::StatsConcentratorHandle, stats_concentrator::{AggregationKey, Stats}}; - -#[derive(Clone, Copy)] -pub struct StatsEvent { - pub time: u64, - pub aggregation_key: AggregationKey, - pub stats: Stats, -} - -#[allow(clippy::module_name_repetitions)] -pub struct StatsAgent { - tx: Sender, - rx: Receiver, - concentrator: StatsConcentratorHandle, -} - -impl StatsAgent { - #[must_use] - pub fn new(concentrator: StatsConcentratorHandle) -> StatsAgent { - let (tx, rx) = mpsc::channel::(1000); - StatsAgent { - tx, - rx, - concentrator, - } - } - - pub async fn spin(&mut self) { - while let Some(event) = self.rx.recv().await { - if let Err(e) = self.concentrator.add(event) { - error!("Error adding stats event to the stats concentrator: {e}"); - } - } - } - - #[must_use] - pub fn get_sender_copy(&self) -> Sender { - self.tx.clone() - } -} diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs index 1dc63ac47..6bf99f968 100644 --- a/bottlecap/src/traces/stats_concentrator.rs +++ b/bottlecap/src/traces/stats_concentrator.rs @@ -1,8 +1,14 @@ use crate::config::Config; -use crate::traces::stats_agent::StatsEvent; use datadog_trace_protobuf::pb; use std::sync::Arc; +#[derive(Clone, Copy)] +pub struct StatsEvent { + pub time: u64, + pub aggregation_key: AggregationKey, + pub stats: Stats, +} + #[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)] pub struct AggregationKey {} diff --git a/bottlecap/src/traces/stats_concentrator_service.rs b/bottlecap/src/traces/stats_concentrator_service.rs index b3f745219..e5afc37e3 100644 --- a/bottlecap/src/traces/stats_concentrator_service.rs +++ b/bottlecap/src/traces/stats_concentrator_service.rs @@ -1,7 +1,7 @@ use tokio::sync::{mpsc, oneshot}; -use super::stats_agent::StatsEvent; -use super::stats_concentrator::StatsConcentrator; +use crate::traces::stats_concentrator::StatsEvent; +use crate::traces::stats_concentrator::StatsConcentrator; use crate::config::Config; use datadog_trace_protobuf::pb; use std::sync::Arc; diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 5acd21d0c..20cba4dd3 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -41,7 +41,6 @@ use datadog_trace_protobuf::pb; use datadog_trace_utils::trace_utils::{self}; use ddcommon::hyper_migration; -use crate::traces::stats_agent::StatsAgent; use crate::traces::stats_concentrator_service::StatsConcentratorHandle; use crate::traces::trace_stats_processor::SendingTraceStatsProcessor; @@ -107,7 +106,7 @@ pub struct TraceAgent { appsec_processor: Option>>, shutdown_token: CancellationToken, tx: Sender, - stats_agent: Arc>, + stats_concentrator: StatsConcentratorHandle, } #[derive(Clone, Copy)] @@ -145,8 +144,6 @@ impl TraceAgent { } }); - let stats_agent = StatsAgent::new(stats_concentrator.clone()); - TraceAgent { config: config.clone(), trace_processor, @@ -158,7 +155,7 @@ impl TraceAgent { tags_provider, tx: trace_tx, shutdown_token: CancellationToken::new(), - stats_agent: Arc::new(Mutex::new(stats_agent)), + stats_concentrator, } } @@ -181,13 +178,7 @@ impl TraceAgent { } }); - // Start the stats agent, which receives stats events and sends them to the stats concentrator - let stats_agent = self.stats_agent.clone(); - tokio::spawn(async move { - stats_agent.lock().await.spin().await; - }); - - let router = self.make_router(stats_tx).await; + let router = self.make_router(stats_tx); let port = u16::try_from(TRACE_AGENT_PORT).expect("TRACE_AGENT_PORT is too large"); let socket = SocketAddr::from(([127, 0, 0, 1], port)); @@ -207,8 +198,7 @@ impl TraceAgent { Ok(()) } - async fn make_router(&self, stats_tx: Sender) -> Router { - let stats_agent_tx = self.stats_agent.lock().await.get_sender_copy(); + fn make_router(&self, stats_tx: Sender) -> Router { let trace_state = TraceState { config: Arc::clone(&self.config), trace_sender: Arc::new(SendingTraceProcessor { @@ -216,7 +206,7 @@ impl TraceAgent { processor: Arc::clone(&self.trace_processor), trace_tx: self.tx.clone(), }), - stats_sender: Arc::new(SendingTraceStatsProcessor::new(stats_agent_tx)), + stats_sender: Arc::new(SendingTraceStatsProcessor::new(self.stats_concentrator.clone())), invocation_processor: Arc::clone(&self.invocation_processor), tags_provider: Arc::clone(&self.tags_provider), }; @@ -532,7 +522,7 @@ impl TraceAgent { } if config.compute_trace_stats { - if let Err(err) = stats_sender.send(&traces).await { + if let Err(err) = stats_sender.send(&traces) { return error_response( StatusCode::INTERNAL_SERVER_ERROR, format!("Error sending stats to the stats aggregator: {err}"), diff --git a/bottlecap/src/traces/trace_stats_processor.rs b/bottlecap/src/traces/trace_stats_processor.rs index d1db5e038..55b397921 100644 --- a/bottlecap/src/traces/trace_stats_processor.rs +++ b/bottlecap/src/traces/trace_stats_processor.rs @@ -1,24 +1,22 @@ -use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::error::SendError; -use tracing::debug; -use super::stats_agent::StatsEvent; -use super::stats_concentrator::AggregationKey; -use super::stats_concentrator::Stats; +use tracing::debug; +use crate::traces::stats_concentrator::{StatsEvent, AggregationKey, Stats}; +use crate::traces::stats_concentrator_service::{ConcentratorCommand, StatsConcentratorHandle}; use datadog_trace_protobuf::pb; pub struct SendingTraceStatsProcessor { - stats_tx: Sender, + stats_concentrator: StatsConcentratorHandle, } impl SendingTraceStatsProcessor { #[must_use] - pub fn new(stats_tx: Sender) -> Self { - Self { stats_tx } + pub fn new(stats_concentrator: StatsConcentratorHandle) -> Self { + Self { stats_concentrator } } - pub async fn send(&self, traces: &[Vec]) -> Result<(), SendError> { + pub fn send(&self, traces: &[Vec]) -> Result<(), SendError> { debug!("Sending trace stats to the concentrator"); for trace in traces { for span in trace { @@ -27,7 +25,7 @@ impl SendingTraceStatsProcessor { aggregation_key: AggregationKey {}, stats: Stats {}, }; - self.stats_tx.send(stats).await?; + self.stats_concentrator.add(stats)?; } } Ok(()) From 953af7ec4a6f62767c5f0677a594101bc9a4b819 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 17 Sep 2025 13:50:49 -0400 Subject: [PATCH 6/9] fmt --- bottlecap/src/traces/stats_concentrator_service.rs | 4 ++-- bottlecap/src/traces/trace_agent.rs | 4 +++- bottlecap/src/traces/trace_stats_processor.rs | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/bottlecap/src/traces/stats_concentrator_service.rs b/bottlecap/src/traces/stats_concentrator_service.rs index e5afc37e3..ac44b0e4e 100644 --- a/bottlecap/src/traces/stats_concentrator_service.rs +++ b/bottlecap/src/traces/stats_concentrator_service.rs @@ -1,8 +1,8 @@ use tokio::sync::{mpsc, oneshot}; -use crate::traces::stats_concentrator::StatsEvent; -use crate::traces::stats_concentrator::StatsConcentrator; use crate::config::Config; +use crate::traces::stats_concentrator::StatsConcentrator; +use crate::traces::stats_concentrator::StatsEvent; use datadog_trace_protobuf::pb; use std::sync::Arc; use tracing::error; diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 20cba4dd3..9145a2a2f 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -206,7 +206,9 @@ impl TraceAgent { processor: Arc::clone(&self.trace_processor), trace_tx: self.tx.clone(), }), - stats_sender: Arc::new(SendingTraceStatsProcessor::new(self.stats_concentrator.clone())), + stats_sender: Arc::new(SendingTraceStatsProcessor::new( + self.stats_concentrator.clone(), + )), invocation_processor: Arc::clone(&self.invocation_processor), tags_provider: Arc::clone(&self.tags_provider), }; diff --git a/bottlecap/src/traces/trace_stats_processor.rs b/bottlecap/src/traces/trace_stats_processor.rs index 55b397921..4a36cd022 100644 --- a/bottlecap/src/traces/trace_stats_processor.rs +++ b/bottlecap/src/traces/trace_stats_processor.rs @@ -2,7 +2,7 @@ use tokio::sync::mpsc::error::SendError; use tracing::debug; -use crate::traces::stats_concentrator::{StatsEvent, AggregationKey, Stats}; +use crate::traces::stats_concentrator::{AggregationKey, Stats, StatsEvent}; use crate::traces::stats_concentrator_service::{ConcentratorCommand, StatsConcentratorHandle}; use datadog_trace_protobuf::pb; From 69f51c9ffbb0e81f85b2ec178a8dc14aa53712f3 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 17 Sep 2025 14:30:45 -0400 Subject: [PATCH 7/9] Add comments --- bottlecap/src/bin/bottlecap/main.rs | 4 ++-- bottlecap/src/traces/stats_concentrator.rs | 1 + bottlecap/src/traces/stats_concentrator_service.rs | 2 ++ bottlecap/src/traces/trace_stats_processor.rs | 1 + 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index bb3a4fa85..2c522df3d 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -1036,13 +1036,13 @@ fn start_trace_agent( Arc::clone(config), trace_aggregator, trace_processor.clone(), - stats_aggregator.clone(), + stats_aggregator, stats_processor, proxy_aggregator, invocation_processor, appsec_processor, Arc::clone(tags_provider), - stats_concentrator_handle.clone(), + stats_concentrator_handle, ); let trace_agent_channel = trace_agent.get_sender_copy(); let shutdown_token = trace_agent.shutdown_token(); diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs index 6bf99f968..f62d70636 100644 --- a/bottlecap/src/traces/stats_concentrator.rs +++ b/bottlecap/src/traces/stats_concentrator.rs @@ -2,6 +2,7 @@ use crate::config::Config; use datadog_trace_protobuf::pb; use std::sync::Arc; +// Event sent to the stats concentrator #[derive(Clone, Copy)] pub struct StatsEvent { pub time: u64, diff --git a/bottlecap/src/traces/stats_concentrator_service.rs b/bottlecap/src/traces/stats_concentrator_service.rs index ac44b0e4e..575ebdc88 100644 --- a/bottlecap/src/traces/stats_concentrator_service.rs +++ b/bottlecap/src/traces/stats_concentrator_service.rs @@ -60,6 +60,8 @@ pub struct StatsConcentratorService { rx: mpsc::UnboundedReceiver, } +// A service that handles add() and get_stats() requests in the same queue, +// to avoid using mutex, which may cause lock contention. impl StatsConcentratorService { #[must_use] pub fn new(config: Arc) -> (Self, StatsConcentratorHandle) { diff --git a/bottlecap/src/traces/trace_stats_processor.rs b/bottlecap/src/traces/trace_stats_processor.rs index 4a36cd022..9d637253d 100644 --- a/bottlecap/src/traces/trace_stats_processor.rs +++ b/bottlecap/src/traces/trace_stats_processor.rs @@ -10,6 +10,7 @@ pub struct SendingTraceStatsProcessor { stats_concentrator: StatsConcentratorHandle, } +// Extracts information from traces related to stats and sends it to the stats concentrator impl SendingTraceStatsProcessor { #[must_use] pub fn new(stats_concentrator: StatsConcentratorHandle) -> Self { From 0dbf697b696b5379793df0f01776a867798a7d74 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 12:51:22 -0400 Subject: [PATCH 8/9] Rename: get_stats() -> flush() --- bottlecap/src/traces/stats_aggregator.rs | 2 +- bottlecap/src/traces/stats_concentrator.rs | 2 +- bottlecap/src/traces/stats_concentrator_service.rs | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/bottlecap/src/traces/stats_aggregator.rs b/bottlecap/src/traces/stats_aggregator.rs index 1a91b7d47..36074c6d3 100644 --- a/bottlecap/src/traces/stats_aggregator.rs +++ b/bottlecap/src/traces/stats_aggregator.rs @@ -53,7 +53,7 @@ impl StatsAggregator { /// Returns a batch of trace stats payloads, subject to the max content size. pub async fn get_batch(&mut self, force_flush: bool) -> Vec { // Pull stats data from concentrator - match self.concentrator.get_stats(force_flush).await { + match self.concentrator.flush(force_flush).await { Ok(stats) => { self.queue.extend(stats); } diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs index f62d70636..511e3fbc6 100644 --- a/bottlecap/src/traces/stats_concentrator.rs +++ b/bottlecap/src/traces/stats_concentrator.rs @@ -32,7 +32,7 @@ impl StatsConcentrator { // force_flush: If true, flush all stats. If false, flush stats except for the few latest // buckets, which may still be getting data. #[must_use] - pub fn get_stats(&mut self, _force_flush: bool) -> Vec { + pub fn flush(&mut self, _force_flush: bool) -> Vec { vec![] } } diff --git a/bottlecap/src/traces/stats_concentrator_service.rs b/bottlecap/src/traces/stats_concentrator_service.rs index 575ebdc88..0fd4e4ff7 100644 --- a/bottlecap/src/traces/stats_concentrator_service.rs +++ b/bottlecap/src/traces/stats_concentrator_service.rs @@ -27,7 +27,7 @@ impl From for StatsError { pub enum ConcentratorCommand { Add(StatsEvent), - GetStats(bool, oneshot::Sender>), + Flush(bool, oneshot::Sender>), } #[derive(Clone)] @@ -43,13 +43,13 @@ impl StatsConcentratorHandle { self.tx.send(ConcentratorCommand::Add(stats_event)) } - pub async fn get_stats( + pub async fn flush( &self, force_flush: bool, ) -> Result, StatsError> { let (response_tx, response_rx) = oneshot::channel(); self.tx - .send(ConcentratorCommand::GetStats(force_flush, response_tx))?; + .send(ConcentratorCommand::Flush(force_flush, response_tx))?; let stats = response_rx.await?; Ok(stats) } @@ -60,7 +60,7 @@ pub struct StatsConcentratorService { rx: mpsc::UnboundedReceiver, } -// A service that handles add() and get_stats() requests in the same queue, +// A service that handles add() and flush() requests in the same queue, // to avoid using mutex, which may cause lock contention. impl StatsConcentratorService { #[must_use] @@ -76,8 +76,8 @@ impl StatsConcentratorService { while let Some(command) = self.rx.recv().await { match command { ConcentratorCommand::Add(stats_event) => self.concentrator.add(stats_event), - ConcentratorCommand::GetStats(force_flush, response_tx) => { - let stats = self.concentrator.get_stats(force_flush); + ConcentratorCommand::Flush(force_flush, response_tx) => { + let stats = self.concentrator.flush(force_flush); if let Err(e) = response_tx.send(stats) { error!("Failed to return trace stats: {e:?}"); } From 46efff50c6a3784e43c3e39661b16c867bbd77f7 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 13:36:52 -0400 Subject: [PATCH 9/9] Use thiserror --- .../src/traces/stats_concentrator_service.rs | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/bottlecap/src/traces/stats_concentrator_service.rs b/bottlecap/src/traces/stats_concentrator_service.rs index 0fd4e4ff7..c740b9db1 100644 --- a/bottlecap/src/traces/stats_concentrator_service.rs +++ b/bottlecap/src/traces/stats_concentrator_service.rs @@ -7,24 +7,14 @@ use datadog_trace_protobuf::pb; use std::sync::Arc; use tracing::error; -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum StatsError { + #[error("Failed to send command to concentrator: {0}")] SendError(mpsc::error::SendError), + #[error("Failed to receive response from concentrator: {0}")] RecvError(oneshot::error::RecvError), } -impl From> for StatsError { - fn from(err: mpsc::error::SendError) -> Self { - StatsError::SendError(err) - } -} - -impl From for StatsError { - fn from(err: oneshot::error::RecvError) -> Self { - StatsError::RecvError(err) - } -} - pub enum ConcentratorCommand { Add(StatsEvent), Flush(bool, oneshot::Sender>), @@ -49,8 +39,9 @@ impl StatsConcentratorHandle { ) -> Result, StatsError> { let (response_tx, response_rx) = oneshot::channel(); self.tx - .send(ConcentratorCommand::Flush(force_flush, response_tx))?; - let stats = response_rx.await?; + .send(ConcentratorCommand::Flush(force_flush, response_tx)) + .map_err(StatsError::SendError)?; + let stats = response_rx.await.map_err(StatsError::RecvError)?; Ok(stats) } }