Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions bottlecap/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bottlecap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ figment = { version = "0.10", default-features = false, features = ["yaml", "env
hyper = { version = "0.14", default-features = false, features = ["server"] }
lazy_static = { version = "1.5", default-features = false }
log = { version = "0.4", default-features = false }
nix = { version = "0.26", default-features = false, features = ["feature"] }
protobuf = { version = "3.5", default-features = false }
regex = { version = "1.10", default-features = false }
reqwest = { version = "0.12", features = ["json", "http2", "rustls-tls"], default-features = false }
Expand Down
62 changes: 62 additions & 0 deletions bottlecap/LICENSE-3rdparty.yml

Large diffs are not rendered by default.

21 changes: 15 additions & 6 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,9 @@ async fn extension_loop_active(
..
} => {
let mut p = invocation_processor.lock().await;
let mut enhanced_metric_data = None;
if let Some(metrics) = metrics {
p.on_platform_runtime_done(
enhanced_metric_data = p.on_platform_runtime_done(
&request_id,
metrics.duration_ms,
config.clone(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All this stuff, like the configs and the tags provider, could be set once at initialization of the invocation_processor, rather than being cloned continuously

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, I'll be doing some refactoring in my next PR, so I can look into that then

Expand All @@ -450,6 +451,11 @@ async fn extension_loop_active(
request_id, status
);

// set cpu utilization metrics here to avoid accounting for extra idle time
if let Some(offsets) = enhanced_metric_data {
lambda_enhanced_metrics.set_cpu_utilization_enhanced_metrics(offsets.cpu_offset, offsets.uptime_offset);
}

// TODO(astuyve) it'll be easy to
// pass the invocation deadline to
// flush tasks here, so they can
Expand All @@ -462,6 +468,7 @@ async fn extension_loop_active(
stats_flusher.manual_flush()
);
}

break;
}
TelemetryRecord::PlatformReport {
Expand All @@ -476,11 +483,13 @@ async fn extension_loop_active(
);
lambda_enhanced_metrics.set_report_log_metrics(&metrics);
let mut p = invocation_processor.lock().await;
if let Some((post_runtime_duration_ms, network_offset)) = p.on_platform_report(&request_id, metrics.duration_ms) {
lambda_enhanced_metrics.set_post_runtime_duration_metric(
post_runtime_duration_ms,
);
lambda_enhanced_metrics.set_network_enhanced_metrics(network_offset);
let (post_runtime_duration_ms, enhanced_metric_data) = p.on_platform_report(&request_id, metrics.duration_ms);
if let Some(duration) = post_runtime_duration_ms {
lambda_enhanced_metrics.set_post_runtime_duration_metric(duration);
}
if let Some(offsets) = enhanced_metric_data {
lambda_enhanced_metrics.set_network_enhanced_metrics(offsets.network_offset);
lambda_enhanced_metrics.set_cpu_time_enhanced_metrics(offsets.cpu_offset);
}
drop(p);
Comment on lines +487 to 494
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand why we are not doing the same as before?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized that we were returning None for both post_runtime_duration_ms and enhanced_metric_data if we could not calculate post_runtime_duration_ms because context.runtime_duration_ms was 0. Changed it so that even if we can't calculate post_runtime_duration_ms, we still try to send the enhanced metrics.


Expand Down
45 changes: 35 additions & 10 deletions bottlecap/src/lifecycle/invocation/context.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::proc::NetworkData;
use crate::metrics::enhanced::lambda::EnhancedMetricData;
use std::collections::VecDeque;

use tracing::debug;
Expand All @@ -9,7 +9,7 @@ pub struct Context {
pub runtime_duration_ms: f64,
pub init_duration_ms: f64,
pub start_time: i64,
pub network_offset: Option<NetworkData>,
pub enhanced_metric_data: Option<EnhancedMetricData>,
}

impl Context {
Expand All @@ -19,14 +19,14 @@ impl Context {
runtime_duration_ms: f64,
init_duration_ms: f64,
start_time: i64,
network_offset: Option<NetworkData>,
enhanced_metric_data: Option<EnhancedMetricData>,
) -> Self {
Context {
request_id,
runtime_duration_ms,
init_duration_ms,
start_time,
network_offset,
enhanced_metric_data,
}
}
}
Expand Down Expand Up @@ -143,13 +143,17 @@ impl ContextBuffer {

/// Adds the network offset to a `Context` in the buffer.
///
pub fn add_network_offset(&mut self, request_id: &String, network_data: Option<NetworkData>) {
pub fn add_enhanced_metric_data(
&mut self,
request_id: &String,
enhanced_metric_data: Option<EnhancedMetricData>,
) {
if let Some(context) = self
.buffer
.iter_mut()
.find(|context| context.request_id == *request_id)
{
context.network_offset = network_data;
context.enhanced_metric_data = enhanced_metric_data;
} else {
debug!("Could not add network offset - context not found");
}
Expand All @@ -166,6 +170,9 @@ impl ContextBuffer {
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use crate::proc::{CPUData, NetworkData};
use std::collections::HashMap;

use super::*;

#[test]
Expand Down Expand Up @@ -286,7 +293,7 @@ mod tests {
}

#[test]
fn test_add_network_offset() {
fn test_add_enhanced_metric_data() {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
Expand All @@ -300,10 +307,28 @@ mod tests {
tx_bytes: 254.0,
});

buffer.add_network_offset(&request_id, network_offset);
assert_eq!(
buffer.get(&request_id).unwrap().network_offset,
let mut individual_cpu_idle_times = HashMap::new();
individual_cpu_idle_times.insert("cpu0".to_string(), 10.0);
individual_cpu_idle_times.insert("cpu1".to_string(), 20.0);
let cpu_offset = Some(CPUData {
total_user_time_ms: 100.0,
total_system_time_ms: 53.0,
total_idle_time_ms: 20.0,
individual_cpu_idle_times: individual_cpu_idle_times,
});

let uptime_offset = Some(50.0);

let enhanced_metric_data = Some(EnhancedMetricData {
network_offset,
cpu_offset,
uptime_offset,
});

buffer.add_enhanced_metric_data(&request_id, enhanced_metric_data.clone());
assert_eq!(
buffer.get(&request_id).unwrap().enhanced_metric_data,
enhanced_metric_data,
);
}
}
44 changes: 30 additions & 14 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use tracing::debug;
use crate::{
config::{self, AwsConfig},
lifecycle::invocation::{context::ContextBuffer, span_inferrer::SpanInferrer},
proc::{self, NetworkData},
metrics::enhanced::lambda::EnhancedMetricData,
proc::{self, CPUData, NetworkData},
tags::provider,
traces::{
context::SpanContext,
Expand All @@ -34,6 +35,7 @@ pub struct Processor {
propagator: DatadogCompositePropagator,
aws_config: AwsConfig,
tracer_detected: bool,
collect_enhanced_data: bool,
}

impl Processor {
Expand Down Expand Up @@ -73,17 +75,26 @@ impl Processor {
propagator,
aws_config: aws_config.clone(),
tracer_detected: false,
collect_enhanced_data: config.enhanced_metrics,
}
}

/// Given a `request_id`, add the enhanced metric offsets to the context buffer.
/// Given a `request_id`, creates the context and adds the enhanced metric offsets to the context buffer.
///
pub fn on_invoke_event(&mut self, request_id: String) {
self.context_buffer.create_context(request_id.clone());

let network_offset: Option<NetworkData> = proc::get_network_data().ok();
self.context_buffer
.add_network_offset(&request_id, network_offset);
if self.collect_enhanced_data {
let network_offset: Option<NetworkData> = proc::get_network_data().ok();
let cpu_offset: Option<CPUData> = proc::get_cpu_data().ok();
let uptime_offset: Option<f64> = proc::get_uptime().ok();
let enhanced_metric_offsets = Some(EnhancedMetricData {
network_offset,
cpu_offset,
uptime_offset,
});
self.context_buffer
.add_enhanced_metric_data(&request_id, enhanced_metric_offsets);
}
}

/// Given a `request_id` and the time of the platform start, add the start time to the context buffer.
Expand All @@ -110,10 +121,11 @@ impl Processor {
tags_provider: Arc<provider::Provider>,
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
trace_agent_tx: Sender<SendData>,
) {
) -> Option<EnhancedMetricData> {
self.context_buffer
.add_runtime_duration(request_id, duration_ms);

let mut enhanced_metric_data: Option<EnhancedMetricData> = None;
if let Some(context) = self.context_buffer.get(request_id) {
let span = &mut self.span;
// `round` is intentionally meant to be a whole integer
Expand All @@ -129,6 +141,8 @@ impl Processor {
// - error.type
// - error.stack
// - metrics tags (for asm)

enhanced_metric_data.clone_from(&context.enhanced_metric_data);
}

if let Some(trigger_tags) = self.inferrer.get_trigger_tags() {
Expand Down Expand Up @@ -169,6 +183,8 @@ impl Processor {
debug!("Failed to send invocation span to agent: {e}");
}
}

enhanced_metric_data
}

/// Given a `request_id` and the duration in milliseconds of the platform report,
Expand All @@ -181,18 +197,18 @@ impl Processor {
&mut self,
request_id: &String,
duration_ms: f64,
) -> Option<(f64, Option<NetworkData>)> {
) -> (Option<f64>, Option<EnhancedMetricData>) {
if let Some(context) = self.context_buffer.remove(request_id) {
if context.runtime_duration_ms == 0.0 {
return None;
}
let mut post_runtime_duration_ms: Option<f64> = None;

let post_runtime_duration_ms = duration_ms - context.runtime_duration_ms;
if context.runtime_duration_ms != 0.0 {
post_runtime_duration_ms = Some(duration_ms - context.runtime_duration_ms);
}

return Some((post_runtime_duration_ms, context.network_offset));
return (post_runtime_duration_ms, context.enhanced_metric_data);
}

None
(None, None)
}

/// If this method is called, it means that we are operating in a Universally Instrumented
Expand Down
8 changes: 8 additions & 0 deletions bottlecap/src/metrics/enhanced/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,13 @@ pub const INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.invocations";
pub const RX_BYTES_METRIC: &str = "aws.lambda.enhanced.rx_bytes";
pub const TX_BYTES_METRIC: &str = "aws.lambda.enhanced.tx_bytes";
pub const TOTAL_NETWORK_METRIC: &str = "aws.lambda.enhanced.total_network";
pub const CPU_SYSTEM_TIME_METRIC: &str = "aws.lambda.enhanced.cpu_system_time";
pub const CPU_USER_TIME_METRIC: &str = "aws.lambda.enhanced.cpu_user_time";
pub const CPU_TOTAL_TIME_METRIC: &str = "aws.lambda.enhanced.cpu_total_time";
pub const CPU_TOTAL_UTILIZATION_PCT_METRIC: &str = "aws.lambda.enhanced.cpu_total_utilization_pct";
pub const CPU_TOTAL_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_total_utilization";
pub const NUM_CORES_METRIC: &str = "aws.lambda.enhanced.num_cores";
pub const CPU_MAX_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_max_utilization";
pub const CPU_MIN_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_min_utilization";
//pub const ASM_INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.asm.invocations";
pub const ENHANCED_METRICS_ENV_VAR: &str = "DD_ENHANCED_METRICS";
Loading