diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index bfbaef220..e8e57ab9b 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -20,7 +20,7 @@ figment = { version = "0.10", default-features = false, features = ["yaml", "env hyper = { version = "0.14", default-features = false, features = ["server"] } lazy_static = { version = "1.5", default-features = false } log = { version = "0.4", default-features = false } -nix = { version = "0.26", default-features = false, features = ["feature"] } +nix = { version = "0.26", default-features = false, features = ["feature", "fs"] } protobuf = { version = "3.5", default-features = false } regex = { version = "1.10", default-features = false } reqwest = { version = "0.12", features = ["json", "http2", "rustls-tls"], default-features = false } diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 30c2c714b..c72566ef3 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -53,11 +53,9 @@ use dogstatsd::{ use reqwest::Client; use serde::Deserialize; use std::{ - collections::hash_map, - collections::HashMap, + collections::{hash_map, HashMap}, env, - io::Error, - io::Result, + io::{Error, Result}, os::unix::process::CommandExt, path::Path, process::Command, diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index d90a798b9..4cffb929c 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -172,6 +172,7 @@ impl ContextBuffer { mod tests { use crate::proc::{CPUData, NetworkData}; use std::collections::HashMap; + use tokio::sync::watch; use super::*; @@ -318,11 +319,13 @@ mod tests { }); let uptime_offset = Some(50.0); + let (tmp_chan_tx, _) = watch::channel(()); let enhanced_metric_data = Some(EnhancedMetricData { network_offset, cpu_offset, uptime_offset, + tmp_chan_tx, }); buffer.add_enhanced_metric_data(&request_id, enhanced_metric_data.clone()); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 467519e37..1ef99f4b6 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -9,7 +9,7 @@ use datadog_trace_protobuf::pb::Span; use datadog_trace_utils::{send_data::SendData, tracer_header_tags}; use dogstatsd::aggregator::Aggregator as MetricsAggregator; use serde_json::{json, Value}; -use tokio::sync::mpsc::Sender; +use tokio::sync::{mpsc::Sender, watch}; use tracing::debug; use crate::{ @@ -90,13 +90,20 @@ impl Processor { pub fn on_invoke_event(&mut self, request_id: String) { self.context_buffer.create_context(request_id.clone()); if self.enhanced_metrics_enabled { + // Collect offsets for network and cpu metrics let network_offset: Option = proc::get_network_data().ok(); let cpu_offset: Option = proc::get_cpu_data().ok(); let uptime_offset: Option = proc::get_uptime().ok(); + + // Start a channel for monitoring tmp enhanced data + let (tmp_chan_tx, tmp_chan_rx) = watch::channel(()); + self.enhanced_metrics.set_tmp_enhanced_metrics(tmp_chan_rx); + let enhanced_metric_offsets = Some(EnhancedMetricData { network_offset, cpu_offset, uptime_offset, + tmp_chan_tx, }); self.context_buffer .add_enhanced_metric_data(&request_id, enhanced_metric_offsets); @@ -177,6 +184,8 @@ impl Processor { offsets.cpu_offset.clone(), offsets.uptime_offset, ); + // Send the signal to stop monitoring tmp + _ = offsets.tmp_chan_tx.send(()); } } diff --git a/bottlecap/src/metrics/enhanced/constants.rs b/bottlecap/src/metrics/enhanced/constants.rs index 2d17e73ec..f62fd67ea 100644 --- a/bottlecap/src/metrics/enhanced/constants.rs +++ b/bottlecap/src/metrics/enhanced/constants.rs @@ -5,6 +5,9 @@ pub const ARM_LAMBDA_PRICE_PER_GB_SECOND: f64 = 0.000_013_333_4; pub const MS_TO_SEC: f64 = 0.001; pub const MB_TO_GB: f64 = 1_024.0; +// tmp directory path +pub const TMP_PATH: &str = "/tmp/"; + // Enhanced metrics pub const MAX_MEMORY_USED_METRIC: &str = "aws.lambda.enhanced.max_memory_used"; pub const MEMORY_SIZE_METRIC: &str = "aws.lambda.enhanced.memorysize"; @@ -32,5 +35,8 @@ pub const CPU_TOTAL_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_total_ut pub const NUM_CORES_METRIC: &str = "aws.lambda.enhanced.num_cores"; pub const CPU_MAX_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_max_utilization"; pub const CPU_MIN_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_min_utilization"; +pub const TMP_MAX_METRIC: &str = "aws.lambda.enhanced.tmp_max"; +pub const TMP_USED_METRIC: &str = "aws.lambda.enhanced.tmp_used"; +pub const TMP_FREE_METRIC: &str = "aws.lambda.enhanced.tmp_free"; //pub const ASM_INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.asm.invocations"; pub const ENHANCED_METRICS_ENV_VAR: &str = "DD_ENHANCED_METRICS"; diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index b11427e96..6e4f53c72 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -1,4 +1,5 @@ use super::constants::{self, BASE_LAMBDA_INVOCATION_PRICE}; +use super::statfs::statfs_info; use crate::proc::{self, CPUData, NetworkData}; use crate::telemetry::events::ReportMetrics; use dogstatsd::aggregator::Aggregator; @@ -6,6 +7,11 @@ use dogstatsd::metric; use dogstatsd::metric::{Metric, MetricValue}; use std::env::consts::ARCH; use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tokio::{ + sync::watch::{Receiver, Sender}, + time::interval, +}; use tracing::debug; use tracing::error; @@ -343,6 +349,86 @@ impl Lambda { } } + pub fn generate_tmp_enhanced_metrics( + tmp_max: f64, + tmp_used: f64, + aggr: &mut std::sync::MutexGuard, + ) { + let metric = Metric::new( + constants::TMP_MAX_METRIC.into(), + MetricValue::distribution(tmp_max), + None, + ); + if let Err(e) = aggr.insert(metric) { + error!("Failed to insert tmp_max metric: {}", e); + } + + let metric = Metric::new( + constants::TMP_USED_METRIC.into(), + MetricValue::distribution(tmp_used), + None, + ); + if let Err(e) = aggr.insert(metric) { + error!("Failed to insert tmp_used metric: {}", e); + } + + let tmp_free = tmp_max - tmp_used; + let metric = Metric::new( + constants::TMP_FREE_METRIC.into(), + MetricValue::distribution(tmp_free), + None, + ); + if let Err(e) = aggr.insert(metric) { + error!("Failed to insert tmp_free metric: {}", e); + } + } + + pub fn set_tmp_enhanced_metrics(&self, mut send_metrics: Receiver<()>) { + if !self.config.enhanced_metrics { + return; + } + + let aggr = Arc::clone(&self.aggregator); + + tokio::spawn(async move { + // Set tmp_max and initial value for tmp_used + let (bsize, blocks, bavail) = match statfs_info(constants::TMP_PATH) { + Ok(stats) => stats, + Err(err) => { + debug!("Could not emit tmp enhanced metrics. {:?}", err); + return; + } + }; + let tmp_max = bsize * blocks; + let mut tmp_used = bsize * (blocks - bavail); + + let mut interval = interval(Duration::from_millis(10)); + loop { + tokio::select! { + biased; + // When the stop signal is received, generate final metrics + _ = send_metrics.changed() => { + let mut aggr: std::sync::MutexGuard = + aggr.lock().expect("lock poisoned"); + Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr); + return; + } + // Otherwise keep monitoring tmp usage periodically + _ = interval.tick() => { + let (bsize, blocks, bavail) = match statfs_info(constants::TMP_PATH) { + Ok(stats) => stats, + Err(err) => { + debug!("Could not emit tmp enhanced metrics. {:?}", err); + return; + } + }; + tmp_used = tmp_used.max(bsize * (blocks - bavail)); + } + } + } + }); + } + fn calculate_estimated_cost_usd(billed_duration_ms: u64, memory_size_mb: u64) -> f64 { let gb_seconds = (billed_duration_ms as f64 * constants::MS_TO_SEC) * (memory_size_mb as f64 / constants::MB_TO_GB); @@ -411,11 +497,20 @@ impl Lambda { } } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug)] pub struct EnhancedMetricData { pub network_offset: Option, pub cpu_offset: Option, pub uptime_offset: Option, + pub tmp_chan_tx: Sender<()>, +} + +impl PartialEq for EnhancedMetricData { + fn eq(&self, other: &Self) -> bool { + self.network_offset == other.network_offset + && self.cpu_offset == other.cpu_offset + && self.uptime_offset == other.uptime_offset + } } #[cfg(test)] @@ -565,6 +660,15 @@ mod tests { assert!(aggr .get_entry_by_id(constants::CPU_MAX_UTILIZATION_METRIC.into(), &None) .is_none()); + assert!(aggr + .get_entry_by_id(constants::TMP_MAX_METRIC.into(), &None) + .is_none()); + assert!(aggr + .get_entry_by_id(constants::TMP_USED_METRIC.into(), &None) + .is_none()); + assert!(aggr + .get_entry_by_id(constants::TMP_FREE_METRIC.into(), &None) + .is_none()); } #[test] @@ -695,4 +799,23 @@ mod tests { assert_sketch(&metrics_aggr, constants::CPU_MAX_UTILIZATION_METRIC, 30.0); assert_sketch(&metrics_aggr, constants::CPU_MIN_UTILIZATION_METRIC, 28.75); } + + #[test] + fn test_set_tmp_enhanced_metrics() { + let (metrics_aggr, my_config) = setup(); + let lambda = Lambda::new(metrics_aggr.clone(), my_config); + + let tmp_max = 550461440.0; + let tmp_used = 12165120.0; + + Lambda::generate_tmp_enhanced_metrics( + tmp_max, + tmp_used, + &mut lambda.aggregator.lock().expect("lock poisoned"), + ); + + assert_sketch(&metrics_aggr, constants::TMP_MAX_METRIC, 550461440.0); + assert_sketch(&metrics_aggr, constants::TMP_USED_METRIC, 12165120.0); + assert_sketch(&metrics_aggr, constants::TMP_FREE_METRIC, 538296320.0); + } } diff --git a/bottlecap/src/metrics/enhanced/mod.rs b/bottlecap/src/metrics/enhanced/mod.rs index a2638024e..bca7c1bf0 100644 --- a/bottlecap/src/metrics/enhanced/mod.rs +++ b/bottlecap/src/metrics/enhanced/mod.rs @@ -1,2 +1,3 @@ pub mod constants; pub mod lambda; +pub mod statfs; diff --git a/bottlecap/src/metrics/enhanced/statfs.rs b/bottlecap/src/metrics/enhanced/statfs.rs new file mode 100644 index 000000000..0da8a1828 --- /dev/null +++ b/bottlecap/src/metrics/enhanced/statfs.rs @@ -0,0 +1,25 @@ +#![allow(clippy::module_name_repetitions)] + +use nix::sys::statfs::statfs; +use std::io; +use std::path::Path; + +#[cfg(not(target_os = "windows"))] +/// Returns the block size, total number of blocks, and number of blocks available for the specified directory path. +/// +pub fn statfs_info(path: &str) -> Result<(f64, f64, f64), io::Error> { + let stat = statfs(Path::new(path)).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + Ok(( + stat.block_size() as f64, + stat.blocks() as f64, + stat.blocks_available() as f64, + )) +} + +#[cfg(target_os = "windows")] +fn statfs_info(path: &str) -> Result<(f64, f64, f64), io::Error> { + Err(io::Error::new( + io::ErrorKind::Other, + "Cannot get tmp data on Windows", + )) +}