Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
2097d3d
Add debug log
lym953 Aug 19, 2025
6c6532a
Send dummy stats
lym953 Aug 19, 2025
338273a
...
lym953 Sep 2, 2025
0af0f8f
Add more fields
lym953 Sep 3, 2025
cb2e790
Can see metrics in metrics explorer
lym953 Sep 8, 2025
102cbbe
Add stats concentrator, which pushes data to aggregator
lym953 Sep 8, 2025
82d692b
Avoid returning unused trace_tx
lym953 Sep 8, 2025
0589583
Move start_stats_agent() inside start_trace_agent()
lym953 Sep 8, 2025
1c7bb7c
Make stats aggregator pull from stats concentrator
lym953 Sep 8, 2025
03e88cc
Add logging
lym953 Sep 9, 2025
e06b1e7
Fix double counting
lym953 Sep 9, 2025
92ba7b2
Move ClientStatsPayload construction to StatsConcentrator
lym953 Sep 9, 2025
853e955
Create buckets
lym953 Sep 9, 2025
b383773
Do not flush the latest two buckets
lym953 Sep 10, 2025
c69adb6
Do not use hard coded keys such as yiming_name
lym953 Sep 10, 2025
13c2aef
Change _dd.compute_stats from 1 to 0
lym953 Sep 10, 2025
609294c
Remove MyStatsProcessor
lym953 Sep 10, 2025
39766d6
Remove unused code
lym953 Sep 10, 2025
503c07d
Format
lym953 Sep 10, 2025
ce2879c
Rename variables and remove unnecessary code
lym953 Sep 10, 2025
70f5e07
Fix code style
lym953 Sep 10, 2025
029a556
Code style
lym953 Sep 10, 2025
abf9372
Add comments
lym953 Sep 10, 2025
815e740
Add tests for should_flush_bucket()
lym953 Sep 10, 2025
c5770e3
Format
lym953 Sep 10, 2025
aa7073c
Move the trigger to trace agent, without grouping by key
lym953 Sep 12, 2025
01441c3
Support aggregation keys
lym953 Sep 12, 2025
847ec5d
Support duration
lym953 Sep 12, 2025
8b103ee
Support errors
lym953 Sep 12, 2025
ca42a2c
Support duration
lym953 Sep 15, 2025
efe580c
Change http status code from 200 to 0
lym953 Sep 15, 2025
2687e00
Remove unused resource param
lym953 Sep 15, 2025
c39a4a0
Get service from trace
lym953 Sep 15, 2025
d0fa419
Get env from trace
lym953 Sep 15, 2025
f298a2e
Support r#type
lym953 Sep 15, 2025
2793e41
Add comments
lym953 Sep 15, 2025
e8ca422
Use retain()
lym953 Sep 15, 2025
45c65d9
Handle error when casting u128 to u64
lym953 Sep 15, 2025
6a9a16a
Fix the support for error and duration
lym953 Sep 15, 2025
cc0e252
Support top_level_hits
lym953 Sep 15, 2025
7026753
Add feature flag
lym953 Sep 16, 2025
d0f55e8
Add stats concentrator service to avoid using mutex
lym953 Sep 16, 2025
9f58ddb
Remove some debug log
lym953 Sep 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use bottlecap::{
proxy_aggregator,
proxy_flusher::Flusher as ProxyFlusher,
stats_aggregator::StatsAggregator,
stats_concentrator_service::StatsConcentratorService,
stats_flusher::{self, StatsFlusher},
stats_processor, trace_agent,
trace_aggregator::{self, SendDataBuilderInfo},
Expand Down Expand Up @@ -629,6 +630,7 @@ async fn extension_loop_active(
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr_handle.clone(),
false,
)
.await;
}
Expand All @@ -644,6 +646,7 @@ async fn extension_loop_active(
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr_handle.clone(),
false,
)
.await;
let next_response = next_event(client, &r.extension_id).await;
Expand Down Expand Up @@ -721,6 +724,7 @@ async fn extension_loop_active(
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr_handle,
false, // force_flush_trace_stats
)
.await;
last_continuous_flush_error = false;
Expand Down Expand Up @@ -761,6 +765,7 @@ async fn extension_loop_active(
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr_handle,
false, // force_flush_trace_stats
)
.await;
}
Expand Down Expand Up @@ -818,13 +823,15 @@ async fn extension_loop_active(
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr_handle,
true, // force_flush_trace_stats
)
.await;
return Ok(());
}
}
}

#[allow(clippy::too_many_arguments)]
async fn blocking_flush_all(
logs_flusher: &LogsFlusher,
metrics_flushers: &mut [MetricsFlusher],
Expand All @@ -833,6 +840,7 @@ async fn blocking_flush_all(
proxy_flusher: &ProxyFlusher,
race_flush_interval: &mut tokio::time::Interval,
metrics_aggr_handle: &MetricsAggregatorHandle,
force_flush_trace_stats: bool,
) {
let flush_response = metrics_aggr_handle
.flush()
Expand All @@ -852,7 +860,7 @@ async fn blocking_flush_all(
logs_flusher.flush(None),
futures::future::join_all(metrics_futures),
trace_flusher.flush(None),
stats_flusher.flush(),
stats_flusher.flush(force_flush_trace_stats),
proxy_flusher.flush(None),
);
race_flush_interval.reset();
Expand Down Expand Up @@ -1100,7 +1108,12 @@ fn start_trace_agent(
tokio_util::sync::CancellationToken,
) {
// Stats
let stats_aggregator = Arc::new(TokioMutex::new(StatsAggregator::default()));
let (stats_concentrator_service, stats_concentrator_handle) =
StatsConcentratorService::new(Arc::clone(config));
tokio::spawn(stats_concentrator_service.run());
let stats_aggregator: Arc<TokioMutex<StatsAggregator>> = Arc::new(TokioMutex::new(
StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()),
));
let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
api_key_factory.clone(),
stats_aggregator.clone(),
Expand Down Expand Up @@ -1142,12 +1155,13 @@ fn start_trace_agent(
Arc::clone(config),
trace_aggregator,
trace_processor.clone(),
stats_aggregator,
stats_aggregator.clone(),
stats_processor,
proxy_aggregator,
invocation_processor,
appsec_processor,
Arc::clone(tags_provider),
stats_concentrator_handle.clone(),
);
let trace_agent_channel = trace_agent.get_sender_copy();
let shutdown_token = trace_agent.shutdown_token();
Expand Down
6 changes: 6 additions & 0 deletions bottlecap/src/config/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,11 @@ pub struct EnvConfig {
/// The maximum depth of the Lambda payload to capture.
/// Default is `10`. Requires `capture_lambda_payload` to be `true`.
pub capture_lambda_payload_max_depth: Option<u32>,
/// @env `DD_COMPUTE_TRACE_STATS`
///
/// Enable computation of trace stats for AWS Lambda.
#[serde(deserialize_with = "deserialize_optional_bool_from_anything")]
pub compute_trace_stats: Option<bool>,
/// @env `DD_SERVERLESS_APPSEC_ENABLED`
///
/// Enable Application and API Protection (AAP), previously known as AppSec/ASM, for AWS Lambda.
Expand Down Expand Up @@ -513,6 +518,7 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) {
merge_option_to_value!(config, env_config, lambda_proc_enhanced_metrics);
merge_option_to_value!(config, env_config, capture_lambda_payload);
merge_option_to_value!(config, env_config, capture_lambda_payload_max_depth);
merge_option_to_value!(config, env_config, compute_trace_stats);
merge_option_to_value!(config, env_config, serverless_appsec_enabled);
merge_option!(config, env_config, appsec_rules);
merge_option_to_value!(config, env_config, appsec_waf_timeout);
Expand Down
2 changes: 2 additions & 0 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ pub struct Config {
pub lambda_proc_enhanced_metrics: bool,
pub capture_lambda_payload: bool,
pub capture_lambda_payload_max_depth: u32,
pub compute_trace_stats: bool,

pub serverless_appsec_enabled: bool,
pub appsec_rules: Option<String>,
Expand Down Expand Up @@ -429,6 +430,7 @@ impl Default for Config {
lambda_proc_enhanced_metrics: true,
capture_lambda_payload: false,
capture_lambda_payload_max_depth: 10,
compute_trace_stats: false,

serverless_appsec_enabled: false,
appsec_rules: None,
Expand Down
3 changes: 3 additions & 0 deletions bottlecap/src/config/yaml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub struct YamlConfig {
pub capture_lambda_payload: Option<bool>,
pub capture_lambda_payload_max_depth: Option<u32>,
#[serde(deserialize_with = "deserialize_optional_bool_from_anything")]
pub compute_trace_stats: Option<bool>,
#[serde(deserialize_with = "deserialize_optional_bool_from_anything")]
pub serverless_appsec_enabled: Option<bool>,
pub appsec_rules: Option<String>,
#[serde(deserialize_with = "deserialize_optional_duration_from_microseconds")]
Expand Down Expand Up @@ -613,6 +615,7 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) {
merge_option_to_value!(config, yaml_config, lambda_proc_enhanced_metrics);
merge_option_to_value!(config, yaml_config, capture_lambda_payload);
merge_option_to_value!(config, yaml_config, capture_lambda_payload_max_depth);
merge_option_to_value!(config, yaml_config, compute_trace_stats);
merge_option_to_value!(config, yaml_config, serverless_appsec_enabled);
merge_option!(config, yaml_config, appsec_rules);
merge_option_to_value!(config, yaml_config, appsec_waf_timeout);
Expand Down
16 changes: 10 additions & 6 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::lifecycle::invocation::triggers::get_default_service_name;
pub const MS_TO_NS: f64 = 1_000_000.0;
pub const S_TO_MS: u64 = 1_000;
pub const S_TO_NS: f64 = 1_000_000_000.0;
pub const S_TO_NS_U64: u64 = 1_000_000_000;
pub const PROACTIVE_INITIALIZATION_THRESHOLD_MS: u64 = 10_000;

pub const DATADOG_INVOCATION_ERROR_MESSAGE_KEY: &str = "x-datadog-invocation-error-msg";
Expand Down Expand Up @@ -127,12 +128,10 @@ impl Processor {
self.context_buffer
.start_context(&request_id, invocation_span);

let timestamp = std::time::UNIX_EPOCH
let timestamp_secs = std::time::UNIX_EPOCH
.elapsed()
.expect("can't poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
.as_secs();

if self.config.lambda_proc_enhanced_metrics {
// Collect offsets for network and cpu metrics
Expand Down Expand Up @@ -161,14 +160,19 @@ impl Processor {
}

// Increment the invocation metric
self.enhanced_metrics.increment_invocation_metric(timestamp);
self.enhanced_metrics
.increment_invocation_metric(timestamp_secs.try_into().unwrap_or_default());
self.enhanced_metrics.set_invoked_received();

// If `UniversalInstrumentationStart` event happened first, process it
if let Some((headers, payload_value)) = self.context_buffer.pair_invoke_event(&request_id) {
// Infer span
self.inferrer.infer_span(&payload_value, &self.aws_config);
self.process_on_universal_instrumentation_start(request_id, headers, payload_value);
self.process_on_universal_instrumentation_start(
request_id.clone(),
headers,
payload_value,
);
}
}

Expand Down
11 changes: 5 additions & 6 deletions bottlecap/src/tags/lambda/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ const SERVICE_KEY: &str = "service";

// ComputeStatsKey is the tag key indicating whether trace stats should be computed
const COMPUTE_STATS_KEY: &str = "_dd.compute_stats";
// ComputeStatsValue is the tag value indicating trace stats should be computed
const COMPUTE_STATS_VALUE: &str = "1";
// FunctionTagsKey is the tag key for a function's tags to be set on the top level tracepayload
const FUNCTION_TAGS_KEY: &str = "_dd.tags.function";
// TODO(astuyve) decide what to do with the version
Expand Down Expand Up @@ -122,10 +120,11 @@ fn tags_from_env(
tags_map.extend(config.tags.clone());
}

tags_map.insert(
COMPUTE_STATS_KEY.to_string(),
COMPUTE_STATS_VALUE.to_string(),
);
// "config.compute_trace_stats == true" means computing stats on the extension side,
// so we set _dd.compute_stats to 0 so stats won't be computed on the backend side.
let compute_stats = i32::from(!config.compute_trace_stats);
tags_map.insert(COMPUTE_STATS_KEY.to_string(), compute_stats.to_string());

tags_map
}

Expand Down
4 changes: 4 additions & 0 deletions bottlecap/src/traces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ pub mod propagation;
pub mod proxy_aggregator;
pub mod proxy_flusher;
pub mod span_pointers;
pub mod stats_agent;
pub mod stats_aggregator;
pub mod stats_concentrator;
pub mod stats_concentrator_service;
pub mod stats_flusher;
pub mod stats_processor;
pub mod trace_agent;
pub mod trace_aggregator;
pub mod trace_flusher;
pub mod trace_processor;
pub mod trace_stats_processor;

// URL for a call to the Lambda runtime API. The value may be replaced if `AWS_LAMBDA_RUNTIME_API` is set.
const LAMBDA_RUNTIME_URL_PREFIX: &str = "http://127.0.0.1:9001";
Expand Down
46 changes: 46 additions & 0 deletions bottlecap/src/traces/stats_agent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::error;

use super::stats_concentrator_service::StatsConcentratorHandle;

use super::stats_concentrator::AggregationKey;
use super::stats_concentrator::Stats;

/// A single stats data point produced by the trace agent and routed to the
/// stats concentrator, where events are grouped by `aggregation_key` into
/// time buckets.
#[derive(Clone)]
pub struct StatsEvent {
    /// Event timestamp. Units are not visible here — presumably nanoseconds
    /// since the Unix epoch; TODO confirm against the producer.
    pub time: u64,
    /// Dimensions the concentrator aggregates by (e.g. service/resource).
    pub aggregation_key: AggregationKey,
    /// The measured values carried by this event.
    pub stats: Stats,
}

#[allow(clippy::module_name_repetitions)]
pub struct StatsAgent {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this own the channel it creates and then return the tx through a public method? that way it's not created in the main binary?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Will do.

tx: Sender<StatsEvent>,
rx: Receiver<StatsEvent>,
concentrator: StatsConcentratorHandle,
}

impl StatsAgent {
    /// Builds an agent wired to the given concentrator handle, creating a
    /// bounded mpsc channel (capacity 1000) whose sender side is exposed
    /// through [`Self::get_sender_copy`].
    #[must_use]
    pub fn new(concentrator: StatsConcentratorHandle) -> StatsAgent {
        let (tx, rx) = mpsc::channel::<StatsEvent>(1000);
        Self {
            tx,
            rx,
            concentrator,
        }
    }

    /// Drains the channel until every sender is dropped, handing each event
    /// to the concentrator. Failures to add are logged and otherwise ignored.
    pub async fn spin(&mut self) {
        while let Some(event) = self.rx.recv().await {
            match self.concentrator.add(event) {
                Err(e) => {
                    error!("Error adding stats event to the stats concentrator: {e}");
                }
                Ok(_) => {}
            }
        }
    }

    /// Returns a clone of the channel's sender for producers to use.
    #[must_use]
    pub fn get_sender_copy(&self) -> Sender<StatsEvent> {
        self.tx.clone()
    }
}
37 changes: 23 additions & 14 deletions bottlecap/src/traces/stats_aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::traces::stats_concentrator_service::StatsConcentratorHandle;
use datadog_trace_protobuf::pb::ClientStatsPayload;
use std::collections::VecDeque;
use tracing::error;

#[allow(clippy::empty_line_after_doc_comments)]
/// Maximum number of entries in a stat payload.
Expand All @@ -22,37 +24,44 @@ pub struct StatsAggregator {
queue: VecDeque<ClientStatsPayload>,
max_content_size_bytes: usize,
buffer: Vec<ClientStatsPayload>,
}

impl Default for StatsAggregator {
fn default() -> Self {
StatsAggregator {
queue: VecDeque::new(),
max_content_size_bytes: MAX_CONTENT_SIZE_BYTES,
buffer: Vec::new(),
}
}
concentrator: StatsConcentratorHandle,
}

/// Takes in individual trace stats payloads and aggregates them into batches to be flushed to Datadog.
impl StatsAggregator {
#[allow(dead_code)]
#[allow(clippy::must_use_candidate)]
pub fn new(max_content_size_bytes: usize) -> Self {
fn new(max_content_size_bytes: usize, concentrator: StatsConcentratorHandle) -> Self {
StatsAggregator {
queue: VecDeque::new(),
max_content_size_bytes,
buffer: Vec::new(),
concentrator,
}
}

#[must_use]
pub fn new_with_concentrator(concentrator: StatsConcentratorHandle) -> Self {
Self::new(MAX_CONTENT_SIZE_BYTES, concentrator)
}

/// Takes in an individual trace stats payload.
pub fn add(&mut self, payload: ClientStatsPayload) {
self.queue.push_back(payload);
}

/// Returns a batch of trace stats payloads, subject to the max content size.
pub fn get_batch(&mut self) -> Vec<ClientStatsPayload> {
pub async fn get_batch(&mut self, force_flush: bool) -> Vec<ClientStatsPayload> {
// Pull stats data from concentrator
match self.concentrator.get_stats(force_flush).await {
Ok(stats) => {
self.queue.extend(stats);
}
Err(e) => {
error!("Error getting stats from the stats concentrator: {e:?}");
}
}

let mut batch_size = 0;

// Fill the batch
Expand Down Expand Up @@ -158,12 +167,12 @@ mod tests {
aggregator.add(payload.clone());

// The batch should only contain the first 2 payloads
let first_batch = aggregator.get_batch();
let first_batch = aggregator.get_batch(false);
assert_eq!(first_batch, vec![payload.clone(), payload.clone()]);
assert_eq!(aggregator.queue.len(), 1);

// The second batch should only contain the last log
let second_batch = aggregator.get_batch();
let second_batch = aggregator.get_batch(false);
assert_eq!(second_batch, vec![payload]);
assert_eq!(aggregator.queue.len(), 0);
}
Expand Down
Loading
Loading