Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ use bottlecap::{
proxy_aggregator,
proxy_flusher::Flusher as ProxyFlusher,
stats_aggregator::StatsAggregator,
stats_concentrator_service::StatsConcentratorService,
stats_concentrator_service::{StatsConcentratorHandle, StatsConcentratorService},
stats_flusher::{self, StatsFlusher},
stats_processor, trace_agent,
trace_aggregator::{self, SendDataBuilderInfo},
trace_flusher::{self, ServerlessTraceFlusher, TraceFlusher},
trace_processor::{self, SendingTraceProcessor},
trace_stats_processor::SendingTraceStatsProcessor,
},
};
use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
Expand Down Expand Up @@ -433,6 +434,7 @@ async fn extension_loop_active(
stats_flusher,
proxy_flusher,
trace_agent_shutdown_token,
stats_concentrator,
) = start_trace_agent(
config,
&api_key_factory,
Expand Down Expand Up @@ -477,6 +479,7 @@ async fn extension_loop_active(
tags_provider.clone(),
trace_processor.clone(),
trace_agent_channel.clone(),
stats_concentrator.clone(),
);

let mut flush_control =
Expand Down Expand Up @@ -508,7 +511,7 @@ async fn extension_loop_active(
tokio::select! {
biased;
Some(event) = event_bus.rx.recv() => {
if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await {
if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await {
if let TelemetryRecord::PlatformRuntimeDone{ .. } = telemetry_event.record {
break 'flush_end;
}
Expand Down Expand Up @@ -635,7 +638,7 @@ async fn extension_loop_active(
break 'next_invocation;
}
Some(event) = event_bus.rx.recv() => {
handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await;
}
_ = race_flush_interval.tick() => {
if flush_control.flush_strategy == FlushStrategy::Default {
Expand Down Expand Up @@ -677,7 +680,7 @@ async fn extension_loop_active(
debug!("Received tombstone event, proceeding with shutdown");
break 'shutdown;
}
handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await;
}
// Add timeout to prevent hanging indefinitely
() = tokio::time::sleep(tokio::time::Duration::from_millis(300)) => {
Expand Down Expand Up @@ -758,6 +761,7 @@ async fn handle_event_bus_event(
tags_provider: Arc<TagProvider>,
trace_processor: Arc<trace_processor::ServerlessTraceProcessor>,
trace_agent_channel: Sender<SendDataBuilderInfo>,
stats_concentrator: StatsConcentratorHandle,
) -> Option<TelemetryEvent> {
match event {
Event::OutOfMemory(event_timestamp) => {
Expand Down Expand Up @@ -809,6 +813,9 @@ async fn handle_event_bus_event(
appsec: appsec_processor.clone(),
processor: trace_processor.clone(),
trace_tx: trace_agent_channel.clone(),
stats_sender: Arc::new(SendingTraceStatsProcessor::new(
stats_concentrator.clone(),
)),
}),
event.time.timestamp(),
)
Expand Down Expand Up @@ -993,6 +1000,7 @@ fn start_trace_agent(
Arc<stats_flusher::ServerlessStatsFlusher>,
Arc<ProxyFlusher>,
tokio_util::sync::CancellationToken,
StatsConcentratorHandle,
) {
// Stats
let (stats_concentrator_service, stats_concentrator_handle) =
Expand Down Expand Up @@ -1048,7 +1056,7 @@ fn start_trace_agent(
invocation_processor,
appsec_processor,
Arc::clone(tags_provider),
stats_concentrator_handle,
stats_concentrator_handle.clone(),
);
let trace_agent_channel = trace_agent.get_sender_copy();
let shutdown_token = trace_agent.shutdown_token();
Expand All @@ -1067,6 +1075,7 @@ fn start_trace_agent(
stats_flusher,
proxy_flusher,
shutdown_token,
stats_concentrator_handle,
)
}

Expand Down Expand Up @@ -1151,12 +1160,19 @@ fn start_otlp_agent(
tags_provider: Arc<TagProvider>,
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
trace_tx: Sender<SendDataBuilderInfo>,
stats_concentrator: StatsConcentratorHandle,
) -> Option<CancellationToken> {
if !should_enable_otlp_agent(config) {
return None;
}

let agent = OtlpAgent::new(config.clone(), tags_provider, trace_processor, trace_tx);
let stats_sender = Arc::new(SendingTraceStatsProcessor::new(stats_concentrator));
let agent = OtlpAgent::new(
config.clone(),
tags_provider,
trace_processor,
trace_tx,
stats_sender,
);
let cancel_token = agent.cancel_token();
if let Err(e) = agent.start() {
error!("Error starting OTLP agent: {e:?}");
Expand Down
58 changes: 39 additions & 19 deletions bottlecap/src/otlp/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use crate::{
http::{extract_request_body, handler_not_found},
otlp::processor::Processor as OtlpProcessor,
tags::provider,
traces::{trace_aggregator::SendDataBuilderInfo, trace_processor::TraceProcessor},
traces::{
trace_aggregator::SendDataBuilderInfo, trace_processor::TraceProcessor,
trace_stats_processor::SendingTraceStatsProcessor,
},
};

const OTLP_AGENT_HTTP_PORT: u16 = 4318;
Expand All @@ -29,6 +32,7 @@ type AgentState = (
OtlpProcessor,
Arc<dyn TraceProcessor + Send + Sync>,
Sender<SendDataBuilderInfo>,
Arc<SendingTraceStatsProcessor>,
);

pub struct Agent {
Expand All @@ -37,6 +41,7 @@ pub struct Agent {
processor: OtlpProcessor,
trace_processor: Arc<dyn TraceProcessor + Send + Sync>,
trace_tx: Sender<SendDataBuilderInfo>,
stats_sender: Arc<SendingTraceStatsProcessor>,
port: u16,
cancel_token: CancellationToken,
}
Expand All @@ -47,6 +52,7 @@ impl Agent {
tags_provider: Arc<provider::Provider>,
trace_processor: Arc<dyn TraceProcessor + Send + Sync>,
trace_tx: Sender<SendDataBuilderInfo>,
stats_sender: Arc<SendingTraceStatsProcessor>,
) -> Self {
let port = Self::parse_port(
config.otlp_config_receiver_protocols_http_endpoint.as_ref(),
Expand All @@ -60,6 +66,7 @@ impl Agent {
processor: OtlpProcessor::new(Arc::clone(&config)),
trace_processor,
trace_tx,
stats_sender,
port,
cancel_token,
}
Expand Down Expand Up @@ -112,6 +119,7 @@ impl Agent {
self.processor.clone(),
Arc::clone(&self.trace_processor),
self.trace_tx.clone(),
Arc::clone(&self.stats_sender),
);

Router::new()
Expand All @@ -126,7 +134,9 @@ impl Agent {
}

async fn v1_traces(
State((config, tags_provider, processor, trace_processor, trace_tx)): State<AgentState>,
State((config, tags_provider, processor, trace_processor, trace_tx, stats_sender)): State<
AgentState,
>,
request: Request,
) -> Response {
let (parts, body) = match extract_request_body(request).await {
Expand Down Expand Up @@ -163,34 +173,44 @@ impl Agent {
.into_response();
}

let send_data_builder = trace_processor
.process_traces(
config,
tags_provider,
tracer_header_tags,
traces,
body_size,
None,
)
.await;
let compute_trace_stats = config.compute_trace_stats;
let (send_data_builder, processed_traces) = trace_processor.process_traces(
config,
tags_provider,
tracer_header_tags,
traces,
body_size,
None,
);

match trace_tx.send(send_data_builder).await {
Ok(()) => {
debug!("OTLP | Successfully buffered traces to be aggregated.");
(
StatusCode::OK,
json!({"rate_by_service":{"service:,env:":1}}).to_string(),
)
.into_response()
}
Err(err) => {
error!("OTLP | Error sending traces to the trace aggregator: {err}");
(
return (
StatusCode::INTERNAL_SERVER_ERROR,
json!({ "message": format!("Error sending traces to the trace aggregator: {err}") }).to_string()
).into_response()
).into_response();
}
};

// This needs to be after process_traces() because process_traces()
// performs obfuscation, and we need to compute stats on the obfuscated traces.
if compute_trace_stats {
if let Err(err) = stats_sender.send(&processed_traces) {
// Just log the error. We don't think trace stats are critical, so we don't want to
// return an error if only stats fail to send.
error!("OTLP | Error sending traces to the stats concentrator: {err}");
}
}

(
StatusCode::OK,
json!({"rate_by_service":{"service:,env:":1}}).to_string(),
)
.into_response()
}
}

Expand Down
22 changes: 5 additions & 17 deletions bottlecap/src/traces/trace_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ const LAMBDA_LOAD_SPAN: &str = "aws.lambda.load";
pub struct TraceState {
pub config: Arc<config::Config>,
pub trace_sender: Arc<trace_processor::SendingTraceProcessor>,
pub stats_sender: Arc<SendingTraceStatsProcessor>,
pub invocation_processor: Arc<Mutex<InvocationProcessor>>,
pub tags_provider: Arc<provider::Provider>,
}
Expand Down Expand Up @@ -199,16 +198,17 @@ impl TraceAgent {
}

fn make_router(&self, stats_tx: Sender<pb::ClientStatsPayload>) -> Router {
let stats_sender = Arc::new(SendingTraceStatsProcessor::new(
self.stats_concentrator.clone(),
));
let trace_state = TraceState {
config: Arc::clone(&self.config),
trace_sender: Arc::new(SendingTraceProcessor {
appsec: self.appsec_processor.clone(),
processor: Arc::clone(&self.trace_processor),
trace_tx: self.tx.clone(),
stats_sender,
}),
stats_sender: Arc::new(SendingTraceStatsProcessor::new(
self.stats_concentrator.clone(),
)),
invocation_processor: Arc::clone(&self.invocation_processor),
tags_provider: Arc::clone(&self.tags_provider),
};
Expand Down Expand Up @@ -277,7 +277,6 @@ impl TraceAgent {
state.config,
request,
state.trace_sender,
state.stats_sender,
state.invocation_processor,
state.tags_provider,
ApiVersion::V04,
Expand All @@ -290,7 +289,6 @@ impl TraceAgent {
state.config,
request,
state.trace_sender,
state.stats_sender,
state.invocation_processor,
state.tags_provider,
ApiVersion::V05,
Expand Down Expand Up @@ -430,7 +428,6 @@ impl TraceAgent {
config: Arc<config::Config>,
request: Request,
trace_sender: Arc<SendingTraceProcessor>,
stats_sender: Arc<SendingTraceStatsProcessor>,
invocation_processor: Arc<Mutex<InvocationProcessor>>,
tags_provider: Arc<provider::Provider>,
version: ApiVersion,
Expand Down Expand Up @@ -523,15 +520,6 @@ impl TraceAgent {
}
}

if config.compute_trace_stats {
if let Err(err) = stats_sender.send(&traces) {
return error_response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Error sending stats to the stats aggregator: {err}"),
);
}
}

Comment on lines -526 to -534
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving this into send_processed_traces() below

if let Err(err) = trace_sender
.send_processed_traces(
config,
Expand All @@ -545,7 +533,7 @@ impl TraceAgent {
{
return error_response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Error sending traces to the trace aggregator: {err}"),
format!("Error sending traces to the trace aggregator: {err:?}"),
);
}

Expand Down
Loading
Loading