From 1ca5f58623a6e8ffd79373bd442f9480358c9ef5 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 22 Oct 2024 17:31:00 -0400 Subject: [PATCH 1/3] send network enhanced metrics --- bottlecap/src/bin/bottlecap/main.rs | 6 +- bottlecap/src/lib.rs | 1 + bottlecap/src/lifecycle/invocation/context.rs | 81 ++++++++++++++--- .../src/lifecycle/invocation/processor.rs | 19 +++- bottlecap/src/logs/lambda/processor.rs | 2 +- bottlecap/src/metrics/enhanced/constants.rs | 3 + bottlecap/src/metrics/enhanced/lambda.rs | 86 +++++++++++++++++++ bottlecap/src/proc/constants.rs | 3 + bottlecap/src/proc/mod.rs | 82 ++++++++++++++++++ .../tests/proc/net/invalid_dev_malformed | 5 ++ .../proc/net/invalid_dev_non_numerical_value | 5 ++ .../tests/proc/net/missing_interface_dev | 4 + bottlecap/tests/proc/net/valid_dev | 5 ++ 13 files changed, 286 insertions(+), 16 deletions(-) create mode 100644 bottlecap/src/proc/constants.rs create mode 100644 bottlecap/src/proc/mod.rs create mode 100644 bottlecap/tests/proc/net/invalid_dev_malformed create mode 100644 bottlecap/tests/proc/net/invalid_dev_non_numerical_value create mode 100644 bottlecap/tests/proc/net/missing_interface_dev create mode 100644 bottlecap/tests/proc/net/valid_dev diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index a343a02dd..28909ad18 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -375,6 +375,9 @@ async fn extension_loop_active( request_id, deadline_ms, invoked_function_arn ); lambda_enhanced_metrics.increment_invocation_metric(); + let mut p = invocation_processor.lock().await; + p.on_invoke_event(request_id); + drop(p); } Ok(NextEventResponse::Shutdown { shutdown_reason, @@ -473,10 +476,11 @@ async fn extension_loop_active( ); lambda_enhanced_metrics.set_report_log_metrics(&metrics); let mut p = invocation_processor.lock().await; - if let Some(post_runtime_duration_ms) = p.on_platform_report(&request_id, metrics.duration_ms) { + if let Some((post_runtime_duration_ms, network_offset)) = p.on_platform_report(&request_id, metrics.duration_ms) { lambda_enhanced_metrics.set_post_runtime_duration_metric( post_runtime_duration_ms, ); + lambda_enhanced_metrics.set_network_enhanced_metrics(network_offset); } drop(p); diff --git a/bottlecap/src/lib.rs b/bottlecap/src/lib.rs index bedbbe246..58f390db8 100644 --- a/bottlecap/src/lib.rs +++ b/bottlecap/src/lib.rs @@ -24,6 +24,7 @@ pub mod lifecycle; pub mod logger; pub mod logs; pub mod metrics; +pub mod proc; pub mod secrets; pub mod tags; pub mod telemetry; diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index a1c74bc57..9c00dea30 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -1,3 +1,4 @@ +use crate::proc::NetworkData; use std::collections::VecDeque; use tracing::debug; @@ -8,6 +9,7 @@ pub struct Context { pub runtime_duration_ms: f64, pub init_duration_ms: f64, pub start_time: i64, + pub network_offset: Option, } impl Context { @@ -17,12 +19,14 @@ impl Context { runtime_duration_ms: f64, init_duration_ms: f64, start_time: i64, + network_offset: Option, ) -> Self { Context { request_id, runtime_duration_ms, init_duration_ms, start_time, + network_offset, } } } @@ -100,7 +104,13 @@ impl ContextBuffer { { context.init_duration_ms = init_duration_ms; } else { - self.insert(Context::new(request_id.clone(), 0.0, init_duration_ms, 0)); + self.insert(Context::new( + request_id.clone(), + 0.0, + init_duration_ms, + 0, + None, + )); } } @@ -115,7 +125,7 @@ impl ContextBuffer { { context.start_time = start_time; } else { - self.insert(Context::new(request_id.clone(), 0.0, 0.0, start_time)); + self.insert(Context::new(request_id.clone(), 0.0, 0.0, start_time, None)); } } @@ -135,10 +145,26 @@ impl ContextBuffer { runtime_duration_ms, 0.0, 0, + None, )); } } + /// Adds the network offset to a `Context` in the buffer. If the `Context` is not found, a new + /// `Context` is created and added to the buffer. + /// + pub fn add_network_offset(&mut self, request_id: &String, network_data: Option) { + if let Some(context) = self + .buffer + .iter_mut() + .find(|context| context.request_id == *request_id) + { + context.network_offset = network_data; + } else { + self.insert(Context::new(request_id.clone(), 0.0, 0.0, 0, network_data)); + } + } + /// Returns the size of the buffer. /// #[must_use] @@ -157,20 +183,20 @@ mod tests { let mut buffer = ContextBuffer::with_capacity(2); let request_id = String::from("1"); - let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None); buffer.insert(context.clone()); assert_eq!(buffer.size(), 1); assert_eq!(buffer.get(&request_id).unwrap(), &context); let request_id_2 = String::from("2"); - let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0); + let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0, None); buffer.insert(context.clone()); assert_eq!(buffer.size(), 2); assert_eq!(buffer.get(&request_id_2).unwrap(), &context); // This should replace the first context let request_id_3 = String::from("3"); - let context = Context::new(request_id_3.clone(), 0.0, 0.0, 0); + let context = Context::new(request_id_3.clone(), 0.0, 0.0, 0, None); buffer.insert(context.clone()); assert_eq!(buffer.size(), 2); assert_eq!(buffer.get(&request_id_3).unwrap(), &context); @@ -184,13 +210,13 @@ mod tests { let mut buffer = ContextBuffer::with_capacity(2); let request_id = String::from("1"); - let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None); buffer.insert(context.clone()); assert_eq!(buffer.size(), 1); assert_eq!(buffer.get(&request_id).unwrap(), &context); let request_id_2 = String::from("2"); - let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0); + let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0, None); buffer.insert(context.clone()); assert_eq!(buffer.size(), 2); assert_eq!(buffer.get(&request_id_2).unwrap(), &context); @@ -211,13 +237,13 @@ mod tests { let mut buffer = ContextBuffer::with_capacity(2); let request_id = String::from("1"); - let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None); buffer.insert(context.clone()); assert_eq!(buffer.size(), 1); assert_eq!(buffer.get(&request_id).unwrap(), &context); let request_id_2 = String::from("2"); - let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0); + let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0, None); buffer.insert(context.clone()); assert_eq!(buffer.size(), 2); assert_eq!(buffer.get(&request_id_2).unwrap(), &context); @@ -232,7 +258,7 @@ mod tests { let mut buffer = ContextBuffer::with_capacity(2); let request_id = String::from("1"); - let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None); buffer.insert(context.clone()); assert_eq!(buffer.size(), 1); assert_eq!(buffer.get(&request_id).unwrap(), &context); @@ -255,7 +281,7 @@ mod tests { let mut buffer = ContextBuffer::with_capacity(2); let request_id = String::from("1"); - let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None); buffer.insert(context.clone()); assert_eq!(buffer.size(), 1); assert_eq!(buffer.get(&request_id).unwrap(), &context); @@ -275,7 +301,7 @@ mod tests { let mut buffer = ContextBuffer::with_capacity(2); let request_id = String::from("1"); - let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None); buffer.insert(context.clone()); assert_eq!(buffer.size(), 1); assert_eq!(buffer.get(&request_id).unwrap(), &context); @@ -295,4 +321,35 @@ mod tests { 200.0 ); } + + #[test] + fn test_add_network_offset() { + let mut buffer = ContextBuffer::with_capacity(2); + + let request_id = String::from("1"); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 1); + assert_eq!(buffer.get(&request_id).unwrap(), &context); + + let network_offset = Some(NetworkData { + rx_bytes: 180.0, + tx_bytes: 254.0, + }); + + buffer.add_network_offset(&request_id, network_offset); + assert_eq!( + buffer.get(&request_id).unwrap().network_offset, + network_offset, + ); + + // Add network offset to a context that doesn't exist + let unexistent_request_id = String::from("unexistent"); + buffer.add_network_offset(&unexistent_request_id, network_offset); + assert_eq!(buffer.size(), 2); + assert_eq!( + buffer.get(&unexistent_request_id).unwrap().network_offset, + network_offset + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 9b9cc98ef..59618f87a 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -13,6 +13,7 @@ use tracing::debug; use crate::{ config::{self, AwsConfig}, lifecycle::invocation::{context::ContextBuffer, span_inferrer::SpanInferrer}, + proc::{self, NetworkData}, tags::provider, traces::trace_processor, }; @@ -63,6 +64,14 @@ impl Processor { } } + /// Given a `request_id`, add the enhanced metric offsets to the context buffer. + /// + pub fn on_invoke_event(&mut self, request_id: String) { + let network_offset: Option = proc::get_network_data().ok(); + self.context_buffer + .add_network_offset(&request_id, network_offset); + } + /// Given a `request_id` and the time of the platform start, add the start time to the context buffer. /// /// Also, set the start time of the current span. @@ -151,13 +160,19 @@ impl Processor { /// If the `request_id` is not found in the context buffer, return `None`. /// If the `runtime_duration_ms` hasn't been seen, return `None`. /// - pub fn on_platform_report(&mut self, request_id: &String, duration_ms: f64) -> Option { + pub fn on_platform_report( + &mut self, + request_id: &String, + duration_ms: f64, + ) -> Option<(f64, Option)> { if let Some(context) = self.context_buffer.remove(request_id) { if context.runtime_duration_ms == 0.0 { return None; } - return Some(duration_ms - context.runtime_duration_ms); + let post_runtime_duration_ms = duration_ms - context.runtime_duration_ms; + + return Some((post_runtime_duration_ms, context.network_offset)); } None diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index d3b519682..6de124be6 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -53,7 +53,7 @@ impl LambdaProcessor { service, tags, rules, - invocation_context: InvocationContext::new(String::new(), 0.0, 0.0, 0), + invocation_context: InvocationContext::new(String::new(), 0.0, 0.0, 0, None), orphan_logs: Vec::new(), ready_logs: Vec::new(), event_bus, diff --git a/bottlecap/src/metrics/enhanced/constants.rs b/bottlecap/src/metrics/enhanced/constants.rs index 3c2d34e0a..e82f48057 100644 --- a/bottlecap/src/metrics/enhanced/constants.rs +++ b/bottlecap/src/metrics/enhanced/constants.rs @@ -21,5 +21,8 @@ pub const OUT_OF_MEMORY_METRIC: &str = "aws.lambda.enhanced.out_of_memory"; pub const TIMEOUTS_METRIC: &str = "aws.lambda.enhanced.timeouts"; pub const ERRORS_METRIC: &str = "aws.lambda.enhanced.errors"; pub const INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.invocations"; +pub const RX_BYTES_METRIC: &str = "aws.lambda.enhanced.rx_bytes"; +pub const TX_BYTES_METRIC: &str = "aws.lambda.enhanced.tx_bytes"; +pub const TOTAL_NETWORK_METRIC: &str = "aws.lambda.enhanced.total_network"; //pub const ASM_INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.asm.invocations"; pub const ENHANCED_METRICS_ENV_VAR: &str = "DD_ENHANCED_METRICS"; diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 0db917f31..144780102 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -1,10 +1,12 @@ use super::constants::{self, BASE_LAMBDA_INVOCATION_PRICE}; +use crate::proc::{self, NetworkData}; use crate::telemetry::events::ReportMetrics; use dogstatsd::aggregator::Aggregator; use dogstatsd::metric; use dogstatsd::metric::{Metric, MetricValue}; use std::env::consts::ARCH; use std::sync::{Arc, Mutex}; +use tracing::debug; use tracing::error; pub struct Lambda { @@ -105,6 +107,65 @@ impl Lambda { } } + pub(crate) fn generate_network_enhanced_metrics( + network_offset: NetworkData, + network_data: NetworkData, + aggr: &mut std::sync::MutexGuard, + ) { + let rx_bytes = network_data.rx_bytes - network_offset.rx_bytes; + let tx_bytes = network_data.tx_bytes - network_offset.tx_bytes; + let total_network = rx_bytes + tx_bytes; + + let metric = Metric::new( + constants::RX_BYTES_METRIC.into(), + MetricValue::distribution(rx_bytes), + None, + ); + if let Err(e) = aggr.insert(metric) { + error!("failed to insert rx_bytes metric: {}", e); + } + + let metric = Metric::new( + constants::TX_BYTES_METRIC.into(), + MetricValue::distribution(tx_bytes), + None, + ); + if let Err(e) = aggr.insert(metric) { + error!("failed to insert tx_bytes metric: {}", e); + } + + let metric = Metric::new( + constants::TOTAL_NETWORK_METRIC.into(), + MetricValue::distribution(total_network), + None, + ); + if let Err(e) = aggr.insert(metric) { + error!("failed to insert total_network metric: {}", e); + } + } + + pub fn set_network_enhanced_metrics(&self, network_offset: Option) { + if !self.config.enhanced_metrics { + return; + } + + if let Some(offset) = network_offset { + let mut aggr: std::sync::MutexGuard = + self.aggregator.lock().expect("lock poisoned"); + + match proc::get_network_data() { + Ok(data) => { + Self::generate_network_enhanced_metrics(offset, data, &mut aggr); + } + Err(_e) => { + debug!("Could not emit network enhanced metrics"); + } + } + } else { + debug!("Could not emit network enhanced metrics"); + } + } + fn calculate_estimated_cost_usd(billed_duration_ms: u64, memory_size_mb: u64) -> f64 { let gb_seconds = (billed_duration_ms as f64 * constants::MS_TO_SEC) * (memory_size_mb as f64 / constants::MB_TO_GB); @@ -307,4 +368,29 @@ mod tests { assert_sketch(&metrics_aggr, constants::MAX_MEMORY_USED_METRIC, 128.0); assert_sketch(&metrics_aggr, constants::MEMORY_SIZE_METRIC, 256.0); } + + #[test] + fn test_set_network_enhanced_metrics() { + let (metrics_aggr, my_config) = setup(); + let lambda = Lambda::new(metrics_aggr.clone(), my_config); + + let network_offset = NetworkData { + rx_bytes: 180.0, + tx_bytes: 254.0, + }; + let network_data = NetworkData { + rx_bytes: 20180.0, + tx_bytes: 75000.0, + }; + + Lambda::generate_network_enhanced_metrics( + network_offset, + network_data, + &mut lambda.aggregator.lock().expect("lock poisoned"), + ); + + assert_sketch(&metrics_aggr, constants::RX_BYTES_METRIC, 20000.0); + assert_sketch(&metrics_aggr, constants::TX_BYTES_METRIC, 74746.0); + assert_sketch(&metrics_aggr, constants::TOTAL_NETWORK_METRIC, 94746.0); + } } diff --git a/bottlecap/src/proc/constants.rs b/bottlecap/src/proc/constants.rs new file mode 100644 index 000000000..be7986e53 --- /dev/null +++ b/bottlecap/src/proc/constants.rs @@ -0,0 +1,3 @@ +pub const PROC_NET_DEV_PATH: &str = "/proc/net/dev"; + +pub const LAMDBA_NETWORK_INTERFACE: &str = "vinternal_1"; diff --git a/bottlecap/src/proc/mod.rs b/bottlecap/src/proc/mod.rs new file mode 100644 index 000000000..16bfefb52 --- /dev/null +++ b/bottlecap/src/proc/mod.rs @@ -0,0 +1,82 @@ +pub mod constants; + +use std::{ + error::Error, + fs::File, + io::{self, BufRead}, +}; + +use constants::{LAMDBA_NETWORK_INTERFACE, PROC_NET_DEV_PATH}; + +#[derive(Copy, Clone, Debug, PartialEq)] +pub struct NetworkData { + pub rx_bytes: f64, + pub tx_bytes: f64, +} + +pub fn get_network_data() -> Result> { + get_network_data_helper(PROC_NET_DEV_PATH) +} + +fn get_network_data_helper(path: &str) -> Result> { + let file = File::open(path).map_err(|e| Box::new(e) as Box)?; + let reader = io::BufReader::new(file); + + for line in reader.lines() { + let line = line.map_err(|e| Box::new(e) as Box)?; + let mut values = line.split_whitespace(); + + let Some(interface_name) = values.next() else { + continue; + }; + if !interface_name.starts_with(LAMDBA_NETWORK_INTERFACE) { + continue; + } + + let rx_bytes: f64 = match values.next().and_then(|s| s.parse().ok()) { + Some(value) => value, + None => continue, + }; + let tx_bytes: f64 = match values.nth(7).and_then(|s| s.parse().ok()) { + Some(value) => value, + None => continue, + }; + + return Ok(NetworkData { rx_bytes, tx_bytes }); + } + + Err("No matching network interface found".into()) +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + + #[test] + #[allow(clippy::float_cmp)] + fn test_get_network_data() { + let path = "./tests/proc/net/valid_dev"; + let network_data_result = get_network_data_helper(&path); + assert!(!network_data_result.is_err()); + let network_data_result = network_data_result.unwrap(); + assert_eq!(network_data_result.rx_bytes, 180.0); + assert_eq!(network_data_result.tx_bytes, 254.0); + + let path = "./tests/proc/net/invalid_dev_malformed"; + let network_data_result = get_network_data_helper(&path); + assert!(network_data_result.is_err()); + + let path = "./tests/proc/net/invalid_dev_non_numerical_value"; + let network_data_result = get_network_data_helper(&path); + assert!(network_data_result.is_err()); + + let path = "./tests/proc/net/missing_interface_dev"; + let network_data_result = get_network_data_helper(&path); + assert!(network_data_result.is_err()); + + let path = "./tests/proc/net/nonexistent_dev"; + let network_data_result = get_network_data_helper(&path); + assert!(network_data_result.is_err()); + } +} diff --git a/bottlecap/tests/proc/net/invalid_dev_malformed b/bottlecap/tests/proc/net/invalid_dev_malformed new file mode 100644 index 000000000..5cd9f0ec9 --- /dev/null +++ b/bottlecap/tests/proc/net/invalid_dev_malformed @@ -0,0 +1,5 @@ +Inter-| Receive | Transmit +face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed +lo: 7490 63 0 0 0 0 0 0 7490 63 0 0 0 0 0 0 +vinternal_1: 180 3 0 0 0 ... +telemetry1_sb: 17284 50 0 0 0 0 0 0 15279 78 0 0 0 0 0 0 \ No newline at end of file diff --git a/bottlecap/tests/proc/net/invalid_dev_non_numerical_value b/bottlecap/tests/proc/net/invalid_dev_non_numerical_value new file mode 100644 index 000000000..9aae3404e --- /dev/null +++ b/bottlecap/tests/proc/net/invalid_dev_non_numerical_value @@ -0,0 +1,5 @@ +Inter-| Receive | Transmit +face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed +lo: 7490 63 0 0 0 0 0 0 7490 63 0 0 0 0 0 0 +vinternal_1: INVALID 3 0 0 0 0 0 0 254 4 0 0 0 0 0 0 +telemetry1_sb: 17284 50 0 0 0 0 0 0 15279 78 0 0 0 0 0 0 \ No newline at end of file diff --git a/bottlecap/tests/proc/net/missing_interface_dev b/bottlecap/tests/proc/net/missing_interface_dev new file mode 100644 index 000000000..fb4a0224d --- /dev/null +++ b/bottlecap/tests/proc/net/missing_interface_dev @@ -0,0 +1,4 @@ +Inter-| Receive | Transmit +face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed +lo: 7490 63 0 0 0 0 0 0 7490 63 0 0 0 0 0 0 +telemetry1_sb: 17284 50 0 0 0 0 0 0 15279 78 0 0 0 0 0 0 \ No newline at end of file diff --git a/bottlecap/tests/proc/net/valid_dev b/bottlecap/tests/proc/net/valid_dev new file mode 100644 index 000000000..a20f0cc97 --- /dev/null +++ b/bottlecap/tests/proc/net/valid_dev @@ -0,0 +1,5 @@ +Inter-| Receive | Transmit +face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed +lo: 7490 63 0 0 0 0 0 0 7490 63 0 0 0 0 0 0 +vinternal_1: 180 3 0 0 0 0 0 0 254 4 0 0 0 0 0 0 +telemetry1_sb: 17284 50 0 0 0 0 0 0 15279 78 0 0 0 0 0 0 From 0e5796457626e468b9548b9e3fe9abfa4f8f253f Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Wed, 23 Oct 2024 14:23:24 -0400 Subject: [PATCH 2/3] naming fixes --- bottlecap/src/metrics/enhanced/lambda.rs | 18 ++++---- bottlecap/src/proc/mod.rs | 58 ++++++++++++------------ 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 144780102..657e57442 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -108,12 +108,12 @@ impl Lambda { } pub(crate) fn generate_network_enhanced_metrics( - network_offset: NetworkData, - network_data: NetworkData, + network_data_offset: NetworkData, + network_data_end: NetworkData, aggr: &mut std::sync::MutexGuard, ) { - let rx_bytes = network_data.rx_bytes - network_offset.rx_bytes; - let tx_bytes = network_data.tx_bytes - network_offset.tx_bytes; + let rx_bytes = network_data_end.rx_bytes - network_data_offset.rx_bytes; + let tx_bytes = network_data_end.tx_bytes - network_data_offset.tx_bytes; let total_network = rx_bytes + tx_bytes; let metric = Metric::new( @@ -122,7 +122,7 @@ impl Lambda { None, ); if let Err(e) = aggr.insert(metric) { - error!("failed to insert rx_bytes metric: {}", e); + error!("Failed to insert rx_bytes metric: {}", e); } let metric = Metric::new( @@ -131,7 +131,7 @@ impl Lambda { None, ); if let Err(e) = aggr.insert(metric) { - error!("failed to insert tx_bytes metric: {}", e); + error!("Failed to insert tx_bytes metric: {}", e); } let metric = Metric::new( @@ -140,7 +140,7 @@ impl Lambda { None, ); if let Err(e) = aggr.insert(metric) { - error!("failed to insert total_network metric: {}", e); + error!("Failed to insert total_network metric: {}", e); } } @@ -158,11 +158,11 @@ impl Lambda { Self::generate_network_enhanced_metrics(offset, data, &mut aggr); } Err(_e) => { - debug!("Could not emit network enhanced metrics"); + debug!("Could not find data to generate network enhanced metrics"); } } } else { - debug!("Could not emit network enhanced metrics"); + debug!("Could not find data to generate network enhanced metrics"); } } diff --git a/bottlecap/src/proc/mod.rs b/bottlecap/src/proc/mod.rs index 16bfefb52..258bc6edf 100644 --- a/bottlecap/src/proc/mod.rs +++ b/bottlecap/src/proc/mod.rs @@ -1,7 +1,6 @@ pub mod constants; use std::{ - error::Error, fs::File, io::{self, BufRead}, }; @@ -14,38 +13,41 @@ pub struct NetworkData { pub tx_bytes: f64, } -pub fn get_network_data() -> Result> { - get_network_data_helper(PROC_NET_DEV_PATH) +pub fn get_network_data() -> Result { + get_network_data_from_path(PROC_NET_DEV_PATH) } -fn get_network_data_helper(path: &str) -> Result> { - let file = File::open(path).map_err(|e| Box::new(e) as Box)?; +fn get_network_data_from_path(path: &str) -> Result { + let file = File::open(path)?; let reader = io::BufReader::new(file); for line in reader.lines() { - let line = line.map_err(|e| Box::new(e) as Box)?; + let line = line?; let mut values = line.split_whitespace(); - let Some(interface_name) = values.next() else { - continue; - }; - if !interface_name.starts_with(LAMDBA_NETWORK_INTERFACE) { - continue; + // Check for the line containing lambda network data by interface name + if let Some(interface_name) = values.next() { + if interface_name.starts_with(LAMDBA_NETWORK_INTERFACE) { + // Read the value for bytes received if present, otherwise break and return error + let rx_bytes: f64 = match values.next().and_then(|s| s.parse().ok()) { + Some(value) => value, + None => break, + }; + // Read the value for bytes transmitted if present, otherwise break and return error + let tx_bytes: f64 = match values.nth(7).and_then(|s| s.parse().ok()) { + Some(value) => value, + None => break, + }; + + return Ok(NetworkData { rx_bytes, tx_bytes }); + } } - - let rx_bytes: f64 = match values.next().and_then(|s| s.parse().ok()) { - Some(value) => value, - None => continue, - }; - let tx_bytes: f64 = match values.nth(7).and_then(|s| s.parse().ok()) { - Some(value) => value, - None => continue, - }; - - return Ok(NetworkData { rx_bytes, tx_bytes }); } - Err("No matching network interface found".into()) + Err(io::Error::new( + io::ErrorKind::NotFound, + "Network data not found", + )) } #[cfg(test)] @@ -57,26 +59,26 @@ mod tests { #[allow(clippy::float_cmp)] fn test_get_network_data() { let path = "./tests/proc/net/valid_dev"; - let network_data_result = get_network_data_helper(&path); + let network_data_result = get_network_data_from_path(&path); assert!(!network_data_result.is_err()); let network_data_result = network_data_result.unwrap(); assert_eq!(network_data_result.rx_bytes, 180.0); assert_eq!(network_data_result.tx_bytes, 254.0); let path = "./tests/proc/net/invalid_dev_malformed"; - let network_data_result = get_network_data_helper(&path); + let network_data_result = get_network_data_from_path(&path); assert!(network_data_result.is_err()); let path = "./tests/proc/net/invalid_dev_non_numerical_value"; - let network_data_result = get_network_data_helper(&path); + let network_data_result = get_network_data_from_path(&path); assert!(network_data_result.is_err()); let path = "./tests/proc/net/missing_interface_dev"; - let network_data_result = get_network_data_helper(&path); + let network_data_result = get_network_data_from_path(&path); assert!(network_data_result.is_err()); let path = "./tests/proc/net/nonexistent_dev"; - let network_data_result = get_network_data_helper(&path); + let network_data_result = get_network_data_from_path(&path); assert!(network_data_result.is_err()); } } From 5a9acb1fd8654125aa1528ddb6ca03f640228d42 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Wed, 23 Oct 2024 16:20:18 -0400 Subject: [PATCH 3/3] reformatting reading data from proc --- bottlecap/src/proc/mod.rs | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/bottlecap/src/proc/mod.rs b/bottlecap/src/proc/mod.rs index 258bc6edf..c33a2984e 100644 --- a/bottlecap/src/proc/mod.rs +++ b/bottlecap/src/proc/mod.rs @@ -25,21 +25,29 @@ fn get_network_data_from_path(path: &str) -> Result { let line = line?; let mut values = line.split_whitespace(); - // Check for the line containing lambda network data by interface name - if let Some(interface_name) = values.next() { - if interface_name.starts_with(LAMDBA_NETWORK_INTERFACE) { - // Read the value for bytes received if present, otherwise break and return error - let rx_bytes: f64 = match values.next().and_then(|s| s.parse().ok()) { - Some(value) => value, - None => break, - }; - // Read the value for bytes transmitted if present, otherwise break and return error - let tx_bytes: f64 = match values.nth(7).and_then(|s| s.parse().ok()) { - Some(value) => value, - None => break, - }; - - return Ok(NetworkData { rx_bytes, tx_bytes }); + if values.next().map_or(false, |interface_name| { + interface_name.starts_with(LAMDBA_NETWORK_INTERFACE) + }) { + // Read the value for received bytes if present + let rx_bytes: Option = values.next().and_then(|s| s.parse().ok()); + + // Skip over the next 7 values representing metrics for received data and + // read the value for bytes transmitted if present + let tx_bytes: Option = values.nth(7).and_then(|s| s.parse().ok()); + + match (rx_bytes, tx_bytes) { + (Some(rx_val), Some(tx_val)) => { + return Ok(NetworkData { + rx_bytes: rx_val, + tx_bytes: tx_val, + }) + } + (_, _) => { + return Err(io::Error::new( + io::ErrorKind::NotFound, + "Network data not found", + )) + } } } }