diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index e89839555..7209783fe 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -57,12 +57,13 @@ use bottlecap::{ proxy_aggregator, proxy_flusher::Flusher as ProxyFlusher, stats_aggregator::StatsAggregator, - stats_concentrator_service::StatsConcentratorService, + stats_concentrator_service::{StatsConcentratorHandle, StatsConcentratorService}, stats_flusher::{self, StatsFlusher}, stats_processor, trace_agent, trace_aggregator::{self, SendDataBuilderInfo}, trace_flusher::{self, ServerlessTraceFlusher, TraceFlusher}, trace_processor::{self, SendingTraceProcessor}, + trace_stats_processor::SendingTraceStatsProcessor, }, }; use datadog_fips::reqwest_adapter::create_reqwest_client_builder; @@ -433,6 +434,7 @@ async fn extension_loop_active( stats_flusher, proxy_flusher, trace_agent_shutdown_token, + stats_concentrator, ) = start_trace_agent( config, &api_key_factory, @@ -477,6 +479,7 @@ async fn extension_loop_active( tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), + stats_concentrator.clone(), ); let mut flush_control = @@ -508,7 +511,7 @@ async fn extension_loop_active( tokio::select! { biased; Some(event) = event_bus.rx.recv() => { - if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await { + if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await { if let TelemetryRecord::PlatformRuntimeDone{ .. } = telemetry_event.record { break 'flush_end; } @@ -635,7 +638,7 @@ async fn extension_loop_active( break 'next_invocation; } Some(event) = event_bus.rx.recv() => { - handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await; + handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await; } _ = race_flush_interval.tick() => { if flush_control.flush_strategy == FlushStrategy::Default { @@ -677,7 +680,7 @@ async fn extension_loop_active( debug!("Received tombstone event, proceeding with shutdown"); break 'shutdown; } - handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await; + handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await; } // Add timeout to prevent hanging indefinitely () = tokio::time::sleep(tokio::time::Duration::from_millis(300)) => { @@ -758,6 +761,7 @@ async fn handle_event_bus_event( tags_provider: Arc, trace_processor: Arc, trace_agent_channel: Sender, + stats_concentrator: StatsConcentratorHandle, ) -> Option { match event { Event::OutOfMemory(event_timestamp) => { @@ -809,6 +813,9 @@ async fn handle_event_bus_event( appsec: appsec_processor.clone(), processor: trace_processor.clone(), trace_tx: trace_agent_channel.clone(), + stats_sender: Arc::new(SendingTraceStatsProcessor::new( + stats_concentrator.clone(), + )), }), event.time.timestamp(), ) @@ -993,6 +1000,7 @@ fn start_trace_agent( Arc, Arc, tokio_util::sync::CancellationToken, + StatsConcentratorHandle, ) { // Stats let (stats_concentrator_service, stats_concentrator_handle) = @@ -1048,7 +1056,7 @@ fn start_trace_agent( invocation_processor, appsec_processor, Arc::clone(tags_provider), - stats_concentrator_handle, + stats_concentrator_handle.clone(), ); let trace_agent_channel = trace_agent.get_sender_copy(); let shutdown_token = trace_agent.shutdown_token(); @@ -1067,6 +1075,7 @@ fn start_trace_agent( stats_flusher, proxy_flusher, shutdown_token, + stats_concentrator_handle, ) } @@ -1151,12 +1160,19 @@ fn start_otlp_agent( tags_provider: Arc, trace_processor: Arc, trace_tx: Sender, + stats_concentrator: StatsConcentratorHandle, ) -> Option { if !should_enable_otlp_agent(config) { return None; } - - let agent = OtlpAgent::new(config.clone(), tags_provider, trace_processor, trace_tx); + let stats_sender = Arc::new(SendingTraceStatsProcessor::new(stats_concentrator)); + let agent = OtlpAgent::new( + config.clone(), + tags_provider, + trace_processor, + trace_tx, + stats_sender, + ); let cancel_token = agent.cancel_token(); if let Err(e) = agent.start() { error!("Error starting OTLP agent: {e:?}"); diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index aafa2fec6..17a0207c2 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -18,7 +18,10 @@ use crate::{ http::{extract_request_body, handler_not_found}, otlp::processor::Processor as OtlpProcessor, tags::provider, - traces::{trace_aggregator::SendDataBuilderInfo, trace_processor::TraceProcessor}, + traces::{ + trace_aggregator::SendDataBuilderInfo, trace_processor::TraceProcessor, + trace_stats_processor::SendingTraceStatsProcessor, + }, }; const OTLP_AGENT_HTTP_PORT: u16 = 4318; @@ -29,6 +32,7 @@ type AgentState = ( OtlpProcessor, Arc, Sender, + Arc, ); pub struct Agent { @@ -37,6 +41,7 @@ pub struct Agent { processor: OtlpProcessor, trace_processor: Arc, trace_tx: Sender, + stats_sender: Arc, port: u16, cancel_token: CancellationToken, } @@ -47,6 +52,7 @@ impl Agent { tags_provider: Arc, trace_processor: Arc, trace_tx: Sender, + stats_sender: Arc, ) -> Self { let port = Self::parse_port( config.otlp_config_receiver_protocols_http_endpoint.as_ref(), @@ -60,6 +66,7 @@ impl Agent { processor: OtlpProcessor::new(Arc::clone(&config)), trace_processor, trace_tx, + stats_sender, port, cancel_token, } @@ -112,6 +119,7 @@ impl Agent { self.processor.clone(), Arc::clone(&self.trace_processor), self.trace_tx.clone(), + Arc::clone(&self.stats_sender), ); Router::new() @@ -126,7 +134,9 @@ impl Agent { } async fn v1_traces( - State((config, tags_provider, processor, trace_processor, trace_tx)): State, + State((config, tags_provider, processor, trace_processor, trace_tx, stats_sender)): State< + AgentState, + >, request: Request, ) -> Response { let (parts, body) = match extract_request_body(request).await { @@ -163,34 +173,44 @@ impl Agent { .into_response(); } - let send_data_builder = trace_processor - .process_traces( - config, - tags_provider, - tracer_header_tags, - traces, - body_size, - None, - ) - .await; + let compute_trace_stats = config.compute_trace_stats; + let (send_data_builder, processed_traces) = trace_processor.process_traces( + config, + tags_provider, + tracer_header_tags, + traces, + body_size, + None, + ); match trace_tx.send(send_data_builder).await { Ok(()) => { debug!("OTLP | Successfully buffered traces to be aggregated."); - ( - StatusCode::OK, - json!({"rate_by_service":{"service:,env:":1}}).to_string(), - ) - .into_response() } Err(err) => { error!("OTLP | Error sending traces to the trace aggregator: {err}"); - ( + return ( StatusCode::INTERNAL_SERVER_ERROR, json!({ "message": format!("Error sending traces to the trace aggregator: {err}") }).to_string() - ).into_response() + ).into_response(); + } + }; + + // This needs to be after process_traces() because process_traces() + // performs obfuscation, and we need to compute stats on the obfuscated traces. + if compute_trace_stats { + if let Err(err) = stats_sender.send(&processed_traces) { + // Just log the error. We don't think trace stats are critical, so we don't want to + // return an error if only stats fail to send. + error!("OTLP | Error sending traces to the stats concentrator: {err}"); } } + + ( + StatusCode::OK, + json!({"rate_by_service":{"service:,env:":1}}).to_string(), + ) + .into_response() } } diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 9145a2a2f..32c45f2e9 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -78,7 +78,6 @@ const LAMBDA_LOAD_SPAN: &str = "aws.lambda.load"; pub struct TraceState { pub config: Arc, pub trace_sender: Arc, - pub stats_sender: Arc, pub invocation_processor: Arc>, pub tags_provider: Arc, } @@ -199,16 +198,17 @@ impl TraceAgent { } fn make_router(&self, stats_tx: Sender) -> Router { + let stats_sender = Arc::new(SendingTraceStatsProcessor::new( + self.stats_concentrator.clone(), + )); let trace_state = TraceState { config: Arc::clone(&self.config), trace_sender: Arc::new(SendingTraceProcessor { appsec: self.appsec_processor.clone(), processor: Arc::clone(&self.trace_processor), trace_tx: self.tx.clone(), + stats_sender, }), - stats_sender: Arc::new(SendingTraceStatsProcessor::new( - self.stats_concentrator.clone(), - )), invocation_processor: Arc::clone(&self.invocation_processor), tags_provider: Arc::clone(&self.tags_provider), }; @@ -277,7 +277,6 @@ impl TraceAgent { state.config, request, state.trace_sender, - state.stats_sender, state.invocation_processor, state.tags_provider, ApiVersion::V04, @@ -290,7 +289,6 @@ impl TraceAgent { state.config, request, state.trace_sender, - state.stats_sender, state.invocation_processor, state.tags_provider, ApiVersion::V05, @@ -430,7 +428,6 @@ impl TraceAgent { config: Arc, request: Request, trace_sender: Arc, - stats_sender: Arc, invocation_processor: Arc>, tags_provider: Arc, version: ApiVersion, @@ -523,15 +520,6 @@ impl TraceAgent { } } - if config.compute_trace_stats { - if let Err(err) = stats_sender.send(&traces) { - return error_response( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Error sending stats to the stats aggregator: {err}"), - ); - } - } - if let Err(err) = trace_sender .send_processed_traces( config, @@ -545,7 +533,7 @@ impl TraceAgent { { return error_response( StatusCode::INTERNAL_SERVER_ERROR, - format!("Error sending traces to the trace aggregator: {err}"), + format!("Error sending traces to the trace aggregator: {err:?}"), ); } diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 89e561dd1..a174e42d4 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -32,6 +32,7 @@ use tokio::sync::mpsc::error::SendError; use tracing::{debug, error}; use super::trace_aggregator::SendDataBuilderInfo; +use super::trace_stats_processor::SendingTraceStatsProcessor; #[derive(Clone)] #[allow(clippy::module_name_repetitions)] @@ -307,7 +308,7 @@ fn filter_span_from_lambda_library_or_runtime(span: &Span) -> bool { #[allow(clippy::too_many_arguments)] #[async_trait] pub trait TraceProcessor { - async fn process_traces( + fn process_traces( &self, config: Arc, tags_provider: Arc, @@ -315,12 +316,12 @@ pub trait TraceProcessor { traces: Vec>, body_size: usize, span_pointers: Option>, - ) -> SendDataBuilderInfo; + ) -> (SendDataBuilderInfo, TracerPayloadCollection); } #[async_trait] impl TraceProcessor for ServerlessTraceProcessor { - async fn process_traces( + fn process_traces( &self, config: Arc, tags_provider: Arc, @@ -328,7 +329,7 @@ impl TraceProcessor for ServerlessTraceProcessor { traces: Vec>, body_size: usize, span_pointers: Option>, - ) -> SendDataBuilderInfo { + ) -> (SendDataBuilderInfo, TracerPayloadCollection) { let mut payload = trace_utils::collect_pb_trace_chunks( traces, &header_tags, @@ -360,7 +361,7 @@ impl TraceProcessor for ServerlessTraceProcessor { test_token: None, }; - let builder = SendDataBuilder::new(body_size, payload, header_tags, &endpoint) + let builder = SendDataBuilder::new(body_size, payload.clone(), header_tags, &endpoint) .with_compression(Compression::Zstd(config.apm_config_compression_level)) .with_retry_strategy(RetryStrategy::new( 1, @@ -369,7 +370,7 @@ impl TraceProcessor for ServerlessTraceProcessor { None, )); - SendDataBuilderInfo::new(builder, body_size) + (SendDataBuilderInfo::new(builder, body_size), payload) } } @@ -390,6 +391,8 @@ pub struct SendingTraceProcessor { pub processor: Arc, /// The [`Sender`] to use for flushing the traces to the trace aggregator. pub trace_tx: Sender, + /// The [`SendingTraceStatsProcessor`] to use for sending stats to the stats concentrator. + pub stats_sender: Arc, } impl SendingTraceProcessor { /// Processes the provided traces, then flushes them to the trace aggregator @@ -415,7 +418,7 @@ impl SendingTraceProcessor { Some(trace) } else if let Some(ctx) = ctx{ debug!("TRACE_PROCESSOR | holding trace for App & API Protection additional data"); - ctx.hold_trace(trace, SendingTraceProcessor{ appsec: None, processor: self.processor.clone(), trace_tx: self.trace_tx.clone() }, HoldArguments{ + ctx.hold_trace(trace, SendingTraceProcessor{ appsec: None, processor: self.processor.clone(), trace_tx: self.trace_tx.clone(), stats_sender: self.stats_sender.clone() }, HoldArguments{ config:Arc::clone(&config), tags_provider:Arc::clone(&tags_provider), body_size, @@ -445,18 +448,26 @@ impl SendingTraceProcessor { return Ok(()); } - let payload = self - .processor - .process_traces( - config, - tags_provider, - header_tags, - traces, - body_size, - span_pointers, - ) - .await; - self.trace_tx.send(payload).await + let (payload, processed_traces) = self.processor.process_traces( + config.clone(), + tags_provider, + header_tags, + traces, + body_size, + span_pointers, + ); + self.trace_tx.send(payload).await?; + + // This needs to be after process_traces() because process_traces() + // performs obfuscation, and we need to compute stats on the obfuscated traces. + if config.compute_trace_stats { + if let Err(err) = self.stats_sender.send(&processed_traces) { + // Just log the error. We don't think trace stats are critical, so we don't want to + // return an error if only stats fail to send. + error!("TRACE_PROCESSOR | Error sending traces to the stats concentrator: {err}"); + } + } + Ok(()) } } @@ -578,7 +589,7 @@ mod tests { }; let config = create_test_config(); let tags_provider = create_tags_provider(config.clone()); - let tracer_payload = trace_processor.process_traces( + let (tracer_payload, _processed_traces) = trace_processor.process_traces( config, tags_provider.clone(), header_tags, @@ -607,7 +618,7 @@ mod tests { }; let received_payload = if let TracerPayloadCollection::V07(payload) = - tracer_payload.await.builder.build().get_payloads() + tracer_payload.builder.build().get_payloads() { Some(payload[0].clone()) } else { diff --git a/bottlecap/src/traces/trace_stats_processor.rs b/bottlecap/src/traces/trace_stats_processor.rs index 9d637253d..47fcd6463 100644 --- a/bottlecap/src/traces/trace_stats_processor.rs +++ b/bottlecap/src/traces/trace_stats_processor.rs @@ -1,15 +1,24 @@ -use tokio::sync::mpsc::error::SendError; +use crate::traces::stats_concentrator::{AggregationKey, Stats, StatsEvent}; +use crate::traces::stats_concentrator_service::StatsConcentratorHandle; +use datadog_trace_utils::tracer_payload::TracerPayloadCollection; +use tracing::error; -use tracing::debug; +use tokio::sync::mpsc::error::SendError; -use crate::traces::stats_concentrator::{AggregationKey, Stats, StatsEvent}; -use crate::traces::stats_concentrator_service::{ConcentratorCommand, StatsConcentratorHandle}; -use datadog_trace_protobuf::pb; +use crate::traces::stats_concentrator_service::ConcentratorCommand; pub struct SendingTraceStatsProcessor { stats_concentrator: StatsConcentratorHandle, } +#[derive(Debug, thiserror::Error)] +pub enum SendingTraceStatsProcessorError { + #[error("Error sending trace stats to the stats concentrator: {0}")] + ConcentratorCommandError(SendError), + #[error("Unsupported trace payload version. Failed to send trace stats.")] + TracePayloadVersionError, +} + // Extracts information from traces related to stats and sends it to the stats concentrator impl SendingTraceStatsProcessor { #[must_use] @@ -17,18 +26,32 @@ impl SendingTraceStatsProcessor { Self { stats_concentrator } } - pub fn send(&self, traces: &[Vec]) -> Result<(), SendError> { - debug!("Sending trace stats to the concentrator"); - for trace in traces { - for span in trace { - let stats = StatsEvent { - time: span.start.try_into().unwrap_or_default(), - aggregation_key: AggregationKey {}, - stats: Stats {}, - }; - self.stats_concentrator.add(stats)?; + pub fn send( + &self, + traces: &TracerPayloadCollection, + ) -> Result<(), SendingTraceStatsProcessorError> { + if let TracerPayloadCollection::V07(traces) = traces { + for trace in traces { + for chunk in &trace.chunks { + for span in &chunk.spans { + let stats = StatsEvent { + time: span.start.try_into().unwrap_or_default(), + aggregation_key: AggregationKey {}, + stats: Stats {}, + }; + if let Err(err) = self.stats_concentrator.add(stats) { + error!("Failed to send trace stats: {err}"); + return Err(SendingTraceStatsProcessorError::ConcentratorCommandError( + err, + )); + } + } + } } + Ok(()) + } else { + error!("Unsupported trace payload version. Failed to send trace stats."); + Err(SendingTraceStatsProcessorError::TracePayloadVersionError) } - Ok(()) } }