Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bottlecap/src/lifecycle/invocation/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,14 @@ mod tests {

let uptime_offset = Some(50.0);
let (tmp_chan_tx, _) = watch::channel(());
let (process_chan_tx, _) = watch::channel(());

let enhanced_metric_data = Some(EnhancedMetricData {
network_offset,
cpu_offset,
uptime_offset,
tmp_chan_tx,
process_chan_tx,
});

buffer.add_enhanced_metric_data(&request_id, enhanced_metric_data.clone());
Expand Down
8 changes: 8 additions & 0 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,17 @@ impl Processor {
let (tmp_chan_tx, tmp_chan_rx) = watch::channel(());
self.enhanced_metrics.set_tmp_enhanced_metrics(tmp_chan_rx);

// Start a channel for monitoring file descriptor and thread count
let (process_chan_tx, process_chan_rx) = watch::channel(());
self.enhanced_metrics
.set_process_enhanced_metrics(process_chan_rx);

let enhanced_metric_offsets = Some(EnhancedMetricData {
network_offset,
cpu_offset,
uptime_offset,
tmp_chan_tx,
process_chan_tx,
});
self.context_buffer
.add_enhanced_metric_data(&request_id, enhanced_metric_offsets);
Expand Down Expand Up @@ -196,6 +202,8 @@ impl Processor {
);
// Send the signal to stop monitoring tmp
_ = offsets.tmp_chan_tx.send(());
// Send the signal to stop monitoring file descriptors and threads
_ = offsets.process_chan_tx.send(());
}
}

Expand Down
4 changes: 4 additions & 0 deletions bottlecap/src/metrics/enhanced/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,9 @@ pub const CPU_MIN_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_min_utiliz
// Ephemeral storage (/tmp) capacity and usage metrics.
pub const TMP_MAX_METRIC: &str = "aws.lambda.enhanced.tmp_max";
pub const TMP_USED_METRIC: &str = "aws.lambda.enhanced.tmp_used";
pub const TMP_FREE_METRIC: &str = "aws.lambda.enhanced.tmp_free";
// File descriptor limit and peak usage sampled over the invocation.
pub const FD_MAX_METRIC: &str = "aws.lambda.enhanced.fd_max";
pub const FD_USE_METRIC: &str = "aws.lambda.enhanced.fd_use";
// Thread limit and peak usage sampled over the invocation.
pub const THREADS_MAX_METRIC: &str = "aws.lambda.enhanced.threads_max";
pub const THREADS_USE_METRIC: &str = "aws.lambda.enhanced.threads_use";
//pub const ASM_INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.asm.invocations";
// Environment variable that toggles emission of enhanced metrics.
pub const ENHANCED_METRICS_ENV_VAR: &str = "DD_ENHANCED_METRICS";
202 changes: 202 additions & 0 deletions bottlecap/src/metrics/enhanced/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,115 @@ impl Lambda {
});
}

/// Inserts the file-descriptor enhanced metrics into the aggregator.
///
/// `fd_max` is always reported; `fd_use` is only reported when it holds a
/// real (positive) sample, since the monitor initialises it to -1 as a
/// "no sample taken" sentinel.
pub fn generate_fd_enhanced_metrics(
    fd_max: f64,
    fd_use: f64,
    aggr: &mut std::sync::MutexGuard<Aggregator>,
) {
    // The descriptor ceiling is unconditional.
    let max_metric = Metric::new(
        constants::FD_MAX_METRIC.into(),
        MetricValue::distribution(fd_max),
        None,
    );
    if let Err(e) = aggr.insert(max_metric) {
        error!("Failed to insert fd_max metric: {}", e);
    }

    // Skip fd_use entirely unless a valid (positive) sample was observed.
    if fd_use > 0.0 {
        let use_metric = Metric::new(
            constants::FD_USE_METRIC.into(),
            MetricValue::distribution(fd_use),
            None,
        );
        if let Err(e) = aggr.insert(use_metric) {
            error!("Failed to insert fd_use metric: {}", e);
        }
    }
}

pub fn generate_threads_enhanced_metrics(
threads_max: f64,
threads_use: f64,
aggr: &mut std::sync::MutexGuard<Aggregator>,
) {
let metric = Metric::new(
constants::THREADS_MAX_METRIC.into(),
MetricValue::distribution(threads_max),
None,
);
if let Err(e) = aggr.insert(metric) {
error!("Failed to insert threads_max metric: {}", e);
}

// Check if threads_use value is valid before inserting metric
if threads_use > 0.0 {
let metric = Metric::new(
constants::THREADS_USE_METRIC.into(),
MetricValue::distribution(threads_use),
None,
);
if let Err(e) = aggr.insert(metric) {
error!("Failed to insert threads_use metric: {}", e);
}
}
}

/// Spawns a background task that monitors file descriptor and thread usage
/// for the current sandbox, and flushes the observed peaks as enhanced
/// metrics when the stop signal arrives on `send_metrics`.
///
/// No-op when enhanced metrics are disabled in the config. The task samples
/// at a very tight cadence (1 ms) so short-lived spikes — e.g. many files
/// opened and closed in quick succession — are still captured; usage values
/// start at the -1 sentinel so nothing is emitted if no sample succeeds.
pub fn set_process_enhanced_metrics(&self, mut send_metrics: Receiver<()>) {
    if !self.config.enhanced_metrics {
        return;
    }

    let aggr = Arc::clone(&self.aggregator);

    tokio::spawn(async move {
        // get list of all process ids
        let pids = proc::get_pid_list();

        // Set fd_max and initial value for fd_use to -1
        let fd_max = proc::get_fd_max_data(&pids);
        let mut fd_use = -1_f64;

        // Set threads_max and initial value for threads_use to -1
        let threads_max = proc::get_threads_max_data(&pids);
        let mut threads_use = -1_f64;

        let mut interval = interval(Duration::from_millis(1));
        loop {
            tokio::select! {
                biased;
                // When the stop signal is received, generate final metrics
                _ = send_metrics.changed() => {
                    let mut aggr: std::sync::MutexGuard<Aggregator> =
                        aggr.lock().expect("lock poisoned");
                    Self::generate_fd_enhanced_metrics(fd_max, fd_use, &mut aggr);
                    Self::generate_threads_enhanced_metrics(threads_max, threads_use, &mut aggr);
                    return;
                }
                // Otherwise keep monitoring file descriptor and thread usage periodically
                _ = interval.tick() => {
                    match proc::get_fd_use_data(&pids) {
                        Ok(fd_use_curr) => {
                            // Track the peak, not the latest sample.
                            fd_use = fd_use.max(fd_use_curr);
                        },
                        Err(_) => {
                            debug!("Could not update file descriptor use enhanced metric.");
                        }
                    };
                    match proc::get_threads_use_data(&pids) {
                        Ok(threads_use_curr) => {
                            threads_use = threads_use.max(threads_use_curr);
                        },
                        Err(_) => {
                            debug!("Could not update threads use enhanced metric.");
                        }
                    };
                }
            }
        }
    });
}

fn calculate_estimated_cost_usd(billed_duration_ms: u64, memory_size_mb: u64) -> f64 {
let gb_seconds = (billed_duration_ms as f64 * constants::MS_TO_SEC)
* (memory_size_mb as f64 / constants::MB_TO_GB);
Expand Down Expand Up @@ -503,6 +612,7 @@ pub struct EnhancedMetricData {
pub cpu_offset: Option<CPUData>,
pub uptime_offset: Option<f64>,
pub tmp_chan_tx: Sender<()>,
pub process_chan_tx: Sender<()>,
}

impl PartialEq for EnhancedMetricData {
Expand Down Expand Up @@ -669,6 +779,18 @@ mod tests {
assert!(aggr
.get_entry_by_id(constants::TMP_FREE_METRIC.into(), &None)
.is_none());
assert!(aggr
.get_entry_by_id(constants::FD_MAX_METRIC.into(), &None)
.is_none());
assert!(aggr
.get_entry_by_id(constants::FD_USE_METRIC.into(), &None)
.is_none());
assert!(aggr
.get_entry_by_id(constants::THREADS_MAX_METRIC.into(), &None)
.is_none());
assert!(aggr
.get_entry_by_id(constants::THREADS_USE_METRIC.into(), &None)
.is_none());
}

#[test]
Expand Down Expand Up @@ -818,4 +940,84 @@ mod tests {
assert_sketch(&metrics_aggr, constants::TMP_USED_METRIC, 12165120.0);
assert_sketch(&metrics_aggr, constants::TMP_FREE_METRIC, 538296320.0);
}

#[test]
fn test_set_fd_enhanced_metrics_valid_fd_use() {
    // A positive fd_use sample must produce both the fd_max and fd_use sketches.
    let (metrics_aggr, my_config) = setup();
    let lambda = Lambda::new(metrics_aggr.clone(), my_config);

    {
        let mut guard = lambda.aggregator.lock().expect("lock poisoned");
        Lambda::generate_fd_enhanced_metrics(1024.0, 175.0, &mut guard);
    }

    assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0);
    assert_sketch(&metrics_aggr, constants::FD_USE_METRIC, 175.0);
}

#[test]
fn test_set_fd_enhanced_metrics_invalid_fd_use() {
    // With the -1 "no sample" sentinel, only fd_max may be emitted.
    let (metrics_aggr, my_config) = setup();
    let lambda = Lambda::new(metrics_aggr.clone(), my_config);

    {
        let mut guard = lambda.aggregator.lock().expect("lock poisoned");
        Lambda::generate_fd_enhanced_metrics(1024.0, -1.0, &mut guard);
    }

    assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0);

    // fd_use must be absent from the aggregator.
    let guard = lambda.aggregator.lock().expect("lock poisoned");
    assert!(guard
        .get_entry_by_id(constants::FD_USE_METRIC.into(), &None)
        .is_none());
}

#[test]
fn test_set_threads_enhanced_metrics_valid_threads_use() {
    // A positive threads_use sample must produce both threads sketches.
    let (metrics_aggr, my_config) = setup();
    let lambda = Lambda::new(metrics_aggr.clone(), my_config);

    {
        let mut guard = lambda.aggregator.lock().expect("lock poisoned");
        Lambda::generate_threads_enhanced_metrics(1024.0, 40.0, &mut guard);
    }

    assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0);
    assert_sketch(&metrics_aggr, constants::THREADS_USE_METRIC, 40.0);
}

#[test]
fn test_set_threads_enhanced_metrics_invalid_threads_use() {
    // With the -1 "no sample" sentinel, only threads_max may be emitted.
    let (metrics_aggr, my_config) = setup();
    let lambda = Lambda::new(metrics_aggr.clone(), my_config);

    {
        let mut guard = lambda.aggregator.lock().expect("lock poisoned");
        Lambda::generate_threads_enhanced_metrics(1024.0, -1.0, &mut guard);
    }

    assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0);

    // threads_use must be absent from the aggregator.
    let guard = lambda.aggregator.lock().expect("lock poisoned");
    assert!(guard
        .get_entry_by_id(constants::THREADS_USE_METRIC.into(), &None)
        .is_none());
}
}
3 changes: 3 additions & 0 deletions bottlecap/src/proc/constants.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
// procfs files read for network, CPU, and uptime enhanced metrics.
pub const PROC_NET_DEV_PATH: &str = "/proc/net/dev";
pub const PROC_STAT_PATH: &str = "/proc/stat";
pub const PROC_UPTIME_PATH: &str = "/proc/uptime";
// Root of procfs; used to enumerate per-process entries (/proc/<pid>/...).
pub const PROC_PATH: &str = "/proc";

// NOTE(review): "LAMDBA" is a pre-existing typo for "LAMBDA"; renaming would
// break callers, so it is left as-is here.
pub const LAMDBA_NETWORK_INTERFACE: &str = "vinternal_1";
// Default ceilings for file descriptors and execution processes — presumably
// fallbacks when the actual limits can't be read from procfs; TODO confirm
// against the proc readers that consume them.
pub const LAMBDA_FILE_DESCRIPTORS_DEFAULT_LIMIT: f64 = 1024.0;
pub const LAMBDA_EXECUTION_PROCESSES_DEFAULT_LIMIT: f64 = 1024.0;
Loading