From f6040405d6621c9bf8cc1f202ed170df788590e9 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Thu, 7 Nov 2024 18:02:18 -0500 Subject: [PATCH 1/7] generate tmp enhanced metrics --- bottlecap/Cargo.toml | 2 +- bottlecap/src/bin/bottlecap/main.rs | 23 ++-- bottlecap/src/lifecycle/invocation/context.rs | 1 + .../src/lifecycle/invocation/processor.rs | 5 +- bottlecap/src/metrics/enhanced/constants.rs | 6 + bottlecap/src/metrics/enhanced/lambda.rs | 111 +++++++++++++++++- bottlecap/src/metrics/enhanced/mod.rs | 1 + bottlecap/src/metrics/enhanced/statfs.rs | 18 +++ 8 files changed, 153 insertions(+), 14 deletions(-) create mode 100644 bottlecap/src/metrics/enhanced/statfs.rs diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index bfbaef220..e8e57ab9b 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -20,7 +20,7 @@ figment = { version = "0.10", default-features = false, features = ["yaml", "env hyper = { version = "0.14", default-features = false, features = ["server"] } lazy_static = { version = "1.5", default-features = false } log = { version = "0.4", default-features = false } -nix = { version = "0.26", default-features = false, features = ["feature"] } +nix = { version = "0.26", default-features = false, features = ["feature", "fs"] } protobuf = { version = "3.5", default-features = false } regex = { version = "1.10", default-features = false } reqwest = { version = "0.12", features = ["json", "http2", "rustls-tls"], default-features = false } diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index dcddf23b5..68dfa70ed 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -54,15 +54,13 @@ use dogstatsd::{ use reqwest::Client; use serde::Deserialize; use std::{ - collections::hash_map, - collections::HashMap, + collections::{hash_map, HashMap}, env, - io::Error, - io::Result, + io::{Error, Result}, os::unix::process::CommandExt, path::Path, process::Command, - sync::{Arc, Mutex}, + sync::{mpsc, Arc, Mutex}, }; use telemetry::listener::TelemetryListenerConfig; use tokio::sync::mpsc::Sender; @@ -375,8 +373,13 @@ async fn extension_loop_active( request_id, deadline_ms, invoked_function_arn ); lambda_enhanced_metrics.increment_invocation_metric(); + + // Start a channel for monitoring tmp enhanced data + let (tmp_chan_tx, tmp_chan_rx) = mpsc::channel::(); + lambda_enhanced_metrics.set_tmp_enhanced_metrics(tmp_chan_rx); + let mut p = invocation_processor.lock().await; - p.on_invoke_event(request_id); + p.on_invoke_event(request_id, Some(tmp_chan_tx)); drop(p); } Ok(NextEventResponse::Shutdown { @@ -487,9 +490,11 @@ async fn extension_loop_active( if let Some(duration) = post_runtime_duration_ms { lambda_enhanced_metrics.set_post_runtime_duration_metric(duration); } - if let Some(offsets) = enhanced_metric_data { - lambda_enhanced_metrics.set_network_enhanced_metrics(offsets.network_offset); - lambda_enhanced_metrics.set_cpu_time_enhanced_metrics(offsets.cpu_offset); + if let Some(data) = enhanced_metric_data { + lambda_enhanced_metrics.set_network_enhanced_metrics(data.network_offset); + lambda_enhanced_metrics.set_cpu_time_enhanced_metrics(data.cpu_offset); + // Drop tmp_chan as a signal to stop monitoring tmp + drop(data.tmp_chan); } drop(p); diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 325fd3cd3..76ff86dd2 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -323,6 +323,7 @@ mod tests { network_offset, cpu_offset, uptime_offset, + tmp_chan: None, }); buffer.add_enhanced_metric_data(&request_id, enhanced_metric_data.clone()); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index e7d2925fc..412f6045a 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -1,6 +1,6 @@ use std::{ collections::HashMap, - sync::Arc, + sync::{mpsc, Arc}, time::{SystemTime, UNIX_EPOCH}, }; @@ -81,7 +81,7 @@ impl Processor { /// Given a `request_id`, creates the context and adds the enhanced metric offsets to the context buffer. /// - pub fn on_invoke_event(&mut self, request_id: String) { + pub fn on_invoke_event(&mut self, request_id: String, tmp_chan: Option>) { self.context_buffer.create_context(request_id.clone()); if self.collect_enhanced_data { let network_offset: Option = proc::get_network_data().ok(); @@ -91,6 +91,7 @@ impl Processor { network_offset, cpu_offset, uptime_offset, + tmp_chan, }); self.context_buffer .add_enhanced_metric_data(&request_id, enhanced_metric_offsets); diff --git a/bottlecap/src/metrics/enhanced/constants.rs b/bottlecap/src/metrics/enhanced/constants.rs index 2d17e73ec..f62fd67ea 100644 --- a/bottlecap/src/metrics/enhanced/constants.rs +++ b/bottlecap/src/metrics/enhanced/constants.rs @@ -5,6 +5,9 @@ pub const ARM_LAMBDA_PRICE_PER_GB_SECOND: f64 = 0.000_013_333_4; pub const MS_TO_SEC: f64 = 0.001; pub const MB_TO_GB: f64 = 1_024.0; +// tmp directory path +pub const TMP_PATH: &str = "/tmp/"; + // Enhanced metrics pub const MAX_MEMORY_USED_METRIC: &str = "aws.lambda.enhanced.max_memory_used"; pub const MEMORY_SIZE_METRIC: &str = "aws.lambda.enhanced.memorysize"; @@ -32,5 +35,8 @@ pub const CPU_TOTAL_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_total_ut pub const NUM_CORES_METRIC: &str = "aws.lambda.enhanced.num_cores"; pub const CPU_MAX_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_max_utilization"; pub const CPU_MIN_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_min_utilization"; +pub const TMP_MAX_METRIC: &str = "aws.lambda.enhanced.tmp_max"; +pub const TMP_USED_METRIC: &str = "aws.lambda.enhanced.tmp_used"; +pub const TMP_FREE_METRIC: &str = "aws.lambda.enhanced.tmp_free"; //pub const ASM_INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.asm.invocations"; pub const ENHANCED_METRICS_ENV_VAR: &str = "DD_ENHANCED_METRICS"; diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 13dd203a9..64c8cb6b9 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -1,11 +1,17 @@ use super::constants::{self, BASE_LAMBDA_INVOCATION_PRICE}; +use super::statfs::statfs_info; use crate::proc::{self, CPUData, NetworkData}; use crate::telemetry::events::ReportMetrics; use dogstatsd::aggregator::Aggregator; use dogstatsd::metric; use dogstatsd::metric::{Metric, MetricValue}; use std::env::consts::ARCH; -use std::sync::{Arc, Mutex}; +use std::sync::{ + mpsc::{self, Receiver}, + Arc, Mutex, +}; +use std::thread; +use std::time::Duration; use tracing::debug; use tracing::error; @@ -343,6 +349,83 @@ impl Lambda { } } + pub fn generate_tmp_enhanced_metrics( + tmp_max: f64, + tmp_used: f64, + aggr: &mut std::sync::MutexGuard, + ) { + let metric = Metric::new( + constants::TMP_MAX_METRIC.into(), + MetricValue::distribution(tmp_max), + None, + ); + if let Err(e) = aggr.insert(metric) { + error!("Failed to insert tmp_max metric: {}", e); + } + + let metric = Metric::new( + constants::TMP_USED_METRIC.into(), + MetricValue::distribution(tmp_used), + None, + ); + if let Err(e) = aggr.insert(metric) { + error!("Failed to insert tmp_used metric: {}", e); + } + + let tmp_free = tmp_max - tmp_used; + let metric = Metric::new( + constants::TMP_FREE_METRIC.into(), + MetricValue::distribution(tmp_free), + None, + ); + if let Err(e) = aggr.insert(metric) { + error!("Failed to insert tmp_free metric: {}", e); + } + } + + pub fn set_tmp_enhanced_metrics(&self, send_metrics: Receiver) { + if !self.config.enhanced_metrics { + return; + } + + let aggr = Arc::clone(&self.aggregator); + + thread::spawn(move || { + let (bsize, blocks, bavail) = match statfs_info(constants::TMP_PATH) { + Ok(stats) => stats, + Err(err) => { + debug!("Could not emit tmp enhanced metrics. {:?}", err); + return; + } + }; + + let tmp_max = bsize * blocks; + let mut tmp_used = bsize * (blocks - bavail); + + loop { + match send_metrics.try_recv() { + Err(mpsc::TryRecvError::Disconnected) => { + let mut aggr: std::sync::MutexGuard = + aggr.lock().expect("lock poisoned"); + Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr); + return; + } + _ => { + let (bsize, blocks, bavail) = match statfs_info(constants::TMP_PATH) { + Ok(stats) => stats, + Err(err) => { + debug!("Could not emit tmp enhanced metrics. {:?}", err); + return; + } + }; + tmp_used = tmp_used.max(bsize * (blocks - bavail)); + thread::sleep(Duration::from_millis(10)); + } + } + } + }); + } + fn calculate_estimated_cost_usd(billed_duration_ms: u64, memory_size_mb: u64) -> f64 { let gb_seconds = (billed_duration_ms as f64 * constants::MS_TO_SEC) * (memory_size_mb as f64 / constants::MB_TO_GB); @@ -411,11 +494,20 @@ impl Lambda { } } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug)] pub struct EnhancedMetricData { pub network_offset: Option, pub cpu_offset: Option, pub uptime_offset: Option, + pub tmp_chan: Option>, +} + +impl PartialEq for EnhancedMetricData { + fn eq(&self, other: &Self) -> bool { + self.network_offset == other.network_offset + && self.cpu_offset == other.cpu_offset + && self.uptime_offset == other.uptime_offset + } } #[cfg(test)] @@ -695,4 +787,19 @@ mod tests { assert_sketch(&metrics_aggr, constants::CPU_MAX_UTILIZATION_METRIC, 30.0); assert_sketch(&metrics_aggr, constants::CPU_MIN_UTILIZATION_METRIC, 28.75); } + + #[test] + fn test_set_tmp_enhanced_metrics() { + let (metrics_aggr, my_config) = setup(); + let lambda = Lambda::new(metrics_aggr.clone(), my_config); + + let tmp_max = 550461440.0; + let tmp_used = 12165120.0; + + Lambda::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut lambda.aggregator.lock().expect("lock poisoned")); + + assert_sketch(&metrics_aggr, constants::TMP_MAX_METRIC, 550461440.0); + assert_sketch(&metrics_aggr, constants::TMP_USED_METRIC, 12165120.0); + assert_sketch(&metrics_aggr, constants::TMP_FREE_METRIC, 538296320.0); + } } diff --git a/bottlecap/src/metrics/enhanced/mod.rs b/bottlecap/src/metrics/enhanced/mod.rs index a2638024e..bca7c1bf0 100644 --- a/bottlecap/src/metrics/enhanced/mod.rs +++ b/bottlecap/src/metrics/enhanced/mod.rs @@ -1,2 +1,3 @@ pub mod constants; pub mod lambda; +pub mod statfs; diff --git a/bottlecap/src/metrics/enhanced/statfs.rs b/bottlecap/src/metrics/enhanced/statfs.rs new file mode 100644 index 000000000..858f924cd --- /dev/null +++ b/bottlecap/src/metrics/enhanced/statfs.rs @@ -0,0 +1,18 @@ +use nix::sys::statfs::statfs; +use std::io; +use std::path::Path; + +#[cfg(not(target_os = "windows"))] +pub fn statfs_info(path: &str) -> Result<(f64, f64, f64), io::Error> { + let stat = statfs(Path::new(path)).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + Ok(( + stat.block_size() as f64, + stat.blocks() as f64, + stat.blocks_available() as f64, + )) +} + +#[cfg(target_os = "windows")] +fn statfs_info(path: &str) -> Result<(f64, f64, f64), io::Error> { + Ok((0, 0, 0)) +} From a5302122743ba18e7505958648816b5c038b3aed Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Fri, 8 Nov 2024 16:57:22 -0500 Subject: [PATCH 2/7] fix channel stop signal --- bottlecap/src/bin/bottlecap/main.rs | 11 +++++++---- bottlecap/src/metrics/enhanced/lambda.rs | 8 ++++++-- bottlecap/src/metrics/enhanced/statfs.rs | 2 ++ 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 68dfa70ed..6f762a740 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -455,8 +455,13 @@ async fn extension_loop_active( ); // set cpu utilization metrics here to avoid accounting for extra idle time - if let Some(offsets) = enhanced_metric_data { - lambda_enhanced_metrics.set_cpu_utilization_enhanced_metrics(offsets.cpu_offset, offsets.uptime_offset); + if let Some(mut data) = enhanced_metric_data { + lambda_enhanced_metrics.set_cpu_utilization_enhanced_metrics(data.cpu_offset, data.uptime_offset); + // Drop tmp_chan as a signal to stop monitoring tmp + if let Some(tmp_chan) = data.tmp_chan.take() { + _ = tmp_chan.send(false); + drop(tmp_chan); + } } // TODO(astuyve) it'll be easy to @@ -493,8 +498,6 @@ async fn extension_loop_active( if let Some(data) = enhanced_metric_data { lambda_enhanced_metrics.set_network_enhanced_metrics(data.network_offset); lambda_enhanced_metrics.set_cpu_time_enhanced_metrics(data.cpu_offset); - // Drop tmp_chan as a signal to stop monitoring tmp - drop(data.tmp_chan); } drop(p); diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 64c8cb6b9..596462c7f 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -404,7 +404,7 @@ impl Lambda { loop { match send_metrics.try_recv() { - Err(mpsc::TryRecvError::Disconnected) => { + Ok(false) | Err(mpsc::TryRecvError::Disconnected) => { let mut aggr: std::sync::MutexGuard = aggr.lock().expect("lock poisoned"); Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr); @@ -796,7 +796,11 @@ mod tests { let tmp_max = 550461440.0; let tmp_used = 12165120.0; - Lambda::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut lambda.aggregator.lock().expect("lock poisoned")); + Lambda::generate_tmp_enhanced_metrics( + tmp_max, + tmp_used, + &mut lambda.aggregator.lock().expect("lock poisoned"), + ); assert_sketch(&metrics_aggr, constants::TMP_MAX_METRIC, 550461440.0); assert_sketch(&metrics_aggr, constants::TMP_USED_METRIC, 12165120.0); diff --git a/bottlecap/src/metrics/enhanced/statfs.rs b/bottlecap/src/metrics/enhanced/statfs.rs index 858f924cd..78c29aedb 100644 --- a/bottlecap/src/metrics/enhanced/statfs.rs +++ b/bottlecap/src/metrics/enhanced/statfs.rs @@ -1,3 +1,5 @@ +#![allow(clippy::module_name_repetitions)] + use nix::sys::statfs::statfs; use std::io; use std::path::Path; From 17e1c4002cfd48d29786b9efd24594d9cdfc7091 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Mon, 11 Nov 2024 13:32:56 -0500 Subject: [PATCH 3/7] use tokio async task instead of thread --- bottlecap/src/bin/bottlecap/main.rs | 10 ++--- .../src/lifecycle/invocation/processor.rs | 6 +-- bottlecap/src/metrics/enhanced/lambda.rs | 38 ++++++++++++------- 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 6f762a740..155c95268 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -60,11 +60,11 @@ use std::{ os::unix::process::CommandExt, path::Path, process::Command, - sync::{mpsc, Arc, Mutex}, + sync::{Arc, Mutex}, }; use telemetry::listener::TelemetryListenerConfig; -use tokio::sync::mpsc::Sender; use tokio::sync::Mutex as TokioMutex; +use tokio::sync::{mpsc::Sender, watch}; use tokio_util::sync::CancellationToken; use tracing::{debug, error}; use tracing_subscriber::EnvFilter; @@ -375,7 +375,7 @@ async fn extension_loop_active( lambda_enhanced_metrics.increment_invocation_metric(); // Start a channel for monitoring tmp enhanced data - let (tmp_chan_tx, tmp_chan_rx) = mpsc::channel::(); + let (tmp_chan_tx, tmp_chan_rx) = watch::channel(()); lambda_enhanced_metrics.set_tmp_enhanced_metrics(tmp_chan_rx); let mut p = invocation_processor.lock().await; @@ -457,9 +457,9 @@ async fn extension_loop_active( // set cpu utilization metrics here to avoid accounting for extra idle time if let Some(mut data) = enhanced_metric_data { lambda_enhanced_metrics.set_cpu_utilization_enhanced_metrics(data.cpu_offset, data.uptime_offset); - // Drop tmp_chan as a signal to stop monitoring tmp + // Send signal to stop monitoring tmp if let Some(tmp_chan) = data.tmp_chan.take() { - _ = tmp_chan.send(false); + _ = tmp_chan.send(()); drop(tmp_chan); } } diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 412f6045a..3f1b372e1 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -1,6 +1,6 @@ use std::{ collections::HashMap, - sync::{mpsc, Arc}, + sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; @@ -8,7 +8,7 @@ use chrono::{DateTime, Utc}; use datadog_trace_protobuf::pb::Span; use datadog_trace_utils::{send_data::SendData, tracer_header_tags}; use serde_json::{json, Value}; -use tokio::sync::mpsc::Sender; +use tokio::sync::{mpsc::Sender, watch}; use tracing::debug; use crate::{ @@ -81,7 +81,7 @@ impl Processor { /// Given a `request_id`, creates the context and adds the enhanced metric offsets to the context buffer. /// - pub fn on_invoke_event(&mut self, request_id: String, tmp_chan: Option>) { + pub fn on_invoke_event(&mut self, request_id: String, tmp_chan: Option>) { self.context_buffer.create_context(request_id.clone()); if self.collect_enhanced_data { let network_offset: Option = proc::get_network_data().ok(); diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 596462c7f..6d2399cc4 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -6,12 +6,12 @@ use dogstatsd::aggregator::Aggregator; use dogstatsd::metric; use dogstatsd::metric::{Metric, MetricValue}; use std::env::consts::ARCH; -use std::sync::{ - mpsc::{self, Receiver}, - Arc, Mutex, -}; -use std::thread; +use std::sync::{Arc, Mutex}; use std::time::Duration; +use tokio::{ + sync::watch::{Receiver, Sender}, + time::interval, +}; use tracing::debug; use tracing::error; @@ -383,14 +383,15 @@ impl Lambda { } } - pub fn set_tmp_enhanced_metrics(&self, send_metrics: Receiver) { + pub fn set_tmp_enhanced_metrics(&self, mut send_metrics: Receiver<()>) { if !self.config.enhanced_metrics { return; } let aggr = Arc::clone(&self.aggregator); - thread::spawn(move || { + tokio::spawn(async move { + // Set tmp_max and initial value for tmp_used let (bsize, blocks, bavail) = match statfs_info(constants::TMP_PATH) { Ok(stats) => stats, Err(err) => { @@ -398,19 +399,22 @@ impl Lambda { return; } }; - let tmp_max = bsize * blocks; let mut tmp_used = bsize * (blocks - bavail); + let mut interval = interval(Duration::from_millis(10)); loop { - match send_metrics.try_recv() { - Ok(false) | Err(mpsc::TryRecvError::Disconnected) => { + tokio::select! { + biased; + // When the stop signal is received, generate final metrics + _ = send_metrics.changed() => { let mut aggr: std::sync::MutexGuard = aggr.lock().expect("lock poisoned"); Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr); return; } - _ => { + // Otherwise keep monitoring tmp usage periodically + _ = interval.tick() => { let (bsize, blocks, bavail) = match statfs_info(constants::TMP_PATH) { Ok(stats) => stats, Err(err) => { @@ -419,7 +423,6 @@ impl Lambda { } }; tmp_used = tmp_used.max(bsize * (blocks - bavail)); - thread::sleep(Duration::from_millis(10)); } } } @@ -499,7 +502,7 @@ pub struct EnhancedMetricData { pub network_offset: Option, pub cpu_offset: Option, pub uptime_offset: Option, - pub tmp_chan: Option>, + pub tmp_chan: Option>, } impl PartialEq for EnhancedMetricData { @@ -657,6 +660,15 @@ mod tests { assert!(aggr .get_entry_by_id(constants::CPU_MAX_UTILIZATION_METRIC.into(), &None) .is_none()); + assert!(aggr + .get_entry_by_id(constants::TMP_MAX_METRIC.into(), &None) + .is_none()); + assert!(aggr + .get_entry_by_id(constants::TMP_USED_METRIC.into(), &None) + .is_none()); + assert!(aggr + .get_entry_by_id(constants::TMP_FREE_METRIC.into(), &None) + .is_none()); } #[test] From 5295a70db39a653864877c541e28095e5894cf30 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Mon, 11 Nov 2024 13:49:08 -0500 Subject: [PATCH 4/7] statfs fix --- bottlecap/src/metrics/enhanced/statfs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/metrics/enhanced/statfs.rs b/bottlecap/src/metrics/enhanced/statfs.rs index 78c29aedb..cd8b5a6b3 100644 --- a/bottlecap/src/metrics/enhanced/statfs.rs +++ b/bottlecap/src/metrics/enhanced/statfs.rs @@ -16,5 +16,5 @@ pub fn statfs_info(path: &str) -> Result<(f64, f64, f64), io::Error> { #[cfg(target_os = "windows")] fn statfs_info(path: &str) -> Result<(f64, f64, f64), io::Error> { - Ok((0, 0, 0)) + Err(io::Error::new(io::ErrorKind::Other, "Cannot get tmp data on Windows")) } From ce8bb1833ba0a9744f466e4ca82b17f0cdcb057f Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Mon, 11 Nov 2024 15:00:42 -0500 Subject: [PATCH 5/7] fixes --- bottlecap/src/bin/bottlecap/main.rs | 10 ++++------ bottlecap/src/lifecycle/invocation/context.rs | 4 +++- bottlecap/src/lifecycle/invocation/processor.rs | 2 +- bottlecap/src/metrics/enhanced/lambda.rs | 2 +- bottlecap/src/metrics/enhanced/statfs.rs | 7 ++++++- 5 files changed, 15 insertions(+), 10 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 155c95268..a0336f203 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -379,7 +379,7 @@ async fn extension_loop_active( lambda_enhanced_metrics.set_tmp_enhanced_metrics(tmp_chan_rx); let mut p = invocation_processor.lock().await; - p.on_invoke_event(request_id, Some(tmp_chan_tx)); + p.on_invoke_event(request_id, tmp_chan_tx); drop(p); } Ok(NextEventResponse::Shutdown { @@ -455,13 +455,11 @@ async fn extension_loop_active( ); // set cpu utilization metrics here to avoid accounting for extra idle time - if let Some(mut data) = enhanced_metric_data { + if let Some(data) = enhanced_metric_data { lambda_enhanced_metrics.set_cpu_utilization_enhanced_metrics(data.cpu_offset, data.uptime_offset); // Send signal to stop monitoring tmp - if let Some(tmp_chan) = data.tmp_chan.take() { - _ = tmp_chan.send(()); - drop(tmp_chan); - } + _ = data.tmp_chan.send(()); + drop(data.tmp_chan); } // TODO(astuyve) it'll be easy to diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 76ff86dd2..90db8504a 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -172,6 +172,7 @@ impl ContextBuffer { mod tests { use crate::proc::{CPUData, NetworkData}; use std::collections::HashMap; + use tokio::sync::watch; use super::*; @@ -318,12 +319,13 @@ mod tests { }); let uptime_offset = Some(50.0); + let (tmp_chan, _) = watch::channel(()); let enhanced_metric_data = Some(EnhancedMetricData { network_offset, cpu_offset, uptime_offset, - tmp_chan: None, + tmp_chan, }); buffer.add_enhanced_metric_data(&request_id, enhanced_metric_data.clone()); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 3f1b372e1..dfc91f5f6 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -81,7 +81,7 @@ impl Processor { /// Given a `request_id`, creates the context and adds the enhanced metric offsets to the context buffer. /// - pub fn on_invoke_event(&mut self, request_id: String, tmp_chan: Option>) { + pub fn on_invoke_event(&mut self, request_id: String, tmp_chan: watch::Sender<()>) { self.context_buffer.create_context(request_id.clone()); if self.collect_enhanced_data { let network_offset: Option = proc::get_network_data().ok(); diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 6d2399cc4..c31ba08dc 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -502,7 +502,7 @@ pub struct EnhancedMetricData { pub network_offset: Option, pub cpu_offset: Option, pub uptime_offset: Option, - pub tmp_chan: Option>, + pub tmp_chan: Sender<()>, } impl PartialEq for EnhancedMetricData { diff --git a/bottlecap/src/metrics/enhanced/statfs.rs b/bottlecap/src/metrics/enhanced/statfs.rs index cd8b5a6b3..0da8a1828 100644 --- a/bottlecap/src/metrics/enhanced/statfs.rs +++ b/bottlecap/src/metrics/enhanced/statfs.rs @@ -5,6 +5,8 @@ use std::io; use std::path::Path; #[cfg(not(target_os = "windows"))] +/// Returns the block size, total number of blocks, and number of blocks available for the specified directory path. +/// pub fn statfs_info(path: &str) -> Result<(f64, f64, f64), io::Error> { let stat = statfs(Path::new(path)).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; Ok(( @@ -16,5 +18,8 @@ pub fn statfs_info(path: &str) -> Result<(f64, f64, f64), io::Error> { #[cfg(target_os = "windows")] fn statfs_info(path: &str) -> Result<(f64, f64, f64), io::Error> { - Err(io::Error::new(io::ErrorKind::Other, "Cannot get tmp data on Windows")) + Err(io::Error::new( + io::ErrorKind::Other, + "Cannot get tmp data on Windows", + )) } From 4d63dcc12d0352bb3bb4024229803419cb2464e6 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Mon, 11 Nov 2024 16:09:58 -0500 Subject: [PATCH 6/7] remove unused import --- bottlecap/src/bin/bottlecap/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index e51163f82..c72566ef3 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -62,8 +62,8 @@ use std::{ sync::{Arc, Mutex}, }; use telemetry::listener::TelemetryListenerConfig; +use tokio::sync::mpsc::Sender; use tokio::sync::Mutex as TokioMutex; -use tokio::sync::{mpsc::Sender, watch}; use tokio_util::sync::CancellationToken; use tracing::{debug, error}; use tracing_subscriber::EnvFilter; From b9fe23af1ffd0bc6976d6abe972fccb46d251d83 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 12 Nov 2024 13:10:17 -0500 Subject: [PATCH 7/7] rename tmp_chan to tmp_chan_tx --- bottlecap/src/lifecycle/invocation/context.rs | 4 ++-- bottlecap/src/lifecycle/invocation/processor.rs | 4 ++-- bottlecap/src/metrics/enhanced/lambda.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 725f53eb8..4cffb929c 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -319,13 +319,13 @@ mod tests { }); let uptime_offset = Some(50.0); - let (tmp_chan, _) = watch::channel(()); + let (tmp_chan_tx, _) = watch::channel(()); let enhanced_metric_data = Some(EnhancedMetricData { network_offset, cpu_offset, uptime_offset, - tmp_chan, + tmp_chan_tx, }); buffer.add_enhanced_metric_data(&request_id, enhanced_metric_data.clone()); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 3aa6d32bc..1ef99f4b6 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -103,7 +103,7 @@ impl Processor { network_offset, cpu_offset, uptime_offset, - tmp_chan: tmp_chan_tx, + tmp_chan_tx, }); self.context_buffer .add_enhanced_metric_data(&request_id, enhanced_metric_offsets); @@ -185,7 +185,7 @@ impl Processor { offsets.uptime_offset, ); // Send the signal to stop monitoring tmp - _ = offsets.tmp_chan.send(()); + _ = offsets.tmp_chan_tx.send(()); } } diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 45f245539..6e4f53c72 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -502,7 +502,7 @@ pub struct EnhancedMetricData { pub network_offset: Option, pub cpu_offset: Option, pub uptime_offset: Option, - pub tmp_chan: Sender<()>, + pub tmp_chan_tx: Sender<()>, } impl PartialEq for EnhancedMetricData {