From 64b4a9535dabea55ddd61806d5a8b53e585e92e2 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 11:39:09 -0400 Subject: [PATCH 01/11] Try to move stats sending after obfuscation (incomplete) --- bottlecap/src/bin/bottlecap/main.rs | 2 ++ bottlecap/src/traces/trace_agent.rs | 29 ++++++----------- bottlecap/src/traces/trace_processor.rs | 41 +++++++++++++++++++++---- 3 files changed, 47 insertions(+), 25 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 2c522df3d..53e1e3b8e 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -63,6 +63,7 @@ use bottlecap::{ trace_aggregator::{self, SendDataBuilderInfo}, trace_flusher::{self, ServerlessTraceFlusher, TraceFlusher}, trace_processor::{self, SendingTraceProcessor}, + trace_stats_processor::SendingTraceStatsProcessor, }, }; use datadog_fips::reqwest_adapter::create_reqwest_client_builder; @@ -803,6 +804,7 @@ async fn handle_event_bus_event( appsec: appsec_processor.clone(), processor: trace_processor.clone(), trace_tx: trace_agent_channel.clone(), + stats_sender: Arc::new(SendingTraceStatsProcessor::new(stats_agent_channel.clone())), }), event.time.timestamp(), ) diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 9145a2a2f..c783dd5a1 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -522,26 +522,17 @@ impl TraceAgent { .await; } } - - if config.compute_trace_stats { - if let Err(err) = stats_sender.send(&traces) { - return error_response( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Error sending stats to the stats aggregator: {err}"), - ); - } - } - + if let Err(err) = trace_sender - .send_processed_traces( - config, - tags_provider, - tracer_header_tags, - traces, - body_size, - None, - ) - .await + .send_processed_traces( + config, + tags_provider, + tracer_header_tags, + traces, + body_size, + None, + ) + .await { return error_response( StatusCode::INTERNAL_SERVER_ERROR, diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 89e561dd1..9c6d417ad 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -31,7 +31,9 @@ use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::error::SendError; use tracing::{debug, error}; +use super::stats_concentrator_service::ConcentratorCommand; use super::trace_aggregator::SendDataBuilderInfo; +use super::trace_stats_processor::SendingTraceStatsProcessor; #[derive(Clone)] #[allow(clippy::module_name_repetitions)] @@ -328,7 +330,7 @@ impl TraceProcessor for ServerlessTraceProcessor { traces: Vec>, body_size: usize, span_pointers: Option>, - ) -> SendDataBuilderInfo { + ) -> (SendDataBuilderInfo, Vec>) { let mut payload = trace_utils::collect_pb_trace_chunks( traces, &header_tags, @@ -369,7 +371,25 @@ impl TraceProcessor for ServerlessTraceProcessor { None, )); - SendDataBuilderInfo::new(builder, body_size) + (SendDataBuilderInfo::new(builder, body_size), traces) + } +} + +#[derive(Debug)] +pub enum SendingTraceProcessorError { + SendDataBuilderInfoError(SendError), + ConcentratorCommandError(SendError), +} + +impl From> for SendingTraceProcessorError { + fn from(err: SendError) -> Self { + SendingTraceProcessorError::ConcentratorCommandError(err) + } +} + +impl From> for SendingTraceProcessorError { + fn from(err: SendError) -> Self { + SendingTraceProcessorError::SendDataBuilderInfoError(err) } } @@ -390,6 +410,8 @@ pub struct SendingTraceProcessor { pub processor: Arc, /// The [`Sender`] to use for flushing the traces to the trace aggregator. pub trace_tx: Sender, + /// The [`SendingTraceStatsProcessor`] to use for sending stats to the stats concentrator. + pub stats_sender: Arc, } impl SendingTraceProcessor { /// Processes the provided traces, then flushes them to the trace aggregator @@ -402,7 +424,7 @@ impl SendingTraceProcessor { mut traces: Vec>, body_size: usize, span_pointers: Option>, - ) -> Result<(), SendError> { + ) -> Result<(), SendingTraceProcessorError> { traces = if let Some(appsec) = &self.appsec { let mut appsec = appsec.lock().await; traces.into_iter().filter_map(|mut trace| { @@ -415,7 +437,7 @@ impl SendingTraceProcessor { Some(trace) } else if let Some(ctx) = ctx{ debug!("TRACE_PROCESSOR | holding trace for App & API Protection additional data"); - ctx.hold_trace(trace, SendingTraceProcessor{ appsec: None, processor: self.processor.clone(), trace_tx: self.trace_tx.clone() }, HoldArguments{ + ctx.hold_trace(trace, SendingTraceProcessor{ appsec: None, processor: self.processor.clone(), trace_tx: self.trace_tx.clone(), stats_sender: self.stats_sender.clone() }, HoldArguments{ config:Arc::clone(&config), tags_provider:Arc::clone(&tags_provider), body_size, @@ -448,7 +470,7 @@ impl SendingTraceProcessor { let payload = self .processor .process_traces( - config, + config.clone(), tags_provider, header_tags, traces, @@ -456,7 +478,14 @@ impl SendingTraceProcessor { span_pointers, ) .await; - self.trace_tx.send(payload).await + self.trace_tx.send(payload).await?; + + // This needs to be after send_processed_traces() because send_processed_traces() + // performs obfuscation, and we need to compute stats on the obfuscated traces. + if config.compute_trace_stats { + self.stats_sender.send(&traces)?; + } + Ok(()) } } From 72d6999336d257c30d16b3b7740d43dd217530bf Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 13:35:04 -0400 Subject: [PATCH 02/11] Fix some errors --- bottlecap/src/appsec/processor/context.rs | 8 ++++++-- .../src/lifecycle/invocation/processor.rs | 9 +++++++-- bottlecap/src/otlp/agent.rs | 4 +++- bottlecap/src/traces/trace_agent.rs | 2 +- bottlecap/src/traces/trace_processor.rs | 19 +++++++++++++------ 5 files changed, 30 insertions(+), 12 deletions(-) diff --git a/bottlecap/src/appsec/processor/context.rs b/bottlecap/src/appsec/processor/context.rs index cc0fff886..7462f3240 100644 --- a/bottlecap/src/appsec/processor/context.rs +++ b/bottlecap/src/appsec/processor/context.rs @@ -135,8 +135,12 @@ impl Context { ) .await { - Ok(()) => debug!("aap: successfully sent trace to aggregator buffer"), - Err(e) => warn!("aap: failed to send trace to aggregator buffer: {e}"), + Ok(processed_traces) => { + for _trace in processed_traces { + // TODO: send trace stats + } + } + Err(e) => warn!("aap: failed to send trace to aggregator buffer: {e:?}"), } } diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 28d9fd37f..e39468e31 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -508,7 +508,7 @@ impl Processor { dropped_p0_spans: 0, }; - if let Err(e) = trace_sender + match trace_sender .send_processed_traces( self.config.clone(), tags_provider.clone(), @@ -519,7 +519,12 @@ impl Processor { ) .await { - debug!("Failed to send context spans to agent: {e}"); + Ok(processed_traces) => { + for _trace in processed_traces { + // TODO: send trace stats + } + } + Err(e) => debug!("Failed to send context spans to agent: {e:?}"), } } diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index aafa2fec6..5b7a5ce07 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -163,7 +163,7 @@ impl Agent { .into_response(); } - let send_data_builder = trace_processor + let (send_data_builder, _traces) = trace_processor .process_traces( config, tags_provider, @@ -191,6 +191,8 @@ impl Agent { ).into_response() } } + + // TODO: send trace stats } } diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index c783dd5a1..49d3e2996 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -536,7 +536,7 @@ impl TraceAgent { { return error_response( StatusCode::INTERNAL_SERVER_ERROR, - format!("Error sending traces to the trace aggregator: {err}"), + format!("Error sending traces to the trace aggregator: {err:?}"), ); } diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 9c6d417ad..459931beb 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -30,6 +30,7 @@ use tokio::sync::Mutex; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::error::SendError; use tracing::{debug, error}; +use std::fmt::Display; use super::stats_concentrator_service::ConcentratorCommand; use super::trace_aggregator::SendDataBuilderInfo; @@ -317,7 +318,7 @@ pub trait TraceProcessor { traces: Vec>, body_size: usize, span_pointers: Option>, - ) -> SendDataBuilderInfo; + ) -> (SendDataBuilderInfo, Vec>); } #[async_trait] @@ -381,6 +382,12 @@ pub enum SendingTraceProcessorError { ConcentratorCommandError(SendError), } +impl Display for SendingTraceProcessorError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{self:?}") + } +} + impl From> for SendingTraceProcessorError { fn from(err: SendError) -> Self { SendingTraceProcessorError::ConcentratorCommandError(err) @@ -424,7 +431,7 @@ impl SendingTraceProcessor { mut traces: Vec>, body_size: usize, span_pointers: Option>, - ) -> Result<(), SendingTraceProcessorError> { + ) -> Result>, SendingTraceProcessorError> { traces = if let Some(appsec) = &self.appsec { let mut appsec = appsec.lock().await; traces.into_iter().filter_map(|mut trace| { @@ -464,10 +471,10 @@ impl SendingTraceProcessor { if traces.is_empty() { debug!("TRACE_PROCESSOR | no traces left to be sent, skipping..."); - return Ok(()); + return Ok(vec![]); } - let payload = self + let (payload, processed_traces) = self .processor .process_traces( config.clone(), @@ -483,9 +490,9 @@ impl SendingTraceProcessor { // This needs to be after send_processed_traces() because send_processed_traces() // performs obfuscation, and we need to compute stats on the obfuscated traces. if config.compute_trace_stats { - self.stats_sender.send(&traces)?; + self.stats_sender.send(&processed_traces)?; } - Ok(()) + Ok(processed_traces) } } From f3f163b07914b338eee4c08dcdb8ad78313cb4e3 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 13:48:29 -0400 Subject: [PATCH 03/11] Fix some errors --- bottlecap/src/appsec/processor/context.rs | 9 ++------- bottlecap/src/lifecycle/invocation/processor.rs | 9 ++------- bottlecap/src/traces/trace_agent.rs | 11 ++++------- bottlecap/src/traces/trace_processor.rs | 4 ++-- 4 files changed, 10 insertions(+), 23 deletions(-) diff --git a/bottlecap/src/appsec/processor/context.rs b/bottlecap/src/appsec/processor/context.rs index 7462f3240..4442a0336 100644 --- a/bottlecap/src/appsec/processor/context.rs +++ b/bottlecap/src/appsec/processor/context.rs @@ -112,7 +112,7 @@ impl Context { } debug!("aap: flushing out trace for request {}", self.rid); - match sender + if let Err(err) = sender .send_processed_traces( args.config, args.tags_provider, @@ -135,12 +135,7 @@ impl Context { ) .await { - Ok(processed_traces) => { - for _trace in processed_traces { - // TODO: send trace stats - } - } - Err(e) => warn!("aap: failed to send trace to aggregator buffer: {e:?}"), + warn!("aap: failed to send trace to aggregator buffer: {err:?}"); } } diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index e39468e31..91ab6d3eb 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -508,7 +508,7 @@ impl Processor { dropped_p0_spans: 0, }; - match trace_sender + if let Err(err) = trace_sender .send_processed_traces( self.config.clone(), tags_provider.clone(), @@ -519,12 +519,7 @@ impl Processor { ) .await { - Ok(processed_traces) => { - for _trace in processed_traces { - // TODO: send trace stats - } - } - Err(e) => debug!("Failed to send context spans to agent: {e:?}"), + debug!("Failed to send context spans to agent: {err:?}"); } } diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 49d3e2996..9ec7a1ac1 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -78,7 +78,6 @@ const LAMBDA_LOAD_SPAN: &str = "aws.lambda.load"; pub struct TraceState { pub config: Arc, pub trace_sender: Arc, - pub stats_sender: Arc, pub invocation_processor: Arc>, pub tags_provider: Arc, } @@ -199,16 +198,17 @@ impl TraceAgent { } fn make_router(&self, stats_tx: Sender) -> Router { + let stats_sender = Arc::new(SendingTraceStatsProcessor::new( + self.stats_concentrator.clone(), + )); let trace_state = TraceState { config: Arc::clone(&self.config), trace_sender: Arc::new(SendingTraceProcessor { appsec: self.appsec_processor.clone(), processor: Arc::clone(&self.trace_processor), trace_tx: self.tx.clone(), + stats_sender, }), - stats_sender: Arc::new(SendingTraceStatsProcessor::new( - self.stats_concentrator.clone(), - )), invocation_processor: Arc::clone(&self.invocation_processor), tags_provider: Arc::clone(&self.tags_provider), }; @@ -277,7 +277,6 @@ impl TraceAgent { state.config, request, state.trace_sender, - state.stats_sender, state.invocation_processor, state.tags_provider, ApiVersion::V04, @@ -290,7 +289,6 @@ impl TraceAgent { state.config, request, state.trace_sender, - state.stats_sender, state.invocation_processor, state.tags_provider, ApiVersion::V05, @@ -430,7 +428,6 @@ impl TraceAgent { config: Arc, request: Request, trace_sender: Arc, - stats_sender: Arc, invocation_processor: Arc>, tags_provider: Arc, version: ApiVersion, diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 459931beb..98fc568cf 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -431,7 +431,7 @@ impl SendingTraceProcessor { mut traces: Vec>, body_size: usize, span_pointers: Option>, - ) -> Result>, SendingTraceProcessorError> { + ) -> Result<(), SendingTraceProcessorError> { traces = if let Some(appsec) = &self.appsec { let mut appsec = appsec.lock().await; traces.into_iter().filter_map(|mut trace| { @@ -492,7 +492,7 @@ impl SendingTraceProcessor { if config.compute_trace_stats { self.stats_sender.send(&processed_traces)?; } - Ok(processed_traces) + Ok() } } From 48372e3a5fb487275f18004f9224ea14d176f105 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 14:31:19 -0400 Subject: [PATCH 04/11] Clone processed traces --- bottlecap/src/bin/bottlecap/main.rs | 18 +++++--- bottlecap/src/otlp/agent.rs | 3 +- bottlecap/src/traces/trace_agent.rs | 20 ++++----- bottlecap/src/traces/trace_processor.rs | 42 +++++++++---------- bottlecap/src/traces/trace_stats_processor.rs | 40 ++++++++++-------- 5 files changed, 67 insertions(+), 56 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 53e1e3b8e..6f73dde1d 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -57,7 +57,7 @@ use bottlecap::{ proxy_aggregator, proxy_flusher::Flusher as ProxyFlusher, stats_aggregator::StatsAggregator, - stats_concentrator_service::StatsConcentratorService, + stats_concentrator_service::{StatsConcentratorHandle, StatsConcentratorService}, stats_flusher::{self, StatsFlusher}, stats_processor, trace_agent, trace_aggregator::{self, SendDataBuilderInfo}, @@ -434,6 +434,7 @@ async fn extension_loop_active( stats_flusher, proxy_flusher, trace_agent_shutdown_token, + stats_concentrator, ) = start_trace_agent( config, &api_key_factory, @@ -508,7 +509,7 @@ async fn extension_loop_active( tokio::select! { biased; Some(event) = event_bus.rx.recv() => { - if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await { + if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await { if let TelemetryRecord::PlatformRuntimeDone{ .. } = telemetry_event.record { break 'flush_end; } @@ -631,7 +632,7 @@ async fn extension_loop_active( break 'next_invocation; } Some(event) = event_bus.rx.recv() => { - handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await; + handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await; } _ = race_flush_interval.tick() => { if flush_control.flush_strategy == FlushStrategy::Default { @@ -672,7 +673,7 @@ async fn extension_loop_active( debug!("Received tombstone event, proceeding with shutdown"); break 'shutdown; } - handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await; + handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await; } // Add timeout to prevent hanging indefinitely () = tokio::time::sleep(tokio::time::Duration::from_millis(300)) => { @@ -753,6 +754,7 @@ async fn handle_event_bus_event( tags_provider: Arc, trace_processor: Arc, trace_agent_channel: Sender, + stats_concentrator: StatsConcentratorHandle, ) -> Option { match event { Event::OutOfMemory(event_timestamp) => { @@ -804,7 +806,9 @@ async fn handle_event_bus_event( appsec: appsec_processor.clone(), processor: trace_processor.clone(), trace_tx: trace_agent_channel.clone(), - stats_sender: Arc::new(SendingTraceStatsProcessor::new(stats_agent_channel.clone())), + stats_sender: Arc::new(SendingTraceStatsProcessor::new( + stats_concentrator.clone(), + )), }), event.time.timestamp(), ) @@ -989,6 +993,7 @@ fn start_trace_agent( Arc, Arc, tokio_util::sync::CancellationToken, + StatsConcentratorHandle, ) { // Stats let (stats_concentrator_service, stats_concentrator_handle) = @@ -1044,7 +1049,7 @@ fn start_trace_agent( invocation_processor, appsec_processor, Arc::clone(tags_provider), - stats_concentrator_handle, + stats_concentrator_handle.clone(), ); let trace_agent_channel = trace_agent.get_sender_copy(); let shutdown_token = trace_agent.shutdown_token(); @@ -1063,6 +1068,7 @@ fn start_trace_agent( stats_flusher, proxy_flusher, shutdown_token, + stats_concentrator_handle, ) } diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index 5b7a5ce07..075be8072 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -171,8 +171,7 @@ impl Agent { traces, body_size, None, - ) - .await; + ); match trace_tx.send(send_data_builder).await { Ok(()) => { diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 9ec7a1ac1..32c45f2e9 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -519,17 +519,17 @@ impl TraceAgent { .await; } } - + if let Err(err) = trace_sender - .send_processed_traces( - config, - tags_provider, - tracer_header_tags, - traces, - body_size, - None, - ) - .await + .send_processed_traces( + config, + tags_provider, + tracer_header_tags, + traces, + body_size, + None, + ) + .await { return error_response( StatusCode::INTERNAL_SERVER_ERROR, diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 98fc568cf..fbc25f372 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -24,13 +24,13 @@ use datadog_trace_utils::tracer_header_tags; use datadog_trace_utils::tracer_payload::{TraceChunkProcessor, TracerPayloadCollection}; use ddcommon::Endpoint; use regex::Regex; +use std::fmt::Display; use std::str::FromStr; use std::sync::Arc; use tokio::sync::Mutex; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::error::SendError; use tracing::{debug, error}; -use std::fmt::Display; use super::stats_concentrator_service::ConcentratorCommand; use super::trace_aggregator::SendDataBuilderInfo; @@ -310,7 +310,7 @@ fn filter_span_from_lambda_library_or_runtime(span: &Span) -> bool { #[allow(clippy::too_many_arguments)] #[async_trait] pub trait TraceProcessor { - async fn process_traces( + fn process_traces( &self, config: Arc, tags_provider: Arc, @@ -318,12 +318,12 @@ pub trait TraceProcessor { traces: Vec>, body_size: usize, span_pointers: Option>, - ) -> (SendDataBuilderInfo, Vec>); + ) -> (SendDataBuilderInfo, TracerPayloadCollection); } #[async_trait] impl TraceProcessor for ServerlessTraceProcessor { - async fn process_traces( + fn process_traces( &self, config: Arc, tags_provider: Arc, @@ -331,7 +331,7 @@ impl TraceProcessor for ServerlessTraceProcessor { traces: Vec>, body_size: usize, span_pointers: Option>, - ) -> (SendDataBuilderInfo, Vec>) { + ) -> (SendDataBuilderInfo, TracerPayloadCollection) { let mut payload = trace_utils::collect_pb_trace_chunks( traces, &header_tags, @@ -363,7 +363,7 @@ impl TraceProcessor for ServerlessTraceProcessor { test_token: None, }; - let builder = SendDataBuilder::new(body_size, payload, header_tags, &endpoint) + let builder = SendDataBuilder::new(body_size, payload.clone(), header_tags, &endpoint) .with_compression(Compression::Zstd(config.apm_config_compression_level)) .with_retry_strategy(RetryStrategy::new( 1, @@ -372,7 +372,7 @@ impl TraceProcessor for ServerlessTraceProcessor { None, )); - (SendDataBuilderInfo::new(builder, body_size), traces) + (SendDataBuilderInfo::new(builder, body_size), payload) } } @@ -471,7 +471,7 @@ impl SendingTraceProcessor { if traces.is_empty() { debug!("TRACE_PROCESSOR | no traces left to be sent, skipping..."); - return Ok(vec![]); + return Ok(()); } let (payload, processed_traces) = self @@ -483,16 +483,15 @@ impl SendingTraceProcessor { traces, body_size, span_pointers, - ) - .await; + ); self.trace_tx.send(payload).await?; // This needs to be after send_processed_traces() because send_processed_traces() // performs obfuscation, and we need to compute stats on the obfuscated traces. if config.compute_trace_stats { - self.stats_sender.send(&processed_traces)?; + self.stats_sender.send(&processed_traces); } - Ok() + Ok(()) } } @@ -614,14 +613,15 @@ mod tests { }; let config = create_test_config(); let tags_provider = create_tags_provider(config.clone()); - let tracer_payload = trace_processor.process_traces( - config, - tags_provider.clone(), - header_tags, - traces, - 100, - None, - ); + let (tracer_payload, _processed_traces) = trace_processor + .process_traces( + config, + tags_provider.clone(), + header_tags, + traces, + 100, + None, + ); let expected_tracer_payload = pb::TracerPayload { container_id: "33".to_string(), @@ -643,7 +643,7 @@ mod tests { }; let received_payload = if let TracerPayloadCollection::V07(payload) = - tracer_payload.await.builder.build().get_payloads() + tracer_payload.builder.build().get_payloads() { Some(payload[0].clone()) } else { diff --git a/bottlecap/src/traces/trace_stats_processor.rs b/bottlecap/src/traces/trace_stats_processor.rs index 9d637253d..af99c29cd 100644 --- a/bottlecap/src/traces/trace_stats_processor.rs +++ b/bottlecap/src/traces/trace_stats_processor.rs @@ -1,10 +1,7 @@ -use tokio::sync::mpsc::error::SendError; - -use tracing::debug; - use crate::traces::stats_concentrator::{AggregationKey, Stats, StatsEvent}; -use crate::traces::stats_concentrator_service::{ConcentratorCommand, StatsConcentratorHandle}; -use datadog_trace_protobuf::pb; +use crate::traces::stats_concentrator_service::StatsConcentratorHandle; +use datadog_trace_utils::tracer_payload::TracerPayloadCollection; +use tracing::error; pub struct SendingTraceStatsProcessor { stats_concentrator: StatsConcentratorHandle, @@ -17,18 +14,27 @@ impl SendingTraceStatsProcessor { Self { stats_concentrator } } - pub fn send(&self, traces: &[Vec]) -> Result<(), SendError> { - debug!("Sending trace stats to the concentrator"); - for trace in traces { - for span in trace { - let stats = StatsEvent { - time: span.start.try_into().unwrap_or_default(), - aggregation_key: AggregationKey {}, - stats: Stats {}, - }; - self.stats_concentrator.add(stats)?; + pub fn send(&self, traces: &TracerPayloadCollection) { + match traces { + TracerPayloadCollection::V07(traces) => { + for trace in traces { + for chunk in &trace.chunks { + for span in &chunk.spans { + let stats = StatsEvent { + time: span.start.try_into().unwrap_or_default(), + aggregation_key: AggregationKey {}, + stats: Stats {}, + }; + if let Err(err) = self.stats_concentrator.add(stats) { + error!("Failed to send trace stats: {err}"); + } + } + } + } + } + _ => { + error!("Unsupported trace payload version. Failed to send trace stats."); } } - Ok(()) } } From d5d38ca3e2423fce9a60bc14cf3ceccb046b27d3 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 14:53:54 -0400 Subject: [PATCH 05/11] Send traces for otlp agent --- bottlecap/src/appsec/processor/context.rs | 5 +-- bottlecap/src/bin/bottlecap/main.rs | 6 ++-- .../src/lifecycle/invocation/processor.rs | 4 +-- bottlecap/src/otlp/agent.rs | 33 ++++++++++++------- 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/bottlecap/src/appsec/processor/context.rs b/bottlecap/src/appsec/processor/context.rs index 4442a0336..cc0fff886 100644 --- a/bottlecap/src/appsec/processor/context.rs +++ b/bottlecap/src/appsec/processor/context.rs @@ -112,7 +112,7 @@ impl Context { } debug!("aap: flushing out trace for request {}", self.rid); - if let Err(err) = sender + match sender .send_processed_traces( args.config, args.tags_provider, @@ -135,7 +135,8 @@ impl Context { ) .await { - warn!("aap: failed to send trace to aggregator buffer: {err:?}"); + Ok(()) => debug!("aap: successfully sent trace to aggregator buffer"), + Err(e) => warn!("aap: failed to send trace to aggregator buffer: {e}"), } } diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 6f73dde1d..77de6dc91 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -479,6 +479,7 @@ async fn extension_loop_active( tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), + stats_concentrator.clone(), ); let mut flush_control = @@ -1153,12 +1154,13 @@ fn start_otlp_agent( tags_provider: Arc, trace_processor: Arc, trace_tx: Sender, + stats_concentrator: StatsConcentratorHandle, ) -> Option { if !should_enable_otlp_agent(config) { return None; } - - let agent = OtlpAgent::new(config.clone(), tags_provider, trace_processor, trace_tx); + let stats_sender = Arc::new(SendingTraceStatsProcessor::new(stats_concentrator)); + let agent = OtlpAgent::new(config.clone(), tags_provider, trace_processor, trace_tx, stats_sender); let cancel_token = agent.cancel_token(); if let Err(e) = agent.start() { error!("Error starting OTLP agent: {e:?}"); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 91ab6d3eb..28d9fd37f 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -508,7 +508,7 @@ impl Processor { dropped_p0_spans: 0, }; - if let Err(err) = trace_sender + if let Err(e) = trace_sender .send_processed_traces( self.config.clone(), tags_provider.clone(), @@ -519,7 +519,7 @@ impl Processor { ) .await { - debug!("Failed to send context spans to agent: {err:?}"); + debug!("Failed to send context spans to agent: {e}"); } } diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index 075be8072..c3a8f1a96 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -18,7 +18,7 @@ use crate::{ http::{extract_request_body, handler_not_found}, otlp::processor::Processor as OtlpProcessor, tags::provider, - traces::{trace_aggregator::SendDataBuilderInfo, trace_processor::TraceProcessor}, + traces::{trace_aggregator::SendDataBuilderInfo, trace_processor::TraceProcessor, trace_stats_processor::SendingTraceStatsProcessor}, }; const OTLP_AGENT_HTTP_PORT: u16 = 4318; @@ -29,6 +29,7 @@ type AgentState = ( OtlpProcessor, Arc, Sender, + Arc, ); pub struct Agent { @@ -37,6 +38,7 @@ pub struct Agent { processor: OtlpProcessor, trace_processor: Arc, trace_tx: Sender, + stats_sender: Arc, port: u16, cancel_token: CancellationToken, } @@ -47,6 +49,7 @@ impl Agent { tags_provider: Arc, trace_processor: Arc, trace_tx: Sender, + stats_sender: Arc, ) -> Self { let port = Self::parse_port( config.otlp_config_receiver_protocols_http_endpoint.as_ref(), @@ -60,6 +63,7 @@ impl Agent { processor: OtlpProcessor::new(Arc::clone(&config)), trace_processor, trace_tx, + stats_sender, port, cancel_token, } @@ -112,6 +116,7 @@ impl Agent { self.processor.clone(), Arc::clone(&self.trace_processor), self.trace_tx.clone(), + Arc::clone(&self.stats_sender), ); Router::new() @@ -126,7 +131,7 @@ impl Agent { } async fn v1_traces( - State((config, tags_provider, processor, trace_processor, trace_tx)): State, + State((config, tags_provider, processor, trace_processor, trace_tx, stats_sender)): State, request: Request, ) -> Response { let (parts, body) = match extract_request_body(request).await { @@ -163,7 +168,8 @@ impl Agent { .into_response(); } - let (send_data_builder, _traces) = trace_processor + let compute_trace_stats = config.compute_trace_stats; + let (send_data_builder, processed_traces) = trace_processor .process_traces( config, tags_provider, @@ -176,22 +182,27 @@ impl Agent { match trace_tx.send(send_data_builder).await { Ok(()) => { debug!("OTLP | Successfully buffered traces to be aggregated."); - ( - StatusCode::OK, - json!({"rate_by_service":{"service:,env:":1}}).to_string(), - ) - .into_response() } Err(err) => { error!("OTLP | Error sending traces to the trace aggregator: {err}"); - ( + return ( StatusCode::INTERNAL_SERVER_ERROR, json!({ "message": format!("Error sending traces to the trace aggregator: {err}") }).to_string() - ).into_response() + ).into_response(); } + }; + + // This needs to be after send_processed_traces() because send_processed_traces() + // performs obfuscation, and we need to compute stats on the obfuscated traces. + if compute_trace_stats { + stats_sender.send(&processed_traces); } - // TODO: send trace stats + ( + StatusCode::OK, + json!({"rate_by_service":{"service:,env:":1}}).to_string(), + ) + .into_response() } } From 92fb22534a4835ba25601e632ec5922685dd483d Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 14:54:02 -0400 Subject: [PATCH 06/11] fmt --- bottlecap/src/bin/bottlecap/main.rs | 8 +++++- bottlecap/src/otlp/agent.rs | 26 ++++++++++-------- bottlecap/src/traces/trace_processor.rs | 35 +++++++++++-------------- 3 files changed, 38 insertions(+), 31 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 77de6dc91..a425236f9 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -1160,7 +1160,13 @@ fn start_otlp_agent( return None; } let stats_sender = Arc::new(SendingTraceStatsProcessor::new(stats_concentrator)); - let agent = OtlpAgent::new(config.clone(), tags_provider, trace_processor, trace_tx, stats_sender); + let agent = OtlpAgent::new( + config.clone(), + tags_provider, + trace_processor, + trace_tx, + stats_sender, + ); let cancel_token = agent.cancel_token(); if let Err(e) = agent.start() { error!("Error starting OTLP agent: {e:?}"); diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index c3a8f1a96..6d830d478 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -18,7 +18,10 @@ use crate::{ http::{extract_request_body, handler_not_found}, otlp::processor::Processor as OtlpProcessor, tags::provider, - traces::{trace_aggregator::SendDataBuilderInfo, trace_processor::TraceProcessor, trace_stats_processor::SendingTraceStatsProcessor}, + traces::{ + trace_aggregator::SendDataBuilderInfo, trace_processor::TraceProcessor, + trace_stats_processor::SendingTraceStatsProcessor, + }, }; const OTLP_AGENT_HTTP_PORT: u16 = 4318; @@ -131,7 +134,9 @@ impl Agent { } async fn v1_traces( - State((config, tags_provider, processor, trace_processor, trace_tx, stats_sender)): State, + State((config, tags_provider, processor, trace_processor, trace_tx, stats_sender)): State< + AgentState, + >, request: Request, ) -> Response { let (parts, body) = match extract_request_body(request).await { @@ -169,15 +174,14 @@ impl Agent { } let compute_trace_stats = config.compute_trace_stats; - let (send_data_builder, processed_traces) = trace_processor - .process_traces( - config, - tags_provider, - tracer_header_tags, - traces, - body_size, - None, - ); + let (send_data_builder, processed_traces) = trace_processor.process_traces( + config, + tags_provider, + tracer_header_tags, + traces, + body_size, + None, + ); match trace_tx.send(send_data_builder).await { Ok(()) => { diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index fbc25f372..41f5347fb 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -474,16 +474,14 @@ impl SendingTraceProcessor { return Ok(()); } - let (payload, processed_traces) = self - .processor - .process_traces( - config.clone(), - tags_provider, - header_tags, - traces, - body_size, - span_pointers, - ); + let (payload, processed_traces) = self.processor.process_traces( + config.clone(), + tags_provider, + header_tags, + traces, + body_size, + span_pointers, + ); self.trace_tx.send(payload).await?; // This needs to be after send_processed_traces() because send_processed_traces() @@ -613,15 +611,14 @@ mod tests { }; let config = create_test_config(); let tags_provider = create_tags_provider(config.clone()); - let (tracer_payload, _processed_traces) = trace_processor - .process_traces( - config, - tags_provider.clone(), - header_tags, - traces, - 100, - None, - ); + let (tracer_payload, _processed_traces) = trace_processor.process_traces( + config, + tags_provider.clone(), + header_tags, + traces, + 100, + None, + ); let expected_tracer_payload = pb::TracerPayload { container_id: "33".to_string(), From b327cf399a50c67ef263077a618f95400d0b4fb2 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 15:00:39 -0400 Subject: [PATCH 07/11] Use thiserror --- bottlecap/src/traces/trace_processor.rs | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 41f5347fb..7c13c5385 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -24,7 +24,6 @@ use datadog_trace_utils::tracer_header_tags; use datadog_trace_utils::tracer_payload::{TraceChunkProcessor, TracerPayloadCollection}; use ddcommon::Endpoint; use regex::Regex; -use std::fmt::Display; use std::str::FromStr; use std::sync::Arc; use tokio::sync::Mutex; @@ -376,30 +375,14 @@ impl TraceProcessor for ServerlessTraceProcessor { } } -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum SendingTraceProcessorError { + #[error("Error sending traces to the trace aggregator: {0}")] SendDataBuilderInfoError(SendError), + #[error("Error sending traces to the stats concentrator: {0}")] ConcentratorCommandError(SendError), } -impl Display for SendingTraceProcessorError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{self:?}") - } -} - -impl From> for SendingTraceProcessorError { - fn from(err: SendError) -> Self { - SendingTraceProcessorError::ConcentratorCommandError(err) - } -} - -impl From> for SendingTraceProcessorError { - fn from(err: SendError) -> Self { - SendingTraceProcessorError::SendDataBuilderInfoError(err) - } -} - /// A utility that is used to process, then send traces to the trace aggregator. /// /// This applies [`AppSecProcessor::process_span`] on the `aws.lambda` span @@ -482,7 +465,7 @@ impl SendingTraceProcessor { body_size, span_pointers, ); - self.trace_tx.send(payload).await?; + self.trace_tx.send(payload).await.map_err(SendingTraceProcessorError::SendDataBuilderInfoError)?; // This needs to be after send_processed_traces() because send_processed_traces() // performs obfuscation, and we need to compute stats on the obfuscated traces. From 32ba5246776d931e7a8c18874dc47618f9e38a2b Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 15:10:25 -0400 Subject: [PATCH 08/11] Add error handling --- bottlecap/src/otlp/agent.rs | 8 ++- bottlecap/src/traces/trace_processor.rs | 17 ++++--- bottlecap/src/traces/trace_stats_processor.rs | 51 ++++++++++++------- 3 files changed, 52 insertions(+), 24 deletions(-) diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index 6d830d478..b16649471 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -199,7 +199,13 @@ impl Agent { // This needs to be after send_processed_traces() because send_processed_traces() // performs obfuscation, and we need to compute stats on the obfuscated traces. if compute_trace_stats { - stats_sender.send(&processed_traces); + if let Err(err) = stats_sender.send(&processed_traces) { + error!("OTLP | Error sending traces to the stats concentrator: {err}"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + json!({ "message": format!("Error sending traces to the stats concentrator: {err}") }).to_string() + ).into_response(); + } } ( diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 7c13c5385..292e3a8e6 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -31,9 +31,8 @@ use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::error::SendError; use tracing::{debug, error}; -use super::stats_concentrator_service::ConcentratorCommand; use super::trace_aggregator::SendDataBuilderInfo; -use super::trace_stats_processor::SendingTraceStatsProcessor; +use super::trace_stats_processor::{SendingTraceStatsProcessor, SendingTraceStatsProcessorError}; #[derive(Clone)] #[allow(clippy::module_name_repetitions)] @@ -379,8 +378,8 @@ impl TraceProcessor for ServerlessTraceProcessor { pub enum SendingTraceProcessorError { #[error("Error sending traces to the trace aggregator: {0}")] SendDataBuilderInfoError(SendError), - #[error("Error sending traces to the stats concentrator: {0}")] - ConcentratorCommandError(SendError), + #[error("Error sending traces to the stats concentrator")] + SendStatsError(SendingTraceStatsProcessorError), } /// A utility that is used to process, then send traces to the trace aggregator. @@ -465,12 +464,18 @@ impl SendingTraceProcessor { body_size, span_pointers, ); - self.trace_tx.send(payload).await.map_err(SendingTraceProcessorError::SendDataBuilderInfoError)?; + self.trace_tx + .send(payload) + .await + .map_err(SendingTraceProcessorError::SendDataBuilderInfoError)?; // This needs to be after send_processed_traces() because send_processed_traces() // performs obfuscation, and we need to compute stats on the obfuscated traces. if config.compute_trace_stats { - self.stats_sender.send(&processed_traces); + if let Err(err) = self.stats_sender.send(&processed_traces) { + error!("TRACE_PROCESSOR | Error sending traces to the stats concentrator: {err}"); + return Err(SendingTraceProcessorError::SendStatsError(err)); + } } Ok(()) } diff --git a/bottlecap/src/traces/trace_stats_processor.rs b/bottlecap/src/traces/trace_stats_processor.rs index af99c29cd..47fcd6463 100644 --- a/bottlecap/src/traces/trace_stats_processor.rs +++ b/bottlecap/src/traces/trace_stats_processor.rs @@ -3,10 +3,22 @@ use crate::traces::stats_concentrator_service::StatsConcentratorHandle; use datadog_trace_utils::tracer_payload::TracerPayloadCollection; use tracing::error; +use tokio::sync::mpsc::error::SendError; + +use crate::traces::stats_concentrator_service::ConcentratorCommand; + pub struct SendingTraceStatsProcessor { stats_concentrator: StatsConcentratorHandle, } +#[derive(Debug, thiserror::Error)] +pub enum SendingTraceStatsProcessorError { + #[error("Error sending trace stats to the stats concentrator: {0}")] + ConcentratorCommandError(SendError), + #[error("Unsupported trace payload version. Failed to send trace stats.")] + TracePayloadVersionError, +} + // Extracts information from traces related to stats and sends it to the stats concentrator impl SendingTraceStatsProcessor { #[must_use] @@ -14,27 +26,32 @@ impl SendingTraceStatsProcessor { Self { stats_concentrator } } - pub fn send(&self, traces: &TracerPayloadCollection) { - match traces { - TracerPayloadCollection::V07(traces) => { - for trace in traces { - for chunk in &trace.chunks { - for span in &chunk.spans { - let stats = StatsEvent { - time: span.start.try_into().unwrap_or_default(), - aggregation_key: AggregationKey {}, - stats: Stats {}, - }; - if let Err(err) = self.stats_concentrator.add(stats) { - error!("Failed to send trace stats: {err}"); - } + pub fn send( + &self, + traces: &TracerPayloadCollection, + ) -> Result<(), SendingTraceStatsProcessorError> { + if let TracerPayloadCollection::V07(traces) = traces { + for trace in traces { + for chunk in &trace.chunks { + for span in &chunk.spans { + let stats = StatsEvent { + time: span.start.try_into().unwrap_or_default(), + aggregation_key: AggregationKey {}, + stats: Stats {}, + }; + if let Err(err) = self.stats_concentrator.add(stats) { + error!("Failed to send trace stats: {err}"); + return Err(SendingTraceStatsProcessorError::ConcentratorCommandError( + err, + )); } } } } - _ => { - error!("Unsupported trace payload version. Failed to send trace stats."); - } + Ok(()) + } else { + error!("Unsupported trace payload version. Failed to send trace stats."); + Err(SendingTraceStatsProcessorError::TracePayloadVersionError) } } } From 1de93b1e8c8b142d38e929865f250075a5f07c43 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 15:56:38 -0400 Subject: [PATCH 09/11] Comment --- bottlecap/src/otlp/agent.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index b16649471..e02beeedd 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -196,7 +196,7 @@ impl Agent { } }; - // This needs to be after send_processed_traces() because send_processed_traces() + // This needs to be after process_traces() because process_traces() // performs obfuscation, and we need to compute stats on the obfuscated traces. if compute_trace_stats { if let Err(err) = stats_sender.send(&processed_traces) { From d6d0adaf667c7ca5f2871e7564dbfa9c226baef3 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 19 Sep 2025 16:18:48 -0400 Subject: [PATCH 10/11] Comment --- bottlecap/src/traces/trace_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 292e3a8e6..3cca8141d 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -469,7 +469,7 @@ impl SendingTraceProcessor { .await .map_err(SendingTraceProcessorError::SendDataBuilderInfoError)?; - // This needs to be after send_processed_traces() because send_processed_traces() + // This needs to be after process_traces() because process_traces() // performs obfuscation, and we need to compute stats on the obfuscated traces. if config.compute_trace_stats { if let Err(err) = self.stats_sender.send(&processed_traces) { From 71c1d7979c83a7494e88b2cd2d979e997e436631 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Mon, 22 Sep 2025 13:33:38 -0400 Subject: [PATCH 11/11] Do not return error when stats fail to send --- bottlecap/src/otlp/agent.rs | 6 ++---- bottlecap/src/traces/trace_processor.rs | 20 +++++--------------- 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index e02beeedd..17a0207c2 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -200,11 +200,9 @@ impl Agent { // performs obfuscation, and we need to compute stats on the obfuscated traces. if compute_trace_stats { if let Err(err) = stats_sender.send(&processed_traces) { + // Just log the error. We don't think trace stats are critical, so we don't want to + // return an error if only stats fail to send. error!("OTLP | Error sending traces to the stats concentrator: {err}"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "message": format!("Error sending traces to the stats concentrator: {err}") }).to_string() - ).into_response(); } } diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 3cca8141d..a174e42d4 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -32,7 +32,7 @@ use tokio::sync::mpsc::error::SendError; use tracing::{debug, error}; use super::trace_aggregator::SendDataBuilderInfo; -use super::trace_stats_processor::{SendingTraceStatsProcessor, SendingTraceStatsProcessorError}; +use super::trace_stats_processor::SendingTraceStatsProcessor; #[derive(Clone)] #[allow(clippy::module_name_repetitions)] @@ -374,14 +374,6 @@ impl TraceProcessor for ServerlessTraceProcessor { } } -#[derive(Debug, thiserror::Error)] -pub enum SendingTraceProcessorError { - #[error("Error sending traces to the trace aggregator: {0}")] - SendDataBuilderInfoError(SendError), - #[error("Error sending traces to the stats concentrator")] - SendStatsError(SendingTraceStatsProcessorError), -} - /// A utility that is used to process, then send traces to the trace aggregator. /// /// This applies [`AppSecProcessor::process_span`] on the `aws.lambda` span @@ -413,7 +405,7 @@ impl SendingTraceProcessor { mut traces: Vec>, body_size: usize, span_pointers: Option>, - ) -> Result<(), SendingTraceProcessorError> { + ) -> Result<(), SendError> { traces = if let Some(appsec) = &self.appsec { let mut appsec = appsec.lock().await; traces.into_iter().filter_map(|mut trace| { @@ -464,17 +456,15 @@ impl SendingTraceProcessor { body_size, span_pointers, ); - self.trace_tx - .send(payload) - .await - .map_err(SendingTraceProcessorError::SendDataBuilderInfoError)?; + self.trace_tx.send(payload).await?; // This needs to be after process_traces() because process_traces() // performs obfuscation, and we need to compute stats on the obfuscated traces. if config.compute_trace_stats { if let Err(err) = self.stats_sender.send(&processed_traces) { + // Just log the error. We don't think trace stats are critical, so we don't want to + // return an error if only stats fail to send. error!("TRACE_PROCESSOR | Error sending traces to the stats concentrator: {err}"); - return Err(SendingTraceProcessorError::SendStatsError(err)); } } Ok(())