diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index c72566ef3..5b691c2ee 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -60,6 +60,7 @@ use std::{ path::Path, process::Command, sync::{Arc, Mutex}, + time::Instant, }; use telemetry::listener::TelemetryListenerConfig; use tokio::sync::mpsc::Sender; @@ -201,6 +202,7 @@ fn load_configs() -> (AwsConfig, Arc) { aws_secret_access_key: env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_default(), aws_session_token: env::var("AWS_SESSION_TOKEN").unwrap_or_default(), function_name: env::var("AWS_LAMBDA_FUNCTION_NAME").unwrap_or_default(), + sandbox_init_time: Instant::now(), }; let lambda_directory = env::var("LAMBDA_TASK_ROOT").unwrap_or_else(|_| "/var/task".to_string()); let config = match config::get_config(Path::new(&lambda_directory)) { @@ -400,9 +402,9 @@ async fn extension_loop_active( } Event::Telemetry(event) => match event.record { - TelemetryRecord::PlatformStart { request_id, .. } => { + TelemetryRecord::PlatformInitStart { .. } => { let mut p = invocation_processor.lock().await; - p.on_platform_start(request_id, event.time); + p.on_platform_init_start(event.time); drop(p); } TelemetryRecord::PlatformInitReport { @@ -415,6 +417,11 @@ async fn extension_loop_active( p.on_platform_init_report(metrics.duration_ms); drop(p); } + TelemetryRecord::PlatformStart { request_id, .. } => { + let mut p = invocation_processor.lock().await; + p.on_platform_start(request_id, event.time); + drop(p); + } TelemetryRecord::PlatformRuntimeDone { request_id, status, diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 2c0fbab2f..8acc9feac 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -4,6 +4,7 @@ pub mod processing_rule; pub mod trace_propagation_style; use std::path::Path; +use std::time::Instant; use std::vec; use figment::providers::{Format, Yaml}; @@ -221,6 +222,7 @@ pub struct AwsConfig { pub aws_secret_access_key: String, pub aws_session_token: String, pub function_name: String, + pub sandbox_init_time: Instant, } #[cfg(test)] diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 7010e245a..680a967ba 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -165,6 +165,13 @@ impl ContextBuffer { pub fn size(&self) -> usize { self.buffer.len() } + + /// Returns if the buffer is empty. + /// + #[must_use] + pub fn is_empty(&self) -> bool { + self.buffer.is_empty() + } } #[cfg(test)] diff --git a/bottlecap/src/lifecycle/invocation/mod.rs b/bottlecap/src/lifecycle/invocation/mod.rs index aca184c2b..b62a757d9 100644 --- a/bottlecap/src/lifecycle/invocation/mod.rs +++ b/bottlecap/src/lifecycle/invocation/mod.rs @@ -1,5 +1,8 @@ use base64::{engine::general_purpose, DecodeError, Engine}; use datadog_trace_protobuf::pb::Span; +use rand::{rngs::OsRng, Rng, RngCore}; + +use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE}; use serde_json::Value; use tracing::debug; @@ -27,6 +30,25 @@ pub fn base64_to_string(base64_string: &str) -> Result { } } +fn create_empty_span(name: String, resource: String, service: String) -> Span { + Span { + name, + resource, + service, + r#type: String::from("serverless"), + ..Default::default() + } +} + +fn generate_span_id() -> u64 { + if std::env::var(INIT_TYPE).map_or(false, |it| it == SNAP_START_VALUE) { + return OsRng.next_u64(); + } + + let mut rng = rand::thread_rng(); + rng.gen() +} + pub fn tag_span_from_value(span: &mut Span, key: &str, value: &Value, depth: u32, max_depth: u32) { // Null scenario if value.is_null() { diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index a94164a18..eb9e00b6c 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -1,7 +1,7 @@ use std::{ collections::HashMap, sync::{Arc, Mutex}, - time::{SystemTime, UNIX_EPOCH}, + time::{Instant, SystemTime, UNIX_EPOCH}, }; use chrono::{DateTime, Utc}; @@ -15,7 +15,8 @@ use tracing::debug; use crate::{ config::{self, AwsConfig}, lifecycle::invocation::{ - base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer, tag_span_from_value, + base64_to_string, context::ContextBuffer, create_empty_span, generate_span_id, + span_inferrer::SpanInferrer, tag_span_from_value, }, metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics}, proc::{self, CPUData, NetworkData}, @@ -36,6 +37,7 @@ use crate::{ pub const MS_TO_NS: f64 = 1_000_000.0; pub const S_TO_NS: f64 = 1_000_000_000.0; +pub const PROACTIVE_INITIALIZATION_THRESHOLD_MS: u64 = 10_000; pub const DATADOG_INVOCATION_ERROR_MESSAGE_KEY: &str = "x-datadog-invocation-error-msg"; pub const DATADOG_INVOCATION_ERROR_TYPE_KEY: &str = "x-datadog-invocation-error-type"; @@ -43,14 +45,23 @@ pub const DATADOG_INVOCATION_ERROR_STACK_KEY: &str = "x-datadog-invocation-error pub const DATADOG_INVOCATION_ERROR_KEY: &str = "x-datadog-invocation-error"; pub struct Processor { + // Buffer containing context of the previous 5 invocations pub context_buffer: ContextBuffer, + // Helper to infer span information inferrer: SpanInferrer, + // Current invocation span pub span: Span, + // Cold start span + cold_start_span: Option, + // Extracted span context from inferred span, headers, or payload pub extracted_span_context: Option, // Used to extract the trace context from inferred span, headers, or payload propagator: DatadogCompositePropagator, + // Helper to send enhanced metrics enhanced_metrics: EnhancedMetrics, + // AWS configuration from the Lambda environment aws_config: AwsConfig, + // Flag to determine if a tracer was detected tracer_detected: bool, config: Arc, } @@ -63,32 +74,18 @@ impl Processor { aws_config: &AwsConfig, metrics_aggregator: Arc>, ) -> Self { - let service = config.service.clone().unwrap_or("aws.lambda".to_string()); + let service = config.service.clone().unwrap_or(String::from("aws.lambda")); let resource = tags_provider .get_canonical_resource_name() - .unwrap_or("aws_lambda".to_string()); + .unwrap_or(String::from("aws.lambda")); let propagator = DatadogCompositePropagator::new(Arc::clone(&config)); Processor { context_buffer: ContextBuffer::default(), inferrer: SpanInferrer::default(), - span: Span { - service, - name: "aws.lambda".to_string(), - resource, - trace_id: 0, - span_id: 0, - parent_id: 0, - start: 0, - duration: 0, - error: 0, - meta: HashMap::new(), - metrics: HashMap::new(), - r#type: "serverless".to_string(), - meta_struct: HashMap::new(), - span_links: Vec::new(), - }, + span: create_empty_span(String::from("aws.lambda"), resource, service), + cold_start_span: None, extracted_span_context: None, propagator, enhanced_metrics: EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config)), @@ -101,6 +98,9 @@ impl Processor { /// Given a `request_id`, creates the context and adds the enhanced metric offsets to the context buffer. /// pub fn on_invoke_event(&mut self, request_id: String) { + self.reset_state(); + self.set_init_tags(); + self.context_buffer.create_context(request_id.clone()); if self.config.enhanced_metrics { // Collect offsets for network and cpu metrics @@ -132,10 +132,87 @@ impl Processor { self.enhanced_metrics.increment_invocation_metric(); } + /// Resets the state of the processor to default values. + /// + fn reset_state(&mut self) { + // Reset Span Context on Span + self.span.trace_id = 0; + self.span.parent_id = 0; + self.span.span_id = 0; + // Error + self.span.error = 0; + // Meta tags + self.span.meta.clear(); + // Extracted Span Context + self.extracted_span_context = None; + // Cold Start Span + self.cold_start_span = None; + } + + /// On the first invocation, determine if it's a cold start or proactive init. + /// + /// For every other invocation, it will always be warm start. + /// + fn set_init_tags(&mut self) { + let mut proactive_initialization = false; + let mut cold_start = false; + + // If it's empty, then we are in a cold start + if self.context_buffer.is_empty() { + let now = Instant::now(); + let time_since_sandbox_init = now.duration_since(self.aws_config.sandbox_init_time); + if time_since_sandbox_init.as_millis() > PROACTIVE_INITIALIZATION_THRESHOLD_MS.into() { + proactive_initialization = true; + } else { + cold_start = true; + } + } + + if proactive_initialization { + self.span.meta.insert( + String::from("proactive_initialization"), + proactive_initialization.to_string(), + ); + } + self.span + .meta + .insert(String::from("cold_start"), cold_start.to_string()); + + self.enhanced_metrics + .set_init_tags(proactive_initialization, cold_start); + } + + pub fn on_platform_init_start(&mut self, time: DateTime) { + // Create a cold start span + let mut cold_start_span = create_empty_span( + String::from("aws.lambda.cold_start"), + self.span.resource.clone(), + self.span.service.clone(), + ); + + let start_time: i64 = SystemTime::from(time) + .duration_since(UNIX_EPOCH) + .expect("time went backwards") + .as_nanos() + .try_into() + .unwrap_or_default(); + + cold_start_span.span_id = generate_span_id(); + cold_start_span.start = start_time; + + self.cold_start_span = Some(cold_start_span); + } + /// Given the duration of the platform init report, set the init duration metric. /// + #[allow(clippy::cast_possible_truncation)] pub fn on_platform_init_report(&mut self, duration_ms: f64) { self.enhanced_metrics.set_init_duration_metric(duration_ms); + + if let Some(cold_start_span) = &mut self.cold_start_span { + // `round` is intentionally meant to be a whole integer + cold_start_span.duration = (duration_ms * MS_TO_NS) as i64; + } } /// Given a `request_id` and the time of the platform start, add the start time to the context buffer. @@ -183,10 +260,10 @@ impl Processor { } if let Some(context) = self.context_buffer.get(request_id) { - let span = &mut self.span; // `round` is intentionally meant to be a whole integer - span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64; - span.meta + self.span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64; + self.span + .meta .insert("request_id".to_string(), request_id.clone()); // todo(duncanista): add missing tags // - cold start, proactive init @@ -213,6 +290,11 @@ impl Processor { self.inferrer.complete_inferred_spans(&self.span); + if let Some(cold_start_span) = &mut self.cold_start_span { + cold_start_span.trace_id = self.span.trace_id; + cold_start_span.parent_id = self.span.parent_id; + } + if self.tracer_detected { let mut body_size = std::mem::size_of_val(&self.span); let mut traces = vec![self.span.clone()]; @@ -227,6 +309,11 @@ impl Processor { traces.push(ws.clone()); } + if let Some(cold_start_span) = &self.cold_start_span { + body_size += std::mem::size_of_val(cold_start_span); + traces.push(cold_start_span.clone()); + } + // todo: figure out what to do here let header_tags = tracer_header_tags::TracerHeaderTags { lang: "", @@ -263,7 +350,7 @@ impl Processor { // Set the report log metrics self.enhanced_metrics.set_report_log_metrics(&metrics); - if let Some(context) = self.context_buffer.remove(request_id) { + if let Some(context) = self.context_buffer.get(request_id) { if context.runtime_duration_ms != 0.0 { let post_runtime_duration_ms = metrics.duration_ms - context.runtime_duration_ms; @@ -273,7 +360,7 @@ impl Processor { } // Set Network and CPU time metrics - if let Some(offsets) = context.enhanced_metric_data { + if let Some(offsets) = context.enhanced_metric_data.clone() { self.enhanced_metrics .set_network_enhanced_metrics(offsets.network_offset); self.enhanced_metrics @@ -288,14 +375,6 @@ impl Processor { pub fn on_invocation_start(&mut self, headers: HashMap, payload: Vec) { self.tracer_detected = true; - // Reset trace context - self.span.trace_id = 0; - self.span.parent_id = 0; - self.span.span_id = 0; - self.span.error = 0; - self.span.meta.clear(); - self.extracted_span_context = None; - let payload_value = match serde_json::from_slice::(&payload) { Ok(value) => value, Err(_) => json!({}), diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index edcdabe55..f68134f46 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -1,26 +1,27 @@ use std::collections::HashMap; use datadog_trace_protobuf::pb::Span; -use rand::{rngs::OsRng, Rng, RngCore}; use serde_json::Value; use tracing::debug; use crate::config::AwsConfig; -use crate::lifecycle::invocation::triggers::{ - api_gateway_http_event::APIGatewayHttpEvent, - api_gateway_rest_event::APIGatewayRestEvent, - dynamodb_event::DynamoDbRecord, - event_bridge_event::EventBridgeEvent, - kinesis_event::KinesisRecord, - lambda_function_url_event::LambdaFunctionUrlEvent, - s3_event::S3Record, - sns_event::{SnsEntity, SnsRecord}, - sqs_event::SqsRecord, - step_function_event::StepFunctionEvent, - Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, +use crate::lifecycle::invocation::{ + generate_span_id, + triggers::{ + api_gateway_http_event::APIGatewayHttpEvent, + api_gateway_rest_event::APIGatewayRestEvent, + dynamodb_event::DynamoDbRecord, + event_bridge_event::EventBridgeEvent, + kinesis_event::KinesisRecord, + lambda_function_url_event::LambdaFunctionUrlEvent, + s3_event::S3Record, + sns_event::{SnsEntity, SnsRecord}, + sqs_event::SqsRecord, + step_function_event::StepFunctionEvent, + Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, + }, }; -use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE}; use crate::traces::{context::SpanContext, propagation::Propagator}; pub struct SpanInferrer { @@ -72,7 +73,7 @@ impl SpanInferrer { let mut trigger: Option> = None; let mut inferred_span = Span { - span_id: Self::generate_span_id(), + span_id: generate_span_id(), ..Default::default() }; @@ -102,7 +103,7 @@ impl SpanInferrer { if let Ok(sns_entity) = serde_json::from_str::(&t.body) { debug!("Found an SNS event wrapped in the SQS body"); let mut wrapped_inferred_span = Span { - span_id: Self::generate_span_id(), + span_id: generate_span_id(), ..Default::default() }; @@ -121,7 +122,7 @@ impl SpanInferrer { serde_json::from_str::(&t.body) { let mut wrapped_inferred_span = Span { - span_id: Self::generate_span_id(), + span_id: generate_span_id(), ..Default::default() }; @@ -145,7 +146,7 @@ impl SpanInferrer { serde_json::from_str::(message) { let mut wrapped_inferred_span = Span { - span_id: Self::generate_span_id(), + span_id: generate_span_id(), ..Default::default() }; @@ -283,16 +284,7 @@ impl SpanInferrer { } } - fn generate_span_id() -> u64 { - if std::env::var(INIT_TYPE).map_or(false, |it| it == SNAP_START_VALUE) { - return OsRng.next_u64(); - } - - let mut rng = rand::thread_rng(); - rng.gen() - } - - /// Returns the extracted span context + /// Returns a clone of the carrier associated with the inferred span /// /// If the carrier is set, it will try to extract the span context, /// otherwise it will diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 6de124be6..d80db8912 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -87,8 +87,13 @@ impl LambdaProcessor { runtime_version_arn, .. // TODO: check if we could do something with this metrics: `initialization_type` and `phase` } => { + if let Err(e) = self.event_bus.send(Event::Telemetry(copy)).await { + error!("Failed to send PlatformInitStart to the main event bus: {}", e); + } + let rv = runtime_version.unwrap_or("?".to_string()); // TODO: check what does containers display let rv_arn = runtime_version_arn.unwrap_or("?".to_string()); // TODO: check what do containers display + Ok(Message::new( format!("INIT_START Runtime Version: {rv} Runtime Version ARN: {rv_arn}"), None, @@ -178,7 +183,6 @@ impl LambdaProcessor { )) }, // TODO: PlatformInitRuntimeDone - // TODO: PlatformInitReport // TODO: PlatformExtension // TODO: PlatformTelemetrySubscription // TODO: PlatformLogsDropped diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 3d999062f..a0b02aef7 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -1,10 +1,13 @@ -use super::constants::{self, BASE_LAMBDA_INVOCATION_PRICE}; -use super::statfs::statfs_info; +use crate::metrics::enhanced::{ + constants::{self, BASE_LAMBDA_INVOCATION_PRICE}, + statfs::statfs_info, +}; use crate::proc::{self, CPUData, NetworkData}; use crate::telemetry::events::ReportMetrics; -use dogstatsd::aggregator::Aggregator; use dogstatsd::metric; use dogstatsd::metric::{Metric, MetricValue}; +use dogstatsd::{aggregator::Aggregator, metric::SortedTags}; +use std::collections::HashMap; use std::env::consts::ARCH; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -18,12 +21,47 @@ use tracing::error; pub struct Lambda { pub aggregator: Arc>, pub config: Arc, + // Dynamic value tags are the ones we cannot obtain statically from the sandbox + dynamic_value_tags: HashMap, } impl Lambda { #[must_use] pub fn new(aggregator: Arc>, config: Arc) -> Lambda { - Lambda { aggregator, config } + Lambda { + aggregator, + config, + dynamic_value_tags: HashMap::new(), + } + } + + /// Set the dynamic value tags that are not available at compile time + pub fn set_init_tags(&mut self, proactive_initialization: bool, cold_start: bool) { + self.dynamic_value_tags.remove("cold_start"); + self.dynamic_value_tags.remove("proactive_initialization"); + + self.dynamic_value_tags + .insert(String::from("cold_start"), cold_start.to_string()); + + // Only set `proactive_initialization` tag if it is true + if proactive_initialization { + self.dynamic_value_tags.insert( + String::from("proactive_initialization"), + String::from("true"), + ); + } + } + + fn get_dynamic_value_tags(&self) -> Option { + let vec_tags: Vec = self + .dynamic_value_tags + .iter() + .map(|(k, v)| format!("{k}:{v}")) + .collect(); + + let string_tags = vec_tags.join(","); + + SortedTags::parse(&string_tags).ok() } pub fn increment_invocation_metric(&self) { @@ -45,7 +83,7 @@ impl Lambda { let metric = Metric::new( constants::INIT_DURATION_METRIC.into(), MetricValue::distribution(init_duration_ms * constants::MS_TO_SEC), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = self @@ -62,7 +100,11 @@ impl Lambda { if !self.config.enhanced_metrics { return; } - let metric = Metric::new(metric_name.into(), MetricValue::distribution(1f64), None); + let metric = Metric::new( + metric_name.into(), + MetricValue::distribution(1f64), + self.get_dynamic_value_tags(), + ); if let Err(e) = self .aggregator .lock() @@ -81,7 +123,7 @@ impl Lambda { constants::RUNTIME_DURATION_METRIC.into(), MetricValue::distribution(duration_ms), // Datadog expects this value as milliseconds, not seconds - None, + self.get_dynamic_value_tags(), ); if let Err(e) = self .aggregator @@ -101,7 +143,7 @@ impl Lambda { constants::POST_RUNTIME_DURATION_METRIC.into(), MetricValue::distribution(duration_ms), // Datadog expects this value as milliseconds, not seconds - None, + self.get_dynamic_value_tags(), ); if let Err(e) = self .aggregator @@ -113,10 +155,11 @@ impl Lambda { } } - pub(crate) fn generate_network_enhanced_metrics( + pub fn generate_network_enhanced_metrics( network_data_offset: NetworkData, network_data_end: NetworkData, aggr: &mut std::sync::MutexGuard, + tags: Option, ) { let rx_bytes = network_data_end.rx_bytes - network_data_offset.rx_bytes; let tx_bytes = network_data_end.tx_bytes - network_data_offset.tx_bytes; @@ -125,7 +168,7 @@ impl Lambda { let metric = Metric::new( constants::RX_BYTES_METRIC.into(), MetricValue::distribution(rx_bytes), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert rx_bytes metric: {}", e); @@ -134,7 +177,7 @@ impl Lambda { let metric = Metric::new( constants::TX_BYTES_METRIC.into(), MetricValue::distribution(tx_bytes), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert tx_bytes metric: {}", e); @@ -143,7 +186,7 @@ impl Lambda { let metric = Metric::new( constants::TOTAL_NETWORK_METRIC.into(), MetricValue::distribution(total_network), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert total_network metric: {}", e); @@ -161,7 +204,12 @@ impl Lambda { match proc::get_network_data() { Ok(data) => { - Self::generate_network_enhanced_metrics(offset, data, &mut aggr); + Self::generate_network_enhanced_metrics( + offset, + data, + &mut aggr, + self.get_dynamic_value_tags(), + ); } Err(_e) => { debug!("Could not find data to generate network enhanced metrics"); @@ -176,6 +224,7 @@ impl Lambda { cpu_data_offset: &CPUData, cpu_data_end: &CPUData, aggr: &mut std::sync::MutexGuard, + tags: Option, ) { let cpu_user_time = cpu_data_end.total_user_time_ms - cpu_data_offset.total_user_time_ms; let cpu_system_time = @@ -185,7 +234,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_USER_TIME_METRIC.into(), MetricValue::distribution(cpu_user_time), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_user_time metric: {}", e); @@ -194,7 +243,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_SYSTEM_TIME_METRIC.into(), MetricValue::distribution(cpu_system_time), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_system_time metric: {}", e); @@ -203,7 +252,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_TOTAL_TIME_METRIC.into(), MetricValue::distribution(cpu_total_time), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_total_time metric: {}", e); @@ -221,7 +270,12 @@ impl Lambda { let cpu_data = proc::get_cpu_data(); match (cpu_offset, cpu_data) { (Some(cpu_offset), Ok(cpu_data)) => { - Self::generate_cpu_time_enhanced_metrics(&cpu_offset, &cpu_data, &mut aggr); + Self::generate_cpu_time_enhanced_metrics( + &cpu_offset, + &cpu_data, + &mut aggr, + self.get_dynamic_value_tags(), + ); } (_, _) => { debug!("Could not find data to generate cpu time enhanced metrics"); @@ -235,6 +289,7 @@ impl Lambda { uptime_data_offset: f64, uptime_data_end: f64, aggr: &mut std::sync::MutexGuard, + tags: Option, ) { let num_cores = cpu_data_end.individual_cpu_idle_times.len() as f64; let uptime = uptime_data_end - uptime_data_offset; @@ -276,7 +331,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_TOTAL_UTILIZATION_PCT_METRIC.into(), MetricValue::distribution(cpu_total_utilization_pct), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_total_utilization_pct metric: {}", e); @@ -285,7 +340,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_TOTAL_UTILIZATION_METRIC.into(), MetricValue::distribution(cpu_total_utilization), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_total_utilization metric: {}", e); @@ -294,7 +349,7 @@ impl Lambda { let metric = Metric::new( constants::NUM_CORES_METRIC.into(), MetricValue::distribution(num_cores), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert num_cores metric: {}", e); @@ -303,7 +358,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_MAX_UTILIZATION_METRIC.into(), MetricValue::distribution(cpu_max_utilization), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_max_utilization metric: {}", e); @@ -312,7 +367,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_MIN_UTILIZATION_METRIC.into(), MetricValue::distribution(cpu_min_utilization), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_min_utilization metric: {}", e); @@ -341,6 +396,7 @@ impl Lambda { uptime_offset, uptime_data, &mut aggr, + self.get_dynamic_value_tags(), ); } (_, _, _, _) => { @@ -353,11 +409,12 @@ impl Lambda { tmp_max: f64, tmp_used: f64, aggr: &mut std::sync::MutexGuard, + tags: Option, ) { let metric = Metric::new( constants::TMP_MAX_METRIC.into(), MetricValue::distribution(tmp_max), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert tmp_max metric: {}", e); @@ -366,7 +423,7 @@ impl Lambda { let metric = Metric::new( constants::TMP_USED_METRIC.into(), MetricValue::distribution(tmp_used), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert tmp_used metric: {}", e); @@ -376,7 +433,7 @@ impl Lambda { let metric = Metric::new( constants::TMP_FREE_METRIC.into(), MetricValue::distribution(tmp_free), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert tmp_free metric: {}", e); @@ -389,6 +446,7 @@ impl Lambda { } let aggr = Arc::clone(&self.aggregator); + let tags = self.get_dynamic_value_tags(); tokio::spawn(async move { // Set tmp_max and initial value for tmp_used @@ -410,7 +468,7 @@ impl Lambda { _ = send_metrics.changed() => { let mut aggr: std::sync::MutexGuard = aggr.lock().expect("lock poisoned"); - Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr); + Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr, tags); return; } // Otherwise keep monitoring tmp usage periodically @@ -563,7 +621,7 @@ impl Lambda { let metric = metric::Metric::new( constants::DURATION_METRIC.into(), MetricValue::distribution(metrics.duration_ms * constants::MS_TO_SEC), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert duration metric: {}", e); @@ -571,7 +629,7 @@ impl Lambda { let metric = metric::Metric::new( constants::BILLED_DURATION_METRIC.into(), MetricValue::distribution(metrics.billed_duration_ms as f64 * constants::MS_TO_SEC), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert billed duration metric: {}", e); @@ -579,7 +637,7 @@ impl Lambda { let metric = metric::Metric::new( constants::MAX_MEMORY_USED_METRIC.into(), MetricValue::distribution(metrics.max_memory_used_mb as f64), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert max memory used metric: {}", e); @@ -587,7 +645,7 @@ impl Lambda { let metric = metric::Metric::new( constants::MEMORY_SIZE_METRIC.into(), MetricValue::distribution(metrics.memory_size_mb as f64), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert memory size metric: {}", e); @@ -598,7 +656,7 @@ impl Lambda { let metric = metric::Metric::new( constants::ESTIMATED_COST_METRIC.into(), MetricValue::distribution(cost_usd), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert estimated cost metric: {}", e); @@ -832,6 +890,7 @@ mod tests { network_offset, network_data, &mut lambda.aggregator.lock().expect("lock poisoned"), + None, ); assert_sketch(&metrics_aggr, constants::RX_BYTES_METRIC, 20000.0); @@ -868,6 +927,7 @@ mod tests { &cpu_offset, &cpu_data, &mut lambda.aggregator.lock().expect("lock poisoned"), + None, ); assert_sketch(&metrics_aggr, constants::CPU_USER_TIME_METRIC, 100.0); @@ -908,6 +968,7 @@ mod tests { uptime_offset, uptime_data, &mut lambda.aggregator.lock().expect("lock poisoned"), + None, ); // the differences above and metric values below are from an invocation using the go agent to verify the calculations @@ -934,6 +995,7 @@ mod tests { tmp_max, tmp_used, &mut lambda.aggregator.lock().expect("lock poisoned"), + None, ); assert_sketch(&metrics_aggr, constants::TMP_MAX_METRIC, 550461440.0); diff --git a/bottlecap/src/secrets/decrypt.rs b/bottlecap/src/secrets/decrypt.rs index a29e6a39d..615e35284 100644 --- a/bottlecap/src/secrets/decrypt.rs +++ b/bottlecap/src/secrets/decrypt.rs @@ -241,6 +241,7 @@ mod tests { aws_secret_access_key: "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY".to_string(), aws_session_token: "AQoDYXdzEJr...".to_string(), function_name: "arn:some-function".to_string(), + sandbox_init_time: Instant::now(), }, RequestArgs { service: "secretsmanager".to_string(),