Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bottlecap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ figment = { version = "0.10", default-features = false, features = ["yaml", "env
hyper = { version = "0.14", default-features = false, features = ["server"] }
lazy_static = { version = "1.5", default-features = false }
log = { version = "0.4", default-features = false }
nix = { version = "0.26", default-features = false, features = ["feature"] }
nix = { version = "0.26", default-features = false, features = ["feature", "fs"] }
Comment thread
duncanista marked this conversation as resolved.
protobuf = { version = "3.5", default-features = false }
regex = { version = "1.10", default-features = false }
reqwest = { version = "0.12", features = ["json", "http2", "rustls-tls"], default-features = false }
Expand Down
6 changes: 2 additions & 4 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,9 @@ use dogstatsd::{
use reqwest::Client;
use serde::Deserialize;
use std::{
collections::hash_map,
collections::HashMap,
collections::{hash_map, HashMap},
env,
io::Error,
io::Result,
io::{Error, Result},
os::unix::process::CommandExt,
path::Path,
process::Command,
Expand Down
3 changes: 3 additions & 0 deletions bottlecap/src/lifecycle/invocation/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl ContextBuffer {
mod tests {
use crate::proc::{CPUData, NetworkData};
use std::collections::HashMap;
use tokio::sync::watch;

use super::*;

Expand Down Expand Up @@ -318,11 +319,13 @@ mod tests {
});

let uptime_offset = Some(50.0);
let (tmp_chan_tx, _) = watch::channel(());

let enhanced_metric_data = Some(EnhancedMetricData {
network_offset,
cpu_offset,
uptime_offset,
tmp_chan_tx,
});

buffer.add_enhanced_metric_data(&request_id, enhanced_metric_data.clone());
Expand Down
11 changes: 10 additions & 1 deletion bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use datadog_trace_protobuf::pb::Span;
use datadog_trace_utils::{send_data::SendData, tracer_header_tags};
use dogstatsd::aggregator::Aggregator as MetricsAggregator;
use serde_json::{json, Value};
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc::Sender, watch};
use tracing::debug;

use crate::{
Expand Down Expand Up @@ -90,13 +90,20 @@ impl Processor {
pub fn on_invoke_event(&mut self, request_id: String) {
self.context_buffer.create_context(request_id.clone());
if self.enhanced_metrics_enabled {
// Collect offsets for network and cpu metrics
let network_offset: Option<NetworkData> = proc::get_network_data().ok();
let cpu_offset: Option<CPUData> = proc::get_cpu_data().ok();
let uptime_offset: Option<f64> = proc::get_uptime().ok();

// Start a channel for monitoring tmp enhanced data
let (tmp_chan_tx, tmp_chan_rx) = watch::channel(());
Comment thread
duncanista marked this conversation as resolved.
self.enhanced_metrics.set_tmp_enhanced_metrics(tmp_chan_rx);

let enhanced_metric_offsets = Some(EnhancedMetricData {
network_offset,
cpu_offset,
uptime_offset,
tmp_chan_tx,
});
self.context_buffer
.add_enhanced_metric_data(&request_id, enhanced_metric_offsets);
Expand Down Expand Up @@ -177,6 +184,8 @@ impl Processor {
offsets.cpu_offset.clone(),
offsets.uptime_offset,
);
// Send the signal to stop monitoring tmp
_ = offsets.tmp_chan_tx.send(());
}
}

Expand Down
6 changes: 6 additions & 0 deletions bottlecap/src/metrics/enhanced/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ pub const ARM_LAMBDA_PRICE_PER_GB_SECOND: f64 = 0.000_013_333_4;
pub const MS_TO_SEC: f64 = 0.001;
pub const MB_TO_GB: f64 = 1_024.0;

// tmp directory path
pub const TMP_PATH: &str = "/tmp/";

// Enhanced metrics
pub const MAX_MEMORY_USED_METRIC: &str = "aws.lambda.enhanced.max_memory_used";
pub const MEMORY_SIZE_METRIC: &str = "aws.lambda.enhanced.memorysize";
Expand Down Expand Up @@ -32,5 +35,8 @@ pub const CPU_TOTAL_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_total_ut
pub const NUM_CORES_METRIC: &str = "aws.lambda.enhanced.num_cores";
pub const CPU_MAX_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_max_utilization";
pub const CPU_MIN_UTILIZATION_METRIC: &str = "aws.lambda.enhanced.cpu_min_utilization";
pub const TMP_MAX_METRIC: &str = "aws.lambda.enhanced.tmp_max";
pub const TMP_USED_METRIC: &str = "aws.lambda.enhanced.tmp_used";
pub const TMP_FREE_METRIC: &str = "aws.lambda.enhanced.tmp_free";
//pub const ASM_INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.asm.invocations";
pub const ENHANCED_METRICS_ENV_VAR: &str = "DD_ENHANCED_METRICS";
125 changes: 124 additions & 1 deletion bottlecap/src/metrics/enhanced/lambda.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use super::constants::{self, BASE_LAMBDA_INVOCATION_PRICE};
use super::statfs::statfs_info;
use crate::proc::{self, CPUData, NetworkData};
use crate::telemetry::events::ReportMetrics;
use dogstatsd::aggregator::Aggregator;
use dogstatsd::metric;
use dogstatsd::metric::{Metric, MetricValue};
use std::env::consts::ARCH;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::{
sync::watch::{Receiver, Sender},
time::interval,
};
use tracing::debug;
use tracing::error;

Expand Down Expand Up @@ -343,6 +349,86 @@ impl Lambda {
}
}

pub fn generate_tmp_enhanced_metrics(
tmp_max: f64,
tmp_used: f64,
aggr: &mut std::sync::MutexGuard<Aggregator>,
) {
let metric = Metric::new(
constants::TMP_MAX_METRIC.into(),
MetricValue::distribution(tmp_max),
None,
);
if let Err(e) = aggr.insert(metric) {
error!("Failed to insert tmp_max metric: {}", e);
}

let metric = Metric::new(
constants::TMP_USED_METRIC.into(),
MetricValue::distribution(tmp_used),
None,
);
if let Err(e) = aggr.insert(metric) {
error!("Failed to insert tmp_used metric: {}", e);
}

let tmp_free = tmp_max - tmp_used;
let metric = Metric::new(
constants::TMP_FREE_METRIC.into(),
MetricValue::distribution(tmp_free),
None,
);
if let Err(e) = aggr.insert(metric) {
error!("Failed to insert tmp_free metric: {}", e);
}
}

/// Spawns a background task that monitors `/tmp` usage for the current
/// invocation and emits it as enhanced metrics.
///
/// The task takes an initial `statfs` snapshot to establish the filesystem
/// capacity (`tmp_max`) and a baseline `tmp_used`, then periodically
/// re-samples and keeps the high-water mark of bytes used. When the paired
/// `watch::Sender` fires (signaling end of invocation — see
/// `EnhancedMetricData::tmp_chan_tx`), the final values are flushed to the
/// aggregator and the task exits. No-op when enhanced metrics are disabled.
pub fn set_tmp_enhanced_metrics(&self, mut send_metrics: Receiver<()>) {
    if !self.config.enhanced_metrics {
        return;
    }

    let aggr = Arc::clone(&self.aggregator);

    tokio::spawn(async move {
        // Seed tmp_max and the initial tmp_used. If /tmp cannot be
        // inspected at all, skip monitoring for this invocation entirely.
        let (bsize, blocks, bavail) = match statfs_info(constants::TMP_PATH) {
            Ok(stats) => stats,
            Err(err) => {
                debug!("Could not emit tmp enhanced metrics. {:?}", err);
                return;
            }
        };
        let tmp_max = bsize * blocks;
        let mut tmp_used = bsize * (blocks - bavail);

        // NOTE(review): 10ms is a very tight period for a statfs syscall;
        // consider loosening it if CPU overhead shows up in profiles.
        let mut interval = interval(Duration::from_millis(10));
        loop {
            tokio::select! {
                biased;
                // Stop signal received (or sender dropped): flush the
                // final metrics and end the task.
                _ = send_metrics.changed() => {
                    let mut aggr: std::sync::MutexGuard<Aggregator> =
                        aggr.lock().expect("lock poisoned");
                    Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr);
                    return;
                }
                // Periodic sample: track the high-water mark of bytes used.
                _ = interval.tick() => {
                    match statfs_info(constants::TMP_PATH) {
                        Ok((bsize, blocks, bavail)) => {
                            tmp_used = tmp_used.max(bsize * (blocks - bavail));
                        }
                        Err(err) => {
                            // Fix: previously a mid-invocation statfs failure
                            // returned without emitting anything, discarding
                            // the data already collected. Emit the last known
                            // good values before exiting.
                            debug!(
                                "statfs on tmp failed; emitting last known tmp metrics. {:?}",
                                err
                            );
                            let mut aggr = aggr.lock().expect("lock poisoned");
                            Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr);
                            return;
                        }
                    }
                }
            }
        }
    });
}

fn calculate_estimated_cost_usd(billed_duration_ms: u64, memory_size_mb: u64) -> f64 {
let gb_seconds = (billed_duration_ms as f64 * constants::MS_TO_SEC)
* (memory_size_mb as f64 / constants::MB_TO_GB);
Expand Down Expand Up @@ -411,11 +497,20 @@ impl Lambda {
}
}

#[derive(Clone, Debug, PartialEq)]
// Per-invocation baseline data captured at invoke time so enhanced metrics
// can be computed as deltas at the end of the invocation.
// Note: cannot derive PartialEq because `watch::Sender` does not implement
// it — see the manual impl below which compares only the offsets.
#[derive(Clone, Debug)]
pub struct EnhancedMetricData {
    // Baseline /proc network counters taken at invocation start (None if the read failed).
    pub network_offset: Option<NetworkData>,
    // Baseline /proc CPU counters taken at invocation start (None if the read failed).
    pub cpu_offset: Option<CPUData>,
    // System uptime at invocation start — presumably seconds; TODO confirm units in proc::get_uptime.
    pub uptime_offset: Option<f64>,
    // Signals the background /tmp monitor task to emit its final metrics and stop.
    pub tmp_chan_tx: Sender<()>,
}

/// Equality intentionally ignores `tmp_chan_tx`: a `watch::Sender` has no
/// meaningful notion of equality, so two values compare equal whenever all
/// of their recorded offsets match.
impl PartialEq for EnhancedMetricData {
    fn eq(&self, other: &Self) -> bool {
        (&self.network_offset, &self.cpu_offset, &self.uptime_offset)
            == (&other.network_offset, &other.cpu_offset, &other.uptime_offset)
    }
}

#[cfg(test)]
Expand Down Expand Up @@ -565,6 +660,15 @@ mod tests {
assert!(aggr
.get_entry_by_id(constants::CPU_MAX_UTILIZATION_METRIC.into(), &None)
.is_none());
assert!(aggr
.get_entry_by_id(constants::TMP_MAX_METRIC.into(), &None)
.is_none());
assert!(aggr
.get_entry_by_id(constants::TMP_USED_METRIC.into(), &None)
.is_none());
assert!(aggr
.get_entry_by_id(constants::TMP_FREE_METRIC.into(), &None)
.is_none());
}

#[test]
Expand Down Expand Up @@ -695,4 +799,23 @@ mod tests {
assert_sketch(&metrics_aggr, constants::CPU_MAX_UTILIZATION_METRIC, 30.0);
assert_sketch(&metrics_aggr, constants::CPU_MIN_UTILIZATION_METRIC, 28.75);
}

#[test]
// Renamed from `test_set_tmp_enhanced_metrics`: this test exercises
// `generate_tmp_enhanced_metrics` (the synchronous emit path), not the
// async `set_tmp_enhanced_metrics` monitor task.
fn test_generate_tmp_enhanced_metrics() {
    let (metrics_aggr, my_config) = setup();
    let lambda = Lambda::new(metrics_aggr.clone(), my_config);

    // Representative statfs-derived byte counts for a Lambda /tmp volume.
    let tmp_max = 550_461_440.0;
    let tmp_used = 12_165_120.0;

    Lambda::generate_tmp_enhanced_metrics(
        tmp_max,
        tmp_used,
        &mut lambda.aggregator.lock().expect("lock poisoned"),
    );

    assert_sketch(&metrics_aggr, constants::TMP_MAX_METRIC, 550_461_440.0);
    assert_sketch(&metrics_aggr, constants::TMP_USED_METRIC, 12_165_120.0);
    // tmp_free must be tmp_max - tmp_used.
    assert_sketch(&metrics_aggr, constants::TMP_FREE_METRIC, 538_296_320.0);
}
}
1 change: 1 addition & 0 deletions bottlecap/src/metrics/enhanced/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod constants;
pub mod lambda;
pub mod statfs;
25 changes: 25 additions & 0 deletions bottlecap/src/metrics/enhanced/statfs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#![allow(clippy::module_name_repetitions)]

use nix::sys::statfs::statfs;
use std::io;
use std::path::Path;

#[cfg(not(target_os = "windows"))]
/// Returns the block size, total number of blocks, and number of blocks available for the specified directory path.
///
pub fn statfs_info(path: &str) -> Result<(f64, f64, f64), io::Error> {
Comment thread
shreyamalpani marked this conversation as resolved.
let stat = statfs(Path::new(path)).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok((
stat.block_size() as f64,
stat.blocks() as f64,
stat.blocks_available() as f64,
))
}

#[cfg(target_os = "windows")]
/// Windows stub: `/tmp` statistics are not available, so this always fails.
///
/// Fix: made `pub` to mirror the non-Windows variant's visibility (callers
/// outside this module would otherwise fail to compile on Windows), and the
/// unused parameter is underscored to silence the warning.
/// NOTE(review): the `use nix::sys::statfs::statfs;` at the top of this file
/// must also be cfg-gated to `not(target_os = "windows")` for a Windows
/// build to succeed — verify.
///
/// # Errors
///
/// Always returns an `io::Error` of kind `Other`.
pub fn statfs_info(_path: &str) -> Result<(f64, f64, f64), io::Error> {
    Err(io::Error::new(
        io::ErrorKind::Other,
        "Cannot get tmp data on Windows",
    ))
}