From b2ba2d060c3ea7d594784aeaf6dae3d42a20c535 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 17:39:17 -0500 Subject: [PATCH 01/16] add some helper functions to `invocation::lifecycle` mod --- bottlecap/src/lifecycle/invocation/mod.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/mod.rs b/bottlecap/src/lifecycle/invocation/mod.rs index 454cfa3bc..ed07072f1 100644 --- a/bottlecap/src/lifecycle/invocation/mod.rs +++ b/bottlecap/src/lifecycle/invocation/mod.rs @@ -1,4 +1,8 @@ use base64::{engine::general_purpose, DecodeError, Engine}; +use datadog_trace_protobuf::pb::Span; +use rand::{rngs::OsRng, Rng, RngCore}; + +use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE}; pub mod context; pub mod processor; @@ -11,3 +15,22 @@ pub fn base64_to_string(base64_string: &str) -> Result { Err(e) => Err(e), } } + +fn create_empty_span(name: String, resource: String, service: String) -> Span { + Span { + name, + resource, + service, + r#type: String::from("serverless"), + ..Default::default() + } +} + +fn generate_span_id() -> u64 { + if std::env::var(INIT_TYPE).map_or(false, |it| it == SNAP_START_VALUE) { + return OsRng.next_u64(); + } + + let mut rng = rand::thread_rng(); + rng.gen() +} From 5a11b098f5dc7ab44151aeb5ac788aeeff129687 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 17:39:38 -0500 Subject: [PATCH 02/16] create cold start span on processor --- .../src/lifecycle/invocation/processor.rs | 80 ++++++++++++++----- 1 file changed, 58 insertions(+), 22 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index d6e44a98e..ae4285118 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -15,7 +15,8 @@ use tracing::debug; use crate::{ config::{self, AwsConfig}, lifecycle::invocation::{ - base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer, + base64_to_string, context::ContextBuffer, create_empty_span, generate_span_id, + span_inferrer::SpanInferrer, }, metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics}, proc::{self, CPUData, NetworkData}, @@ -37,15 +38,25 @@ pub const DATADOG_INVOCATION_ERROR_STACK_KEY: &str = "x-datadog-invocation-error pub const DATADOG_INVOCATION_ERROR_KEY: &str = "x-datadog-invocation-error"; pub struct Processor { + // Buffer containing context of the previous 5 invocations pub context_buffer: ContextBuffer, + // Helper to infer span information inferrer: SpanInferrer, + // Current invocation span pub span: Span, + // Cold start span + cold_start_span: Option, + // Extracted span context from inferred span, headers, or payload pub extracted_span_context: Option, // Used to extract the trace context from inferred span, headers, or payload propagator: DatadogCompositePropagator, + // Helper to send enhanced metrics enhanced_metrics: EnhancedMetrics, + // AWS configuration from the Lambda environment aws_config: AwsConfig, + // Flag to determine if a tracer was detected tracer_detected: bool, + // Used to determine if we should calculate heavy enhanced metrics enhanced_metrics_enabled: bool, } @@ -57,32 +68,18 @@ impl Processor { aws_config: &AwsConfig, metrics_aggregator: Arc>, ) -> Self { - let service = config.service.clone().unwrap_or("aws.lambda".to_string()); + let service = config.service.clone().unwrap_or(String::from("aws.lambda")); let resource = tags_provider .get_canonical_resource_name() - .unwrap_or("aws_lambda".to_string()); + .unwrap_or(String::from("aws.lambda")); let propagator = DatadogCompositePropagator::new(Arc::clone(&config)); Processor { context_buffer: ContextBuffer::default(), inferrer: SpanInferrer::default(), - span: Span { - service, - name: "aws.lambda".to_string(), - resource, - trace_id: 0, - span_id: 0, - parent_id: 0, - start: 0, - duration: 0, - error: 0, - meta: HashMap::new(), - metrics: HashMap::new(), - r#type: "serverless".to_string(), - meta_struct: HashMap::new(), - span_links: Vec::new(), - }, + span: create_empty_span(String::from("aws.lambda"), resource, service), + cold_start_span: None, extracted_span_context: None, propagator, enhanced_metrics: EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config)), @@ -120,10 +117,36 @@ impl Processor { self.enhanced_metrics.increment_invocation_metric(); } + pub fn on_platform_init_start(&mut self, time: DateTime) { + // Create a cold start span + let mut cold_star_span = create_empty_span( + String::from("aws.lambda.cold_start"), + self.span.resource.clone(), + self.span.service.clone(), + ); + cold_star_span.span_id = generate_span_id(); + self.cold_start_span = Some(cold_star_span); + + let start_time: i64 = SystemTime::from(time) + .duration_since(UNIX_EPOCH) + .expect("time went backwards") + .as_nanos() + .try_into() + .unwrap_or_default(); + + self.span.start = start_time; + } + /// Given the duration of the platform init report, set the init duration metric. /// + #[allow(clippy::cast_possible_truncation)] pub fn on_platform_init_report(&mut self, duration_ms: f64) { self.enhanced_metrics.set_init_duration_metric(duration_ms); + + if let Some(cold_start_span) = &mut self.cold_start_span { + // `round` is intentionally meant to be a whole integer + cold_start_span.duration = (duration_ms * MS_TO_NS).round() as i64; + } } /// Given a `request_id` and the time of the platform start, add the start time to the context buffer. @@ -171,10 +194,10 @@ impl Processor { } if let Some(context) = self.context_buffer.get(request_id) { - let span = &mut self.span; // `round` is intentionally meant to be a whole integer - span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64; - span.meta + self.span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64; + self.span + .meta .insert("request_id".to_string(), request_id.clone()); // todo(duncanista): add missing tags // - cold start, proactive init @@ -199,6 +222,14 @@ impl Processor { self.inferrer.complete_inferred_spans(&self.span); + if let Some(cold_start_span) = &mut self.cold_start_span { + cold_start_span.trace_id = self.span.trace_id; + cold_start_span.parent_id = self.span.span_id; + self.span + .meta + .insert(String::from("cold_start"), String::from("true")); + } + if self.tracer_detected { let mut body_size = std::mem::size_of_val(&self.span); let mut traces = vec![self.span.clone()]; @@ -213,6 +244,11 @@ impl Processor { traces.push(ws.clone()); } + if let Some(cold_start_span) = &self.cold_start_span { + body_size += std::mem::size_of_val(cold_start_span); + traces.push(cold_start_span.clone()); + } + // todo: figure out what to do here let header_tags = tracer_header_tags::TracerHeaderTags { lang: "", From 15237bc8eacf96d0f699017be84047562c05c6b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 17:40:07 -0500 Subject: [PATCH 03/16] move `generate_span_id` to father module --- .../src/lifecycle/invocation/span_inferrer.rs | 40 ++++++++----------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index 39314dd83..a1744683d 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -1,23 +1,24 @@ use std::collections::HashMap; use datadog_trace_protobuf::pb::Span; -use rand::{rngs::OsRng, Rng, RngCore}; use serde_json::Value; use tracing::debug; use crate::config::AwsConfig; -use crate::lifecycle::invocation::triggers::{ - api_gateway_http_event::APIGatewayHttpEvent, - api_gateway_rest_event::APIGatewayRestEvent, - dynamodb_event::DynamoDbRecord, - event_bridge_event::EventBridgeEvent, - kinesis_event::KinesisRecord, - sns_event::{SnsEntity, SnsRecord}, - sqs_event::SqsRecord, - Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, +use crate::lifecycle::invocation::{ + generate_span_id, + triggers::{ + api_gateway_http_event::APIGatewayHttpEvent, + api_gateway_rest_event::APIGatewayRestEvent, + dynamodb_event::DynamoDbRecord, + event_bridge_event::EventBridgeEvent, + kinesis_event::KinesisRecord, + sns_event::{SnsEntity, SnsRecord}, + sqs_event::SqsRecord, + Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, + }, }; -use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE}; use super::triggers::s3_event::S3Record; @@ -60,7 +61,7 @@ impl SpanInferrer { let mut trigger: Option> = None; let mut inferred_span = Span { - span_id: Self::generate_span_id(), + span_id: generate_span_id(), ..Default::default() }; @@ -84,7 +85,7 @@ impl SpanInferrer { if let Ok(sns_entity) = serde_json::from_str::(&t.body) { debug!("Found an SNS event wrapped in the SQS body"); let mut wrapped_inferred_span = Span { - span_id: Self::generate_span_id(), + span_id: generate_span_id(), ..Default::default() }; @@ -103,7 +104,7 @@ impl SpanInferrer { serde_json::from_str::(&t.body) { let mut wrapped_inferred_span = Span { - span_id: Self::generate_span_id(), + span_id: generate_span_id(), ..Default::default() }; @@ -127,7 +128,7 @@ impl SpanInferrer { serde_json::from_str::(message) { let mut wrapped_inferred_span = Span { - span_id: Self::generate_span_id(), + span_id: generate_span_id(), ..Default::default() }; @@ -254,15 +255,6 @@ impl SpanInferrer { } } - fn generate_span_id() -> u64 { - if std::env::var(INIT_TYPE).map_or(false, |it| it == SNAP_START_VALUE) { - return OsRng.next_u64(); - } - - let mut rng = rand::thread_rng(); - rng.gen() - } - /// Returns a clone of the carrier associated with the inferred span /// #[must_use] From c071d4503376897c7eaada11105efe35460a315d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 17:40:21 -0500 Subject: [PATCH 04/16] send `platform_init_start` data to processor --- bottlecap/src/bin/bottlecap/main.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index c72566ef3..301322829 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -400,9 +400,9 @@ async fn extension_loop_active( } Event::Telemetry(event) => match event.record { - TelemetryRecord::PlatformStart { request_id, .. } => { + TelemetryRecord::PlatformInitStart { .. } => { let mut p = invocation_processor.lock().await; - p.on_platform_start(request_id, event.time); + p.on_platform_init_start(event.time); drop(p); } TelemetryRecord::PlatformInitReport { @@ -415,6 +415,11 @@ async fn extension_loop_active( p.on_platform_init_report(metrics.duration_ms); drop(p); } + TelemetryRecord::PlatformStart { request_id, .. } => { + let mut p = invocation_processor.lock().await; + p.on_platform_start(request_id, event.time); + drop(p); + } TelemetryRecord::PlatformRuntimeDone { request_id, status, From a3512b05df3d7370899f2b9e4ada4dbbee58b34a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 18:04:05 -0500 Subject: [PATCH 05/16] send `PlatformInitStart` to main bus --- bottlecap/src/logs/lambda/processor.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 6de124be6..d80db8912 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -87,8 +87,13 @@ impl LambdaProcessor { runtime_version_arn, .. // TODO: check if we could do something with this metrics: `initialization_type` and `phase` } => { + if let Err(e) = self.event_bus.send(Event::Telemetry(copy)).await { + error!("Failed to send PlatformInitStart to the main event bus: {}", e); + } + let rv = runtime_version.unwrap_or("?".to_string()); // TODO: check what does containers display let rv_arn = runtime_version_arn.unwrap_or("?".to_string()); // TODO: check what do containers display + Ok(Message::new( format!("INIT_START Runtime Version: {rv} Runtime Version ARN: {rv_arn}"), None, @@ -178,7 +183,6 @@ impl LambdaProcessor { )) }, // TODO: PlatformInitRuntimeDone - // TODO: PlatformInitReport // TODO: PlatformExtension // TODO: PlatformTelemetrySubscription // TODO: PlatformLogsDropped From 1aca5532e5d26ea1156b12389d01c41a2c7a9fb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 18:04:19 -0500 Subject: [PATCH 06/16] update cold start `parent_id` --- bottlecap/src/lifecycle/invocation/processor.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 65d03e969..8ceb6f6d6 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -125,13 +125,13 @@ impl Processor { pub fn on_platform_init_start(&mut self, time: DateTime) { // Create a cold start span - let mut cold_star_span = create_empty_span( + let mut cold_start_span = create_empty_span( String::from("aws.lambda.cold_start"), self.span.resource.clone(), self.span.service.clone(), ); - cold_star_span.span_id = generate_span_id(); - self.cold_start_span = Some(cold_star_span); + cold_start_span.span_id = generate_span_id(); + self.cold_start_span = Some(cold_start_span); let start_time: i64 = SystemTime::from(time) .duration_since(UNIX_EPOCH) @@ -147,11 +147,12 @@ impl Processor { /// #[allow(clippy::cast_possible_truncation)] pub fn on_platform_init_report(&mut self, duration_ms: f64) { + debug!("Setting cold start span duration: {duration_ms}"); self.enhanced_metrics.set_init_duration_metric(duration_ms); if let Some(cold_start_span) = &mut self.cold_start_span { // `round` is intentionally meant to be a whole integer - cold_start_span.duration = (duration_ms * MS_TO_NS).round() as i64; + cold_start_span.duration = (duration_ms * MS_TO_NS) as i64; } } @@ -230,7 +231,7 @@ impl Processor { if let Some(cold_start_span) = &mut self.cold_start_span { cold_start_span.trace_id = self.span.trace_id; - cold_start_span.parent_id = self.span.span_id; + cold_start_span.parent_id = self.span.parent_id; self.span .meta .insert(String::from("cold_start"), String::from("true")); @@ -253,6 +254,8 @@ impl Processor { if let Some(cold_start_span) = &self.cold_start_span { body_size += std::mem::size_of_val(cold_start_span); traces.push(cold_start_span.clone()); + // Reset the cold start span + self.cold_start_span = None; } // todo: figure out what to do here From 3eda969e5017aec57398cf0f28015752af5b83b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 18:16:52 -0500 Subject: [PATCH 07/16] fix start time of cold start span --- bottlecap/src/lifecycle/invocation/processor.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 8ceb6f6d6..40cf180fe 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -130,8 +130,6 @@ impl Processor { self.span.resource.clone(), self.span.service.clone(), ); - cold_start_span.span_id = generate_span_id(); - self.cold_start_span = Some(cold_start_span); let start_time: i64 = SystemTime::from(time) .duration_since(UNIX_EPOCH) @@ -140,7 +138,10 @@ impl Processor { .try_into() .unwrap_or_default(); - self.span.start = start_time; + cold_start_span.span_id = generate_span_id(); + cold_start_span.start = start_time; + + self.cold_start_span = Some(cold_start_span); } /// Given the duration of the platform init report, set the init duration metric. From 6bf8c86582786eb23e1d0e5bd33e111a6a862261 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:11:45 -0500 Subject: [PATCH 08/16] enhanced metrics now have a `dynamic_value_tags` for tags which we have to calculate at points in time --- bottlecap/src/metrics/enhanced/lambda.rs | 120 +++++++++++++++++------ 1 file changed, 89 insertions(+), 31 deletions(-) diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 6e4f53c72..15724aa63 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -1,10 +1,13 @@ -use super::constants::{self, BASE_LAMBDA_INVOCATION_PRICE}; -use super::statfs::statfs_info; +use crate::metrics::enhanced::{ + constants::{self, BASE_LAMBDA_INVOCATION_PRICE}, + statfs::statfs_info, +}; use crate::proc::{self, CPUData, NetworkData}; use crate::telemetry::events::ReportMetrics; -use dogstatsd::aggregator::Aggregator; use dogstatsd::metric; use dogstatsd::metric::{Metric, MetricValue}; +use dogstatsd::{aggregator::Aggregator, metric::SortedTags}; +use std::collections::HashMap; use std::env::consts::ARCH; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -18,12 +21,43 @@ use tracing::error; pub struct Lambda { pub aggregator: Arc>, pub config: Arc, + // Dynamic value tags are the ones we cannot obtain statically from the sandbox + dynamic_value_tags: HashMap, } impl Lambda { #[must_use] pub fn new(aggregator: Arc>, config: Arc) -> Lambda { - Lambda { aggregator, config } + Lambda { + aggregator, + config, + dynamic_value_tags: HashMap::new(), + } + } + + pub fn set_init_tags(&mut self, proactive_initialization: bool, cold_start: bool) { + self.dynamic_value_tags + .insert(String::from("cold_start"), cold_start.to_string()); + + // Only set `proactive_initialization` tag if it is true + if proactive_initialization { + self.dynamic_value_tags.insert( + String::from("proactive_initialization"), + String::from("true"), + ); + } + } + + fn get_dynamic_value_tags(&self) -> Option { + let vec_tags: Vec = self + .dynamic_value_tags + .iter() + .map(|(k, v)| format!("{k}:{v}")) + .collect(); + + let string_tags = vec_tags.join(","); + + SortedTags::parse(&string_tags).ok() } pub fn increment_invocation_metric(&self) { @@ -45,7 +79,7 @@ impl Lambda { let metric = Metric::new( constants::INIT_DURATION_METRIC.into(), MetricValue::distribution(init_duration_ms * constants::MS_TO_SEC), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = self @@ -62,7 +96,11 @@ impl Lambda { if !self.config.enhanced_metrics { return; } - let metric = Metric::new(metric_name.into(), MetricValue::distribution(1f64), None); + let metric = Metric::new( + metric_name.into(), + MetricValue::distribution(1f64), + self.get_dynamic_value_tags(), + ); if let Err(e) = self .aggregator .lock() @@ -81,7 +119,7 @@ impl Lambda { constants::RUNTIME_DURATION_METRIC.into(), MetricValue::distribution(duration_ms), // Datadog expects this value as milliseconds, not seconds - None, + self.get_dynamic_value_tags(), ); if let Err(e) = self .aggregator @@ -101,7 +139,7 @@ impl Lambda { constants::POST_RUNTIME_DURATION_METRIC.into(), MetricValue::distribution(duration_ms), // Datadog expects this value as milliseconds, not seconds - None, + self.get_dynamic_value_tags(), ); if let Err(e) = self .aggregator @@ -113,10 +151,11 @@ impl Lambda { } } - pub(crate) fn generate_network_enhanced_metrics( + pub fn generate_network_enhanced_metrics( network_data_offset: NetworkData, network_data_end: NetworkData, aggr: &mut std::sync::MutexGuard, + tags: Option, ) { let rx_bytes = network_data_end.rx_bytes - network_data_offset.rx_bytes; let tx_bytes = network_data_end.tx_bytes - network_data_offset.tx_bytes; @@ -125,7 +164,7 @@ impl Lambda { let metric = Metric::new( constants::RX_BYTES_METRIC.into(), MetricValue::distribution(rx_bytes), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert rx_bytes metric: {}", e); @@ -134,7 +173,7 @@ impl Lambda { let metric = Metric::new( constants::TX_BYTES_METRIC.into(), MetricValue::distribution(tx_bytes), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert tx_bytes metric: {}", e); @@ -143,7 +182,7 @@ impl Lambda { let metric = Metric::new( constants::TOTAL_NETWORK_METRIC.into(), MetricValue::distribution(total_network), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert total_network metric: {}", e); @@ -161,7 +200,12 @@ impl Lambda { match proc::get_network_data() { Ok(data) => { - Self::generate_network_enhanced_metrics(offset, data, &mut aggr); + Self::generate_network_enhanced_metrics( + offset, + data, + &mut aggr, + self.get_dynamic_value_tags(), + ); } Err(_e) => { debug!("Could not find data to generate network enhanced metrics"); @@ -176,6 +220,7 @@ impl Lambda { cpu_data_offset: &CPUData, cpu_data_end: &CPUData, aggr: &mut std::sync::MutexGuard, + tags: Option, ) { let cpu_user_time = cpu_data_end.total_user_time_ms - cpu_data_offset.total_user_time_ms; let cpu_system_time = @@ -185,7 +230,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_USER_TIME_METRIC.into(), MetricValue::distribution(cpu_user_time), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_user_time metric: {}", e); @@ -194,7 +239,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_SYSTEM_TIME_METRIC.into(), MetricValue::distribution(cpu_system_time), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_system_time metric: {}", e); @@ -203,7 +248,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_TOTAL_TIME_METRIC.into(), MetricValue::distribution(cpu_total_time), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_total_time metric: {}", e); @@ -221,7 +266,12 @@ impl Lambda { let cpu_data = proc::get_cpu_data(); match (cpu_offset, cpu_data) { (Some(cpu_offset), Ok(cpu_data)) => { - Self::generate_cpu_time_enhanced_metrics(&cpu_offset, &cpu_data, &mut aggr); + Self::generate_cpu_time_enhanced_metrics( + &cpu_offset, + &cpu_data, + &mut aggr, + self.get_dynamic_value_tags(), + ); } (_, _) => { debug!("Could not find data to generate cpu time enhanced metrics"); @@ -235,6 +285,7 @@ impl Lambda { uptime_data_offset: f64, uptime_data_end: f64, aggr: &mut std::sync::MutexGuard, + tags: Option, ) { let num_cores = cpu_data_end.individual_cpu_idle_times.len() as f64; let uptime = uptime_data_end - uptime_data_offset; @@ -276,7 +327,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_TOTAL_UTILIZATION_PCT_METRIC.into(), MetricValue::distribution(cpu_total_utilization_pct), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_total_utilization_pct metric: {}", e); @@ -285,7 +336,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_TOTAL_UTILIZATION_METRIC.into(), MetricValue::distribution(cpu_total_utilization), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_total_utilization metric: {}", e); @@ -294,7 +345,7 @@ impl Lambda { let metric = Metric::new( constants::NUM_CORES_METRIC.into(), MetricValue::distribution(num_cores), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert num_cores metric: {}", e); @@ -303,7 +354,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_MAX_UTILIZATION_METRIC.into(), MetricValue::distribution(cpu_max_utilization), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_max_utilization metric: {}", e); @@ -312,7 +363,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_MIN_UTILIZATION_METRIC.into(), MetricValue::distribution(cpu_min_utilization), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_min_utilization metric: {}", e); @@ -341,6 +392,7 @@ impl Lambda { uptime_offset, uptime_data, &mut aggr, + self.get_dynamic_value_tags(), ); } (_, _, _, _) => { @@ -353,11 +405,12 @@ impl Lambda { tmp_max: f64, tmp_used: f64, aggr: &mut std::sync::MutexGuard, + tags: Option, ) { let metric = Metric::new( constants::TMP_MAX_METRIC.into(), MetricValue::distribution(tmp_max), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert tmp_max metric: {}", e); @@ -366,7 +419,7 @@ impl Lambda { let metric = Metric::new( constants::TMP_USED_METRIC.into(), MetricValue::distribution(tmp_used), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert tmp_used metric: {}", e); @@ -376,7 +429,7 @@ impl Lambda { let metric = Metric::new( constants::TMP_FREE_METRIC.into(), MetricValue::distribution(tmp_free), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert tmp_free metric: {}", e); @@ -389,6 +442,7 @@ impl Lambda { } let aggr = Arc::clone(&self.aggregator); + let tags = self.get_dynamic_value_tags(); tokio::spawn(async move { // Set tmp_max and initial value for tmp_used @@ -410,7 +464,7 @@ impl Lambda { _ = send_metrics.changed() => { let mut aggr: std::sync::MutexGuard = aggr.lock().expect("lock poisoned"); - Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr); + Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr, tags); return; } // Otherwise keep monitoring tmp usage periodically @@ -454,7 +508,7 @@ impl Lambda { let metric = metric::Metric::new( constants::DURATION_METRIC.into(), MetricValue::distribution(metrics.duration_ms * constants::MS_TO_SEC), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert duration metric: {}", e); @@ -462,7 +516,7 @@ impl Lambda { let metric = metric::Metric::new( constants::BILLED_DURATION_METRIC.into(), MetricValue::distribution(metrics.billed_duration_ms as f64 * constants::MS_TO_SEC), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert billed duration metric: {}", e); @@ -470,7 +524,7 @@ impl Lambda { let metric = metric::Metric::new( constants::MAX_MEMORY_USED_METRIC.into(), MetricValue::distribution(metrics.max_memory_used_mb as f64), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert max memory used metric: {}", e); @@ -478,7 +532,7 @@ impl Lambda { let metric = metric::Metric::new( constants::MEMORY_SIZE_METRIC.into(), MetricValue::distribution(metrics.memory_size_mb as f64), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert memory size metric: {}", e); @@ -489,7 +543,7 @@ impl Lambda { let metric = metric::Metric::new( constants::ESTIMATED_COST_METRIC.into(), MetricValue::distribution(cost_usd), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert estimated cost metric: {}", e); @@ -710,6 +764,7 @@ mod tests { network_offset, network_data, &mut lambda.aggregator.lock().expect("lock poisoned"), + None, ); assert_sketch(&metrics_aggr, constants::RX_BYTES_METRIC, 20000.0); @@ -746,6 +801,7 @@ mod tests { &cpu_offset, &cpu_data, &mut lambda.aggregator.lock().expect("lock poisoned"), + None, ); assert_sketch(&metrics_aggr, constants::CPU_USER_TIME_METRIC, 100.0); @@ -786,6 +842,7 @@ mod tests { uptime_offset, uptime_data, &mut lambda.aggregator.lock().expect("lock poisoned"), + None, ); // the differences above and metric values below are from an invocation using the go agent to verify the calculations @@ -812,6 +869,7 @@ mod tests { tmp_max, tmp_used, &mut lambda.aggregator.lock().expect("lock poisoned"), + None, ); assert_sketch(&metrics_aggr, constants::TMP_MAX_METRIC, 550461440.0); From d1dec535f89e0934389b7f9e874d482caf087db7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:12:12 -0500 Subject: [PATCH 09/16] `AwsConfig` now has a `sandbox_init_time` value --- bottlecap/src/bin/bottlecap/main.rs | 2 ++ bottlecap/src/config/mod.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 301322829..5b691c2ee 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -60,6 +60,7 @@ use std::{ path::Path, process::Command, sync::{Arc, Mutex}, + time::Instant, }; use telemetry::listener::TelemetryListenerConfig; use tokio::sync::mpsc::Sender; @@ -201,6 +202,7 @@ fn load_configs() -> (AwsConfig, Arc) { aws_secret_access_key: env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_default(), aws_session_token: env::var("AWS_SESSION_TOKEN").unwrap_or_default(), function_name: env::var("AWS_LAMBDA_FUNCTION_NAME").unwrap_or_default(), + sandbox_init_time: Instant::now(), }; let lambda_directory = env::var("LAMBDA_TASK_ROOT").unwrap_or_else(|_| "/var/task".to_string()); let config = match config::get_config(Path::new(&lambda_directory)) { diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 870b42d21..aded42881 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -4,6 +4,7 @@ pub mod processing_rule; pub mod trace_propagation_style; use std::path::Path; +use std::time::Instant; use std::vec; use figment::providers::{Format, Yaml}; @@ -218,6 +219,7 @@ pub struct AwsConfig { pub aws_secret_access_key: String, pub aws_session_token: String, pub function_name: String, + pub sandbox_init_time: Instant, } #[cfg(test)] From 5d8a5617cf6e3ef397c3e42abd8d7396e3a363e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:12:31 -0500 Subject: [PATCH 10/16] add `is_empty` to `ContextBuffer` --- bottlecap/src/lifecycle/invocation/context.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 4cffb929c..d8ffaa565 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -165,6 +165,13 @@ impl ContextBuffer { pub fn size(&self) -> usize { self.buffer.len() } + + /// Returns if the buffer is empty. + /// + #[must_use] + pub fn is_empty(&self) -> bool { + self.buffer.is_empty() + } } #[cfg(test)] From c45abdfffcfe90d4de9d4cca1a14e551c014e72c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:18:07 -0500 Subject: [PATCH 11/16] calculate init tags on invoke also add a method to reset processor invocation state --- .../src/lifecycle/invocation/processor.rs | 68 +++++++++++++++---- 1 file changed, 55 insertions(+), 13 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 40cf180fe..96c4ec3fd 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -1,7 +1,7 @@ use std::{ collections::HashMap, sync::{Arc, Mutex}, - time::{SystemTime, UNIX_EPOCH}, + time::{Instant, SystemTime, UNIX_EPOCH}, }; use chrono::{DateTime, Utc}; @@ -37,6 +37,7 @@ use crate::{ pub const MS_TO_NS: f64 = 1_000_000.0; pub const S_TO_NS: f64 = 1_000_000_000.0; +pub const PROACTIVE_INITIALIZATION_THRESHOLD_MS: u64 = 10_000; pub const DATADOG_INVOCATION_ERROR_MESSAGE_KEY: &str = "x-datadog-invocation-error-msg"; pub const DATADOG_INVOCATION_ERROR_TYPE_KEY: &str = "x-datadog-invocation-error-type"; @@ -98,6 +99,9 @@ impl Processor { /// Given a `request_id`, creates the context and adds the enhanced metric offsets to the context buffer. /// pub fn on_invoke_event(&mut self, request_id: String) { + self.reset_state(); + self.set_init_tags(); + self.context_buffer.create_context(request_id.clone()); if self.enhanced_metrics_enabled { // Collect offsets for network and cpu metrics @@ -123,6 +127,54 @@ impl Processor { self.enhanced_metrics.increment_invocation_metric(); } + /// Resets the state of the processor to default values. + /// + fn reset_state(&mut self) { + // Reset Span Context on Span + self.span.trace_id = 0; + self.span.parent_id = 0; + self.span.span_id = 0; + // Error + self.span.error = 0; + // Meta tags + self.span.meta.clear(); + // Extracted Span Context + self.extracted_span_context = None; + // Cold Start Span + self.cold_start_span = None; + } + + /// On the first invocation, determine if it's a cold start or proactive init. + /// + /// For every other invocation, it will always be warm start. + /// + fn set_init_tags(&mut self) { + let mut proactive_initialization = false; + let mut cold_start = false; + + // If it's empty, then we are in a cold start + if self.context_buffer.is_empty() { + let now = Instant::now(); + let time_since_sandbox_init = now.duration_since(self.aws_config.sandbox_init_time); + if time_since_sandbox_init.as_millis() > PROACTIVE_INITIALIZATION_THRESHOLD_MS.into() { + proactive_initialization = true; + } else { + cold_start = true; + } + } + + self.span.meta.extend([ + (String::from("cold_start"), cold_start.to_string()), + ( + String::from("proactive_initialization"), + proactive_initialization.to_string(), + ), + ]); + + self.enhanced_metrics + .set_init_tags(proactive_initialization, cold_start); + } + pub fn on_platform_init_start(&mut self, time: DateTime) { // Create a cold start span let mut cold_start_span = create_empty_span( @@ -255,8 +307,6 @@ impl Processor { if let Some(cold_start_span) = &self.cold_start_span { body_size += std::mem::size_of_val(cold_start_span); traces.push(cold_start_span.clone()); - // Reset the cold start span - self.cold_start_span = None; } // todo: figure out what to do here @@ -295,7 +345,7 @@ impl Processor { // Set the report log metrics self.enhanced_metrics.set_report_log_metrics(&metrics); - if let Some(context) = self.context_buffer.remove(request_id) { + if let Some(context) = self.context_buffer.get(request_id) { if context.runtime_duration_ms != 0.0 { let post_runtime_duration_ms = metrics.duration_ms - context.runtime_duration_ms; @@ -305,7 +355,7 @@ impl Processor { } // Set Network and CPU time metrics - if let Some(offsets) = context.enhanced_metric_data { + if let Some(offsets) = context.enhanced_metric_data.clone() { self.enhanced_metrics .set_network_enhanced_metrics(offsets.network_offset); self.enhanced_metrics @@ -320,14 +370,6 @@ impl Processor { pub fn on_invocation_start(&mut self, headers: HashMap, payload: Vec) { self.tracer_detected = true; - // Reset trace context - self.span.trace_id = 0; - self.span.parent_id = 0; - self.span.span_id = 0; - self.span.error = 0; - self.span.meta.clear(); - self.extracted_span_context = None; - let payload_value = match serde_json::from_slice::(&payload) { Ok(value) => value, Err(_) => json!({}), From 2277f8afd16eb43e26b1ce73aa4e437e2281e449 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:26:34 -0500 Subject: [PATCH 12/16] restart init tags on set --- bottlecap/src/metrics/enhanced/lambda.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 15724aa63..b3786ec01 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -35,7 +35,11 @@ impl Lambda { } } + /// Set the dynamic value tags that are not available at compile time pub fn set_init_tags(&mut self, proactive_initialization: bool, cold_start: bool) { + self.dynamic_value_tags.remove("cold_start"); + self.dynamic_value_tags.remove("proactive_initialization"); + self.dynamic_value_tags .insert(String::from("cold_start"), cold_start.to_string()); From a74fb309b1fce2a9f46a3678e9f1fe78d765a94a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:34:50 -0500 Subject: [PATCH 13/16] set tags properly for proactive init --- bottlecap/src/lifecycle/invocation/processor.rs | 12 +++++++----- bottlecap/src/metrics/enhanced/lambda.rs | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 96c4ec3fd..bee42fc3a 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -163,13 +163,15 @@ impl Processor { } } - self.span.meta.extend([ - (String::from("cold_start"), cold_start.to_string()), - ( + if proactive_initialization { + self.span.meta.insert( String::from("proactive_initialization"), proactive_initialization.to_string(), - ), - ]); + ); + } + self.span + .meta + .insert(String::from("cold_start"), cold_start.to_string()); self.enhanced_metrics .set_init_tags(proactive_initialization, cold_start); diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index b3786ec01..cb11a9811 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -39,7 +39,7 @@ impl Lambda { pub fn set_init_tags(&mut self, proactive_initialization: bool, cold_start: bool) { self.dynamic_value_tags.remove("cold_start"); self.dynamic_value_tags.remove("proactive_initialization"); - + self.dynamic_value_tags .insert(String::from("cold_start"), cold_start.to_string()); From 583a79c89735b9efe2d824a853473829f68229c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:40:46 -0500 Subject: [PATCH 14/16] fix unit test --- bottlecap/src/secrets/decrypt.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/bottlecap/src/secrets/decrypt.rs b/bottlecap/src/secrets/decrypt.rs index a29e6a39d..615e35284 100644 --- a/bottlecap/src/secrets/decrypt.rs +++ b/bottlecap/src/secrets/decrypt.rs @@ -241,6 +241,7 @@ mod tests { aws_secret_access_key: "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY".to_string(), aws_session_token: "AQoDYXdzEJr...".to_string(), function_name: "arn:some-function".to_string(), + sandbox_init_time: Instant::now(), }, RequestArgs { service: "secretsmanager".to_string(), From 816e96644591522ed0914b10e8d47e86d5328ac3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:46:38 -0500 Subject: [PATCH 15/16] remove debug line --- bottlecap/src/lifecycle/invocation/processor.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index bee42fc3a..499008ac9 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -202,7 +202,6 @@ impl Processor { /// #[allow(clippy::cast_possible_truncation)] pub fn on_platform_init_report(&mut self, duration_ms: f64) { - debug!("Setting cold start span duration: {duration_ms}"); self.enhanced_metrics.set_init_duration_metric(duration_ms); if let Some(cold_start_span) = &mut self.cold_start_span { From 128802ef8658d6d20b98e10b427b546755adb329 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 15:03:38 -0500 Subject: [PATCH 16/16] make sure `cold_start` tag is only set in one place --- bottlecap/src/lifecycle/invocation/processor.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 499008ac9..7ec4901bb 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -286,9 +286,6 @@ impl Processor { if let Some(cold_start_span) = &mut self.cold_start_span { cold_start_span.trace_id = self.span.trace_id; cold_start_span.parent_id = self.span.parent_id; - self.span - .meta - .insert(String::from("cold_start"), String::from("true")); } if self.tracer_detected {