From b2ba2d060c3ea7d594784aeaf6dae3d42a20c535 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 17:39:17 -0500 Subject: [PATCH 01/23] add some helper functions to `invocation::lifecycle` mod --- bottlecap/src/lifecycle/invocation/mod.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/mod.rs b/bottlecap/src/lifecycle/invocation/mod.rs index 454cfa3bc..ed07072f1 100644 --- a/bottlecap/src/lifecycle/invocation/mod.rs +++ b/bottlecap/src/lifecycle/invocation/mod.rs @@ -1,4 +1,8 @@ use base64::{engine::general_purpose, DecodeError, Engine}; +use datadog_trace_protobuf::pb::Span; +use rand::{rngs::OsRng, Rng, RngCore}; + +use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE}; pub mod context; pub mod processor; @@ -11,3 +15,22 @@ pub fn base64_to_string(base64_string: &str) -> Result { Err(e) => Err(e), } } + +fn create_empty_span(name: String, resource: String, service: String) -> Span { + Span { + name, + resource, + service, + r#type: String::from("serverless"), + ..Default::default() + } +} + +fn generate_span_id() -> u64 { + if std::env::var(INIT_TYPE).map_or(false, |it| it == SNAP_START_VALUE) { + return OsRng.next_u64(); + } + + let mut rng = rand::thread_rng(); + rng.gen() +} From 5a11b098f5dc7ab44151aeb5ac788aeeff129687 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 17:39:38 -0500 Subject: [PATCH 02/23] create cold start span on processor --- .../src/lifecycle/invocation/processor.rs | 80 ++++++++++++++----- 1 file changed, 58 insertions(+), 22 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index d6e44a98e..ae4285118 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -15,7 +15,8 @@ use tracing::debug; use crate::{ config::{self, AwsConfig}, lifecycle::invocation::{ - base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer, + base64_to_string, context::ContextBuffer, create_empty_span, generate_span_id, + span_inferrer::SpanInferrer, }, metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics}, proc::{self, CPUData, NetworkData}, @@ -37,15 +38,25 @@ pub const DATADOG_INVOCATION_ERROR_STACK_KEY: &str = "x-datadog-invocation-error pub const DATADOG_INVOCATION_ERROR_KEY: &str = "x-datadog-invocation-error"; pub struct Processor { + // Buffer containing context of the previous 5 invocations pub context_buffer: ContextBuffer, + // Helper to infer span information inferrer: SpanInferrer, + // Current invocation span pub span: Span, + // Cold start span + cold_start_span: Option, + // Extracted span context from inferred span, headers, or payload pub extracted_span_context: Option, // Used to extract the trace context from inferred span, headers, or payload propagator: DatadogCompositePropagator, + // Helper to send enhanced metrics enhanced_metrics: EnhancedMetrics, + // AWS configuration from the Lambda environment aws_config: AwsConfig, + // Flag to determine if a tracer was detected tracer_detected: bool, + // Used to determine if we should calculate heavy enhanced metrics enhanced_metrics_enabled: bool, } @@ -57,32 +68,18 @@ impl Processor { aws_config: &AwsConfig, metrics_aggregator: Arc>, ) -> Self { - let service = config.service.clone().unwrap_or("aws.lambda".to_string()); + let service = config.service.clone().unwrap_or(String::from("aws.lambda")); let resource = tags_provider .get_canonical_resource_name() - .unwrap_or("aws_lambda".to_string()); + .unwrap_or(String::from("aws.lambda")); let propagator = DatadogCompositePropagator::new(Arc::clone(&config)); Processor { context_buffer: ContextBuffer::default(), inferrer: SpanInferrer::default(), - span: Span { - service, - name: "aws.lambda".to_string(), - resource, - trace_id: 0, - span_id: 0, - parent_id: 0, - start: 0, - duration: 0, - error: 0, - meta: HashMap::new(), - metrics: HashMap::new(), - r#type: "serverless".to_string(), - meta_struct: HashMap::new(), - span_links: Vec::new(), - }, + span: create_empty_span(String::from("aws.lambda"), resource, service), + cold_start_span: None, extracted_span_context: None, propagator, enhanced_metrics: EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config)), @@ -120,10 +117,36 @@ impl Processor { self.enhanced_metrics.increment_invocation_metric(); } + pub fn on_platform_init_start(&mut self, time: DateTime) { + // Create a cold start span + let mut cold_star_span = create_empty_span( + String::from("aws.lambda.cold_start"), + self.span.resource.clone(), + self.span.service.clone(), + ); + cold_star_span.span_id = generate_span_id(); + self.cold_start_span = Some(cold_star_span); + + let start_time: i64 = SystemTime::from(time) + .duration_since(UNIX_EPOCH) + .expect("time went backwards") + .as_nanos() + .try_into() + .unwrap_or_default(); + + self.span.start = start_time; + } + /// Given the duration of the platform init report, set the init duration metric. /// + #[allow(clippy::cast_possible_truncation)] pub fn on_platform_init_report(&mut self, duration_ms: f64) { self.enhanced_metrics.set_init_duration_metric(duration_ms); + + if let Some(cold_start_span) = &mut self.cold_start_span { + // `round` is intentionally meant to be a whole integer + cold_start_span.duration = (duration_ms * MS_TO_NS).round() as i64; + } } /// Given a `request_id` and the time of the platform start, add the start time to the context buffer. @@ -171,10 +194,10 @@ impl Processor { } if let Some(context) = self.context_buffer.get(request_id) { - let span = &mut self.span; // `round` is intentionally meant to be a whole integer - span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64; - span.meta + self.span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64; + self.span + .meta .insert("request_id".to_string(), request_id.clone()); // todo(duncanista): add missing tags // - cold start, proactive init @@ -199,6 +222,14 @@ impl Processor { self.inferrer.complete_inferred_spans(&self.span); + if let Some(cold_start_span) = &mut self.cold_start_span { + cold_start_span.trace_id = self.span.trace_id; + cold_start_span.parent_id = self.span.span_id; + self.span + .meta + .insert(String::from("cold_start"), String::from("true")); + } + if self.tracer_detected { let mut body_size = std::mem::size_of_val(&self.span); let mut traces = vec![self.span.clone()]; @@ -213,6 +244,11 @@ impl Processor { traces.push(ws.clone()); } + if let Some(cold_start_span) = &self.cold_start_span { + body_size += std::mem::size_of_val(cold_start_span); + traces.push(cold_start_span.clone()); + } + // todo: figure out what to do here let header_tags = tracer_header_tags::TracerHeaderTags { lang: "", From 15237bc8eacf96d0f699017be84047562c05c6b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 17:40:07 -0500 Subject: [PATCH 03/23] move `generate_span_id` to father module --- .../src/lifecycle/invocation/span_inferrer.rs | 40 ++++++++----------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index 39314dd83..a1744683d 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -1,23 +1,24 @@ use std::collections::HashMap; use datadog_trace_protobuf::pb::Span; -use rand::{rngs::OsRng, Rng, RngCore}; use serde_json::Value; use tracing::debug; use crate::config::AwsConfig; -use crate::lifecycle::invocation::triggers::{ - api_gateway_http_event::APIGatewayHttpEvent, - api_gateway_rest_event::APIGatewayRestEvent, - dynamodb_event::DynamoDbRecord, - event_bridge_event::EventBridgeEvent, - kinesis_event::KinesisRecord, - sns_event::{SnsEntity, SnsRecord}, - sqs_event::SqsRecord, - Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, +use crate::lifecycle::invocation::{ + generate_span_id, + triggers::{ + api_gateway_http_event::APIGatewayHttpEvent, + api_gateway_rest_event::APIGatewayRestEvent, + dynamodb_event::DynamoDbRecord, + event_bridge_event::EventBridgeEvent, + kinesis_event::KinesisRecord, + sns_event::{SnsEntity, SnsRecord}, + sqs_event::SqsRecord, + Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, + }, }; -use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE}; use super::triggers::s3_event::S3Record; @@ -60,7 +61,7 @@ impl SpanInferrer { let mut trigger: Option> = None; let mut inferred_span = Span { - span_id: Self::generate_span_id(), + span_id: generate_span_id(), ..Default::default() }; @@ -84,7 +85,7 @@ impl SpanInferrer { if let Ok(sns_entity) = serde_json::from_str::(&t.body) { debug!("Found an SNS event wrapped in the SQS body"); let mut wrapped_inferred_span = Span { - span_id: Self::generate_span_id(), + span_id: generate_span_id(), ..Default::default() }; @@ -103,7 +104,7 @@ impl SpanInferrer { serde_json::from_str::(&t.body) { let mut wrapped_inferred_span = Span { - span_id: Self::generate_span_id(), + span_id: generate_span_id(), ..Default::default() }; @@ -127,7 +128,7 @@ impl SpanInferrer { serde_json::from_str::(message) { let mut wrapped_inferred_span = Span { - span_id: Self::generate_span_id(), + span_id: generate_span_id(), ..Default::default() }; @@ -254,15 +255,6 @@ impl SpanInferrer { } } - fn generate_span_id() -> u64 { - if std::env::var(INIT_TYPE).map_or(false, |it| it == SNAP_START_VALUE) { - return OsRng.next_u64(); - } - - let mut rng = rand::thread_rng(); - rng.gen() - } - /// Returns a clone of the carrier associated with the inferred span /// #[must_use] From c071d4503376897c7eaada11105efe35460a315d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 17:40:21 -0500 Subject: [PATCH 04/23] send `platform_init_start` data to processor --- bottlecap/src/bin/bottlecap/main.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index c72566ef3..301322829 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -400,9 +400,9 @@ async fn extension_loop_active( } Event::Telemetry(event) => match event.record { - TelemetryRecord::PlatformStart { request_id, .. } => { + TelemetryRecord::PlatformInitStart { .. } => { let mut p = invocation_processor.lock().await; - p.on_platform_start(request_id, event.time); + p.on_platform_init_start(event.time); drop(p); } TelemetryRecord::PlatformInitReport { @@ -415,6 +415,11 @@ async fn extension_loop_active( p.on_platform_init_report(metrics.duration_ms); drop(p); } + TelemetryRecord::PlatformStart { request_id, .. } => { + let mut p = invocation_processor.lock().await; + p.on_platform_start(request_id, event.time); + drop(p); + } TelemetryRecord::PlatformRuntimeDone { request_id, status, From a3512b05df3d7370899f2b9e4ada4dbbee58b34a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 18:04:05 -0500 Subject: [PATCH 05/23] send `PlatformInitStart` to main bus --- bottlecap/src/logs/lambda/processor.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 6de124be6..d80db8912 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -87,8 +87,13 @@ impl LambdaProcessor { runtime_version_arn, .. // TODO: check if we could do something with this metrics: `initialization_type` and `phase` } => { + if let Err(e) = self.event_bus.send(Event::Telemetry(copy)).await { + error!("Failed to send PlatformInitStart to the main event bus: {}", e); + } + let rv = runtime_version.unwrap_or("?".to_string()); // TODO: check what does containers display let rv_arn = runtime_version_arn.unwrap_or("?".to_string()); // TODO: check what do containers display + Ok(Message::new( format!("INIT_START Runtime Version: {rv} Runtime Version ARN: {rv_arn}"), None, @@ -178,7 +183,6 @@ impl LambdaProcessor { )) }, // TODO: PlatformInitRuntimeDone - // TODO: PlatformInitReport // TODO: PlatformExtension // TODO: PlatformTelemetrySubscription // TODO: PlatformLogsDropped From 1aca5532e5d26ea1156b12389d01c41a2c7a9fb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 18:04:19 -0500 Subject: [PATCH 06/23] update cold start `parent_id` --- bottlecap/src/lifecycle/invocation/processor.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 65d03e969..8ceb6f6d6 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -125,13 +125,13 @@ impl Processor { pub fn on_platform_init_start(&mut self, time: DateTime) { // Create a cold start span - let mut cold_star_span = create_empty_span( + let mut cold_start_span = create_empty_span( String::from("aws.lambda.cold_start"), self.span.resource.clone(), self.span.service.clone(), ); - cold_star_span.span_id = generate_span_id(); - self.cold_start_span = Some(cold_star_span); + cold_start_span.span_id = generate_span_id(); + self.cold_start_span = Some(cold_start_span); let start_time: i64 = SystemTime::from(time) .duration_since(UNIX_EPOCH) @@ -147,11 +147,12 @@ impl Processor { /// #[allow(clippy::cast_possible_truncation)] pub fn on_platform_init_report(&mut self, duration_ms: f64) { + debug!("Setting cold start span duration: {duration_ms}"); self.enhanced_metrics.set_init_duration_metric(duration_ms); if let Some(cold_start_span) = &mut self.cold_start_span { // `round` is intentionally meant to be a whole integer - cold_start_span.duration = (duration_ms * MS_TO_NS).round() as i64; + cold_start_span.duration = (duration_ms * MS_TO_NS) as i64; } } @@ -230,7 +231,7 @@ impl Processor { if let Some(cold_start_span) = &mut self.cold_start_span { cold_start_span.trace_id = self.span.trace_id; - cold_start_span.parent_id = self.span.span_id; + cold_start_span.parent_id = self.span.parent_id; self.span .meta .insert(String::from("cold_start"), String::from("true")); @@ -253,6 +254,8 @@ impl Processor { if let Some(cold_start_span) = &self.cold_start_span { body_size += std::mem::size_of_val(cold_start_span); traces.push(cold_start_span.clone()); + // Reset the cold start span + self.cold_start_span = None; } // todo: figure out what to do here From 3eda969e5017aec57398cf0f28015752af5b83b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 18:16:52 -0500 Subject: [PATCH 07/23] fix start time of cold start span --- bottlecap/src/lifecycle/invocation/processor.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 8ceb6f6d6..40cf180fe 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -130,8 +130,6 @@ impl Processor { self.span.resource.clone(), self.span.service.clone(), ); - cold_start_span.span_id = generate_span_id(); - self.cold_start_span = Some(cold_start_span); let start_time: i64 = SystemTime::from(time) .duration_since(UNIX_EPOCH) @@ -140,7 +138,10 @@ impl Processor { .try_into() .unwrap_or_default(); - self.span.start = start_time; + cold_start_span.span_id = generate_span_id(); + cold_start_span.start = start_time; + + self.cold_start_span = Some(cold_start_span); } /// Given the duration of the platform init report, set the init duration metric. From 6bf8c86582786eb23e1d0e5bd33e111a6a862261 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:11:45 -0500 Subject: [PATCH 08/23] enhanced metrics now have a `dynamic_value_tags` for tags which we have to calculate at points in time --- bottlecap/src/metrics/enhanced/lambda.rs | 120 +++++++++++++++++------ 1 file changed, 89 insertions(+), 31 deletions(-) diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 6e4f53c72..15724aa63 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -1,10 +1,13 @@ -use super::constants::{self, BASE_LAMBDA_INVOCATION_PRICE}; -use super::statfs::statfs_info; +use crate::metrics::enhanced::{ + constants::{self, BASE_LAMBDA_INVOCATION_PRICE}, + statfs::statfs_info, +}; use crate::proc::{self, CPUData, NetworkData}; use crate::telemetry::events::ReportMetrics; -use dogstatsd::aggregator::Aggregator; use dogstatsd::metric; use dogstatsd::metric::{Metric, MetricValue}; +use dogstatsd::{aggregator::Aggregator, metric::SortedTags}; +use std::collections::HashMap; use std::env::consts::ARCH; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -18,12 +21,43 @@ use tracing::error; pub struct Lambda { pub aggregator: Arc>, pub config: Arc, + // Dynamic value tags are the ones we cannot obtain statically from the sandbox + dynamic_value_tags: HashMap, } impl Lambda { #[must_use] pub fn new(aggregator: Arc>, config: Arc) -> Lambda { - Lambda { aggregator, config } + Lambda { + aggregator, + config, + dynamic_value_tags: HashMap::new(), + } + } + + pub fn set_init_tags(&mut self, proactive_initialization: bool, cold_start: bool) { + self.dynamic_value_tags + .insert(String::from("cold_start"), cold_start.to_string()); + + // Only set `proactive_initialization` tag if it is true + if proactive_initialization { + self.dynamic_value_tags.insert( + String::from("proactive_initialization"), + String::from("true"), + ); + } + } + + fn get_dynamic_value_tags(&self) -> Option { + let vec_tags: Vec = self + .dynamic_value_tags + .iter() + .map(|(k, v)| format!("{k}:{v}")) + .collect(); + + let string_tags = vec_tags.join(","); + + SortedTags::parse(&string_tags).ok() } pub fn increment_invocation_metric(&self) { @@ -45,7 +79,7 @@ impl Lambda { let metric = Metric::new( constants::INIT_DURATION_METRIC.into(), MetricValue::distribution(init_duration_ms * constants::MS_TO_SEC), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = self @@ -62,7 +96,11 @@ impl Lambda { if !self.config.enhanced_metrics { return; } - let metric = Metric::new(metric_name.into(), MetricValue::distribution(1f64), None); + let metric = Metric::new( + metric_name.into(), + MetricValue::distribution(1f64), + self.get_dynamic_value_tags(), + ); if let Err(e) = self .aggregator .lock() @@ -81,7 +119,7 @@ impl Lambda { constants::RUNTIME_DURATION_METRIC.into(), MetricValue::distribution(duration_ms), // Datadog expects this value as milliseconds, not seconds - None, + self.get_dynamic_value_tags(), ); if let Err(e) = self .aggregator @@ -101,7 +139,7 @@ impl Lambda { constants::POST_RUNTIME_DURATION_METRIC.into(), MetricValue::distribution(duration_ms), // Datadog expects this value as milliseconds, not seconds - None, + self.get_dynamic_value_tags(), ); if let Err(e) = self .aggregator @@ -113,10 +151,11 @@ impl Lambda { } } - pub(crate) fn generate_network_enhanced_metrics( + pub fn generate_network_enhanced_metrics( network_data_offset: NetworkData, network_data_end: NetworkData, aggr: &mut std::sync::MutexGuard, + tags: Option, ) { let rx_bytes = network_data_end.rx_bytes - network_data_offset.rx_bytes; let tx_bytes = network_data_end.tx_bytes - network_data_offset.tx_bytes; @@ -125,7 +164,7 @@ impl Lambda { let metric = Metric::new( constants::RX_BYTES_METRIC.into(), MetricValue::distribution(rx_bytes), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert rx_bytes metric: {}", e); @@ -134,7 +173,7 @@ impl Lambda { let metric = Metric::new( constants::TX_BYTES_METRIC.into(), MetricValue::distribution(tx_bytes), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert tx_bytes metric: {}", e); @@ -143,7 +182,7 @@ impl Lambda { let metric = Metric::new( constants::TOTAL_NETWORK_METRIC.into(), MetricValue::distribution(total_network), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert total_network metric: {}", e); @@ -161,7 +200,12 @@ impl Lambda { match proc::get_network_data() { Ok(data) => { - Self::generate_network_enhanced_metrics(offset, data, &mut aggr); + Self::generate_network_enhanced_metrics( + offset, + data, + &mut aggr, + self.get_dynamic_value_tags(), + ); } Err(_e) => { debug!("Could not find data to generate network enhanced metrics"); @@ -176,6 +220,7 @@ impl Lambda { cpu_data_offset: &CPUData, cpu_data_end: &CPUData, aggr: &mut std::sync::MutexGuard, + tags: Option, ) { let cpu_user_time = cpu_data_end.total_user_time_ms - cpu_data_offset.total_user_time_ms; let cpu_system_time = @@ -185,7 +230,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_USER_TIME_METRIC.into(), MetricValue::distribution(cpu_user_time), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_user_time metric: {}", e); @@ -194,7 +239,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_SYSTEM_TIME_METRIC.into(), MetricValue::distribution(cpu_system_time), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_system_time metric: {}", e); @@ -203,7 +248,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_TOTAL_TIME_METRIC.into(), MetricValue::distribution(cpu_total_time), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_total_time metric: {}", e); @@ -221,7 +266,12 @@ impl Lambda { let cpu_data = proc::get_cpu_data(); match (cpu_offset, cpu_data) { (Some(cpu_offset), Ok(cpu_data)) => { - Self::generate_cpu_time_enhanced_metrics(&cpu_offset, &cpu_data, &mut aggr); + Self::generate_cpu_time_enhanced_metrics( + &cpu_offset, + &cpu_data, + &mut aggr, + self.get_dynamic_value_tags(), + ); } (_, _) => { debug!("Could not find data to generate cpu time enhanced metrics"); @@ -235,6 +285,7 @@ impl Lambda { uptime_data_offset: f64, uptime_data_end: f64, aggr: &mut std::sync::MutexGuard, + tags: Option, ) { let num_cores = cpu_data_end.individual_cpu_idle_times.len() as f64; let uptime = uptime_data_end - uptime_data_offset; @@ -276,7 +327,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_TOTAL_UTILIZATION_PCT_METRIC.into(), MetricValue::distribution(cpu_total_utilization_pct), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_total_utilization_pct metric: {}", e); @@ -285,7 +336,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_TOTAL_UTILIZATION_METRIC.into(), MetricValue::distribution(cpu_total_utilization), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_total_utilization metric: {}", e); @@ -294,7 +345,7 @@ impl Lambda { let metric = Metric::new( constants::NUM_CORES_METRIC.into(), MetricValue::distribution(num_cores), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert num_cores metric: {}", e); @@ -303,7 +354,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_MAX_UTILIZATION_METRIC.into(), MetricValue::distribution(cpu_max_utilization), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_max_utilization metric: {}", e); @@ -312,7 +363,7 @@ impl Lambda { let metric = Metric::new( constants::CPU_MIN_UTILIZATION_METRIC.into(), MetricValue::distribution(cpu_min_utilization), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert cpu_min_utilization metric: {}", e); @@ -341,6 +392,7 @@ impl Lambda { uptime_offset, uptime_data, &mut aggr, + self.get_dynamic_value_tags(), ); } (_, _, _, _) => { @@ -353,11 +405,12 @@ impl Lambda { tmp_max: f64, tmp_used: f64, aggr: &mut std::sync::MutexGuard, + tags: Option, ) { let metric = Metric::new( constants::TMP_MAX_METRIC.into(), MetricValue::distribution(tmp_max), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert tmp_max metric: {}", e); @@ -366,7 +419,7 @@ impl Lambda { let metric = Metric::new( constants::TMP_USED_METRIC.into(), MetricValue::distribution(tmp_used), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert tmp_used metric: {}", e); @@ -376,7 +429,7 @@ impl Lambda { let metric = Metric::new( constants::TMP_FREE_METRIC.into(), MetricValue::distribution(tmp_free), - None, + tags.clone(), ); if let Err(e) = aggr.insert(metric) { error!("Failed to insert tmp_free metric: {}", e); @@ -389,6 +442,7 @@ impl Lambda { } let aggr = Arc::clone(&self.aggregator); + let tags = self.get_dynamic_value_tags(); tokio::spawn(async move { // Set tmp_max and initial value for tmp_used @@ -410,7 +464,7 @@ impl Lambda { _ = send_metrics.changed() => { let mut aggr: std::sync::MutexGuard = aggr.lock().expect("lock poisoned"); - Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr); + Self::generate_tmp_enhanced_metrics(tmp_max, tmp_used, &mut aggr, tags); return; } // Otherwise keep monitoring tmp usage periodically @@ -454,7 +508,7 @@ impl Lambda { let metric = metric::Metric::new( constants::DURATION_METRIC.into(), MetricValue::distribution(metrics.duration_ms * constants::MS_TO_SEC), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert duration metric: {}", e); @@ -462,7 +516,7 @@ impl Lambda { let metric = metric::Metric::new( constants::BILLED_DURATION_METRIC.into(), MetricValue::distribution(metrics.billed_duration_ms as f64 * constants::MS_TO_SEC), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert billed duration metric: {}", e); @@ -470,7 +524,7 @@ impl Lambda { let metric = metric::Metric::new( constants::MAX_MEMORY_USED_METRIC.into(), MetricValue::distribution(metrics.max_memory_used_mb as f64), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert max memory used metric: {}", e); @@ -478,7 +532,7 @@ impl Lambda { let metric = metric::Metric::new( constants::MEMORY_SIZE_METRIC.into(), MetricValue::distribution(metrics.memory_size_mb as f64), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert memory size metric: {}", e); @@ -489,7 +543,7 @@ impl Lambda { let metric = metric::Metric::new( constants::ESTIMATED_COST_METRIC.into(), MetricValue::distribution(cost_usd), - None, + self.get_dynamic_value_tags(), ); if let Err(e) = aggr.insert(metric) { error!("failed to insert estimated cost metric: {}", e); @@ -710,6 +764,7 @@ mod tests { network_offset, network_data, &mut lambda.aggregator.lock().expect("lock poisoned"), + None, ); assert_sketch(&metrics_aggr, constants::RX_BYTES_METRIC, 20000.0); @@ -746,6 +801,7 @@ mod tests { &cpu_offset, &cpu_data, &mut lambda.aggregator.lock().expect("lock poisoned"), + None, ); assert_sketch(&metrics_aggr, constants::CPU_USER_TIME_METRIC, 100.0); @@ -786,6 +842,7 @@ mod tests { uptime_offset, uptime_data, &mut lambda.aggregator.lock().expect("lock poisoned"), + None, ); // the differences above and metric values below are from an invocation using the go agent to verify the calculations @@ -812,6 +869,7 @@ mod tests { tmp_max, tmp_used, &mut lambda.aggregator.lock().expect("lock poisoned"), + None, ); assert_sketch(&metrics_aggr, constants::TMP_MAX_METRIC, 550461440.0); From d1dec535f89e0934389b7f9e874d482caf087db7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:12:12 -0500 Subject: [PATCH 09/23] `AwsConfig` now has a `sandbox_init_time` value --- bottlecap/src/bin/bottlecap/main.rs | 2 ++ bottlecap/src/config/mod.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 301322829..5b691c2ee 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -60,6 +60,7 @@ use std::{ path::Path, process::Command, sync::{Arc, Mutex}, + time::Instant, }; use telemetry::listener::TelemetryListenerConfig; use tokio::sync::mpsc::Sender; @@ -201,6 +202,7 @@ fn load_configs() -> (AwsConfig, Arc) { aws_secret_access_key: env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_default(), aws_session_token: env::var("AWS_SESSION_TOKEN").unwrap_or_default(), function_name: env::var("AWS_LAMBDA_FUNCTION_NAME").unwrap_or_default(), + sandbox_init_time: Instant::now(), }; let lambda_directory = env::var("LAMBDA_TASK_ROOT").unwrap_or_else(|_| "/var/task".to_string()); let config = match config::get_config(Path::new(&lambda_directory)) { diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 870b42d21..aded42881 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -4,6 +4,7 @@ pub mod processing_rule; pub mod trace_propagation_style; use std::path::Path; +use std::time::Instant; use std::vec; use figment::providers::{Format, Yaml}; @@ -218,6 +219,7 @@ pub struct AwsConfig { pub aws_secret_access_key: String, pub aws_session_token: String, pub function_name: String, + pub sandbox_init_time: Instant, } #[cfg(test)] From 5d8a5617cf6e3ef397c3e42abd8d7396e3a363e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:12:31 -0500 Subject: [PATCH 10/23] add `is_empty` to `ContextBuffer` --- bottlecap/src/lifecycle/invocation/context.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 4cffb929c..d8ffaa565 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -165,6 +165,13 @@ impl ContextBuffer { pub fn size(&self) -> usize { self.buffer.len() } + + /// Returns if the buffer is empty. + /// + #[must_use] + pub fn is_empty(&self) -> bool { + self.buffer.is_empty() + } } #[cfg(test)] From c45abdfffcfe90d4de9d4cca1a14e551c014e72c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:18:07 -0500 Subject: [PATCH 11/23] calculate init tags on invoke also add a method to reset processor invocation state --- .../src/lifecycle/invocation/processor.rs | 68 +++++++++++++++---- 1 file changed, 55 insertions(+), 13 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 40cf180fe..96c4ec3fd 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -1,7 +1,7 @@ use std::{ collections::HashMap, sync::{Arc, Mutex}, - time::{SystemTime, UNIX_EPOCH}, + time::{Instant, SystemTime, UNIX_EPOCH}, }; use chrono::{DateTime, Utc}; @@ -37,6 +37,7 @@ use crate::{ pub const MS_TO_NS: f64 = 1_000_000.0; pub const S_TO_NS: f64 = 1_000_000_000.0; +pub const PROACTIVE_INITIALIZATION_THRESHOLD_MS: u64 = 10_000; pub const DATADOG_INVOCATION_ERROR_MESSAGE_KEY: &str = "x-datadog-invocation-error-msg"; pub const DATADOG_INVOCATION_ERROR_TYPE_KEY: &str = "x-datadog-invocation-error-type"; @@ -98,6 +99,9 @@ impl Processor { /// Given a `request_id`, creates the context and adds the enhanced metric offsets to the context buffer. /// pub fn on_invoke_event(&mut self, request_id: String) { + self.reset_state(); + self.set_init_tags(); + self.context_buffer.create_context(request_id.clone()); if self.enhanced_metrics_enabled { // Collect offsets for network and cpu metrics @@ -123,6 +127,54 @@ impl Processor { self.enhanced_metrics.increment_invocation_metric(); } + /// Resets the state of the processor to default values. + /// + fn reset_state(&mut self) { + // Reset Span Context on Span + self.span.trace_id = 0; + self.span.parent_id = 0; + self.span.span_id = 0; + // Error + self.span.error = 0; + // Meta tags + self.span.meta.clear(); + // Extracted Span Context + self.extracted_span_context = None; + // Cold Start Span + self.cold_start_span = None; + } + + /// On the first invocation, determine if it's a cold start or proactive init. + /// + /// For every other invocation, it will always be warm start. + /// + fn set_init_tags(&mut self) { + let mut proactive_initialization = false; + let mut cold_start = false; + + // If it's empty, then we are in a cold start + if self.context_buffer.is_empty() { + let now = Instant::now(); + let time_since_sandbox_init = now.duration_since(self.aws_config.sandbox_init_time); + if time_since_sandbox_init.as_millis() > PROACTIVE_INITIALIZATION_THRESHOLD_MS.into() { + proactive_initialization = true; + } else { + cold_start = true; + } + } + + self.span.meta.extend([ + (String::from("cold_start"), cold_start.to_string()), + ( + String::from("proactive_initialization"), + proactive_initialization.to_string(), + ), + ]); + + self.enhanced_metrics + .set_init_tags(proactive_initialization, cold_start); + } + pub fn on_platform_init_start(&mut self, time: DateTime) { // Create a cold start span let mut cold_start_span = create_empty_span( @@ -255,8 +307,6 @@ impl Processor { if let Some(cold_start_span) = &self.cold_start_span { body_size += std::mem::size_of_val(cold_start_span); traces.push(cold_start_span.clone()); - // Reset the cold start span - self.cold_start_span = None; } // todo: figure out what to do here @@ -295,7 +345,7 @@ impl Processor { // Set the report log metrics self.enhanced_metrics.set_report_log_metrics(&metrics); - if let Some(context) = self.context_buffer.remove(request_id) { + if let Some(context) = self.context_buffer.get(request_id) { if context.runtime_duration_ms != 0.0 { let post_runtime_duration_ms = metrics.duration_ms - context.runtime_duration_ms; @@ -305,7 +355,7 @@ impl Processor { } // Set Network and CPU time metrics - if let Some(offsets) = context.enhanced_metric_data { + if let Some(offsets) = context.enhanced_metric_data.clone() { self.enhanced_metrics .set_network_enhanced_metrics(offsets.network_offset); self.enhanced_metrics @@ -320,14 +370,6 @@ impl Processor { pub fn on_invocation_start(&mut self, headers: HashMap, payload: Vec) { self.tracer_detected = true; - // Reset trace context - self.span.trace_id = 0; - self.span.parent_id = 0; - self.span.span_id = 0; - self.span.error = 0; - self.span.meta.clear(); - self.extracted_span_context = None; - let payload_value = match serde_json::from_slice::(&payload) { Ok(value) => value, Err(_) => json!({}), From 2277f8afd16eb43e26b1ce73aa4e437e2281e449 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:26:34 -0500 Subject: [PATCH 12/23] restart init tags on set --- bottlecap/src/metrics/enhanced/lambda.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 15724aa63..b3786ec01 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -35,7 +35,11 @@ impl Lambda { } } + /// Set the dynamic value tags that are not available at compile time pub fn set_init_tags(&mut self, proactive_initialization: bool, cold_start: bool) { + self.dynamic_value_tags.remove("cold_start"); + self.dynamic_value_tags.remove("proactive_initialization"); + self.dynamic_value_tags .insert(String::from("cold_start"), cold_start.to_string()); From a74fb309b1fce2a9f46a3678e9f1fe78d765a94a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:34:50 -0500 Subject: [PATCH 13/23] set tags properly for proactive init --- bottlecap/src/lifecycle/invocation/processor.rs | 12 +++++++----- bottlecap/src/metrics/enhanced/lambda.rs | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 96c4ec3fd..bee42fc3a 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -163,13 +163,15 @@ impl Processor { } } - self.span.meta.extend([ - (String::from("cold_start"), cold_start.to_string()), - ( + if proactive_initialization { + self.span.meta.insert( String::from("proactive_initialization"), proactive_initialization.to_string(), - ), - ]); + ); + } + self.span + .meta + .insert(String::from("cold_start"), cold_start.to_string()); self.enhanced_metrics .set_init_tags(proactive_initialization, cold_start); diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index b3786ec01..cb11a9811 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -39,7 +39,7 @@ impl Lambda { pub fn set_init_tags(&mut self, proactive_initialization: bool, cold_start: bool) { self.dynamic_value_tags.remove("cold_start"); self.dynamic_value_tags.remove("proactive_initialization"); - + self.dynamic_value_tags .insert(String::from("cold_start"), cold_start.to_string()); From 583a79c89735b9efe2d824a853473829f68229c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:40:46 -0500 Subject: [PATCH 14/23] fix unit test --- bottlecap/src/secrets/decrypt.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/bottlecap/src/secrets/decrypt.rs b/bottlecap/src/secrets/decrypt.rs index a29e6a39d..615e35284 100644 --- a/bottlecap/src/secrets/decrypt.rs +++ b/bottlecap/src/secrets/decrypt.rs @@ -241,6 +241,7 @@ mod tests { aws_secret_access_key: "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY".to_string(), aws_session_token: "AQoDYXdzEJr...".to_string(), function_name: "arn:some-function".to_string(), + sandbox_init_time: Instant::now(), }, RequestArgs { service: "secretsmanager".to_string(), From 816e96644591522ed0914b10e8d47e86d5328ac3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:46:38 -0500 Subject: [PATCH 15/23] remove debug line --- bottlecap/src/lifecycle/invocation/processor.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index bee42fc3a..499008ac9 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -202,7 +202,6 @@ impl Processor { /// #[allow(clippy::cast_possible_truncation)] pub fn on_platform_init_report(&mut self, duration_ms: f64) { - debug!("Setting cold start span duration: {duration_ms}"); self.enhanced_metrics.set_init_duration_metric(duration_ms); if let Some(cold_start_span) = &mut self.cold_start_span { From 128802ef8658d6d20b98e10b427b546755adb329 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 15:03:38 -0500 Subject: [PATCH 16/23] make sure `cold_start` tag is only set in one place --- bottlecap/src/lifecycle/invocation/processor.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 499008ac9..7ec4901bb 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -286,9 +286,6 @@ impl Processor { if let Some(cold_start_span) = &mut self.cold_start_span { cold_start_span.trace_id = self.span.trace_id; cold_start_span.parent_id = self.span.parent_id; - self.span - .meta - .insert(String::from("cold_start"), String::from("true")); } if self.tracer_detected { From 3ac5c74df9bc1c14642eda0a18058b858732a494 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 21:53:47 -0500 Subject: [PATCH 17/23] add service mapping config serializer --- bottlecap/src/config/mod.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 8acc9feac..f9fb467e9 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -1,8 +1,10 @@ pub mod flush_strategy; pub mod log_level; pub mod processing_rule; +pub mod service_mapping; pub mod trace_propagation_style; +use std::collections::HashMap; use std::path::Path; use std::time::Instant; use std::vec; @@ -15,6 +17,7 @@ use trace_propagation_style::{deserialize_trace_propagation_style, TracePropagat use crate::config::flush_strategy::FlushStrategy; use crate::config::log_level::{deserialize_log_level, LogLevel}; use crate::config::processing_rule::{deserialize_processing_rules, ProcessingRule}; +use crate::config::service_mapping::deserialize_service_mapping; /// `FailoverConfig` is a struct that represents fields that are not supported in the extension yet. /// @@ -68,6 +71,8 @@ pub struct Config { pub https_proxy: Option, pub capture_lambda_payload: bool, pub capture_lambda_payload_max_depth: u32, + #[serde(deserialize_with = "deserialize_service_mapping")] + pub service_mapping: HashMap, // Trace Propagation #[serde(deserialize_with = "deserialize_trace_propagation_style")] pub trace_propagation_style: Vec, @@ -99,6 +104,7 @@ impl Default for Config { https_proxy: None, capture_lambda_payload: false, capture_lambda_payload_max_depth: 10, + service_mapping: HashMap::new(), // Trace Propagation trace_propagation_style: vec![ TracePropagationStyle::Datadog, From 86d6fb801b38f35673b5643d1607155e828aad35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 21:53:58 -0500 Subject: [PATCH 18/23] add `service_mapping.rs` --- bottlecap/src/config/service_mapping.rs | 35 +++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 bottlecap/src/config/service_mapping.rs diff --git a/bottlecap/src/config/service_mapping.rs b/bottlecap/src/config/service_mapping.rs new file mode 100644 index 000000000..4deda11fd --- /dev/null +++ b/bottlecap/src/config/service_mapping.rs @@ -0,0 +1,35 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Deserializer}; +use tracing::debug; + +#[allow(clippy::module_name_repetitions)] +pub fn deserialize_service_mapping<'de, D>( + deserializer: D, +) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let s: String = String::deserialize(deserializer)?; + + let map = s + .split(',') + .map(|pair| { + let mut split = pair.split(':'); + + let service = split.next(); + let to_map = split.next(); + + if let (Some(service), Some(to_map)) = (service, to_map) { + Ok((service.trim().to_string(), to_map.trim().to_string())) + } else { + debug!("Ignoring invalid service mapping pair: {pair}"); + Err(serde::de::Error::custom(format!( + "Failed to deserialize service mapping for pair: {pair}" + ))) + } + }) + .collect(); + + map +} From 14cca31a3899a41f7b7469642e4ec65ef3e8a30f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 21:54:16 -0500 Subject: [PATCH 19/23] add `ServiceNameResolver` interface for service mapping --- .../src/lifecycle/invocation/triggers/mod.rs | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/mod.rs b/bottlecap/src/lifecycle/invocation/triggers/mod.rs index 6704a459d..2f9a0100a 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/mod.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/mod.rs @@ -19,18 +19,42 @@ pub const DATADOG_CARRIER_KEY: &str = "_datadog"; pub const FUNCTION_TRIGGER_EVENT_SOURCE_TAG: &str = "function_trigger.event_source"; pub const FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG: &str = "function_trigger.event_source_arn"; -pub trait Trigger { +/// Resolves the service name for a given trigger depending on +/// service mapping configuration. +pub trait ServiceNameResolver { + /// Get the specific service name for this trigger type, it will + /// be used as a key to resolve the service name + fn get_specific_identifier(&self) -> String; + + /// Get the generic service mapping key for the trigger + fn get_generic_identifier(&self) -> &'static str; +} + +pub trait Trigger: ServiceNameResolver { fn new(payload: Value) -> Option where Self: Sized; fn is_match(payload: &Value) -> bool where Self: Sized; - fn enrich_span(&self, span: &mut Span); + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap); fn get_tags(&self) -> HashMap; fn get_arn(&self, region: &str) -> String; fn get_carrier(&self) -> HashMap; fn is_async(&self) -> bool; + + /// Default implementation for service name resolution + fn resolve_service_name( + &self, + service_mapping: &HashMap, + fallback: &str, + ) -> String { + service_mapping + .get(&self.get_specific_identifier()) + .or_else(|| service_mapping.get(self.get_generic_identifier())) + .unwrap_or(&fallback.to_string()) + .to_string() + } } #[must_use] From 7c6c06091628a6f216a951c2afda4aaa570e6731 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 21:54:28 -0500 Subject: [PATCH 20/23] implement interface in every trigger --- .../triggers/api_gateway_http_event.rs | 60 ++++++++++++++-- .../triggers/api_gateway_rest_event.rs | 61 ++++++++++++++-- .../invocation/triggers/dynamodb_event.rs | 55 +++++++++++++-- .../invocation/triggers/event_bridge_event.rs | 67 +++++++++++++++--- .../invocation/triggers/kinesis_event.rs | 70 +++++++++++++++---- .../triggers/lambda_function_url_event.rs | 51 ++++++++++++-- .../lifecycle/invocation/triggers/s3_event.rs | 48 +++++++++++-- .../invocation/triggers/sns_event.rs | 68 ++++++++++++++---- .../invocation/triggers/sqs_event.rs | 58 +++++++++++---- .../triggers/step_function_event.rs | 21 +++++- 10 files changed, 478 insertions(+), 81 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/api_gateway_http_event.rs b/bottlecap/src/lifecycle/invocation/triggers/api_gateway_http_event.rs index db8077257..cdf372001 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/api_gateway_http_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/api_gateway_http_event.rs @@ -7,7 +7,8 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::MS_TO_NS, triggers::{ - get_aws_partition_by_region, lowercase_key, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, + get_aws_partition_by_region, lowercase_key, ServiceNameResolver, Trigger, + FUNCTION_TRIGGER_EVENT_SOURCE_TAG, }, }; @@ -63,7 +64,7 @@ impl Trigger for APIGatewayHttpEvent { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { debug!("Enriching an Inferred Span for an API Gateway HTTP Event"); let resource = if self.route_key.is_empty() { format!( @@ -81,8 +82,9 @@ impl Trigger for APIGatewayHttpEvent { path = self.request_context.http.path ); let start_time = (self.request_context.time_epoch as f64 * MS_TO_NS) as i64; - // todo: service mapping - let service_name = self.request_context.domain_name.clone(); + + let service_name = + self.resolve_service_name(service_mapping, &self.request_context.domain_name); span.name = "aws.httpapi".to_string(); span.service = service_name; @@ -191,6 +193,15 @@ impl Trigger for APIGatewayHttpEvent { } } +impl ServiceNameResolver for APIGatewayHttpEvent { + fn get_specific_identifier(&self) -> String { + self.request_context.api_id.clone() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_api_gateway" + } +} #[cfg(test)] mod tests { use super::*; @@ -268,7 +279,8 @@ mod tests { let event = APIGatewayHttpEvent::new(payload).expect("Failed to deserialize APIGatewayHttpEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.httpapi"); assert_eq!( span.service, @@ -331,7 +343,8 @@ mod tests { let event = APIGatewayHttpEvent::new(payload).expect("Failed to deserialize APIGatewayHttpEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.httpapi"); assert_eq!( span.service, @@ -393,4 +406,39 @@ mod tests { "arn:aws:apigateway:sa-east-1::/restapis/x02yirxc7a/stages/$default" ); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("api_gateway_http_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = + APIGatewayHttpEvent::new(payload).expect("Failed to deserialize APIGatewayHttpEvent"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("x02yirxc7a".to_string(), "specific-service".to_string()), + ( + "lambda_api_gateway".to_string(), + "generic-service".to_string(), + ), + ]); + + assert_eq!( + event.resolve_service_name( + &specific_service_mapping, + &event.request_context.domain_name + ), + "specific-service" + ); + + let generic_service_mapping = HashMap::from([( + "lambda_api_gateway".to_string(), + "generic-service".to_string(), + )]); + assert_eq!( + event + .resolve_service_name(&generic_service_mapping, &event.request_context.domain_name), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/api_gateway_rest_event.rs b/bottlecap/src/lifecycle/invocation/triggers/api_gateway_rest_event.rs index e8fc443dd..67a1180be 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/api_gateway_rest_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/api_gateway_rest_event.rs @@ -7,7 +7,8 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::MS_TO_NS, triggers::{ - get_aws_partition_by_region, lowercase_key, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, + get_aws_partition_by_region, lowercase_key, ServiceNameResolver, Trigger, + FUNCTION_TRIGGER_EVENT_SOURCE_TAG, }, }; @@ -66,7 +67,7 @@ impl Trigger for APIGatewayRestEvent { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { debug!("Enriching an Inferred Span for an API Gateway REST Event"); let resource = format!( "{http_method} {path}", @@ -79,8 +80,9 @@ impl Trigger for APIGatewayRestEvent { path = self.request_context.path ); let start_time = (self.request_context.time_epoch as f64 * MS_TO_NS) as i64; - // todo: service mapping - let service_name = self.request_context.domain_name.clone(); + + let service_name = + self.resolve_service_name(service_mapping, &self.request_context.domain_name); span.name = "aws.apigateway".to_string(); span.service = service_name; @@ -179,6 +181,16 @@ impl Trigger for APIGatewayRestEvent { } } +impl ServiceNameResolver for APIGatewayRestEvent { + fn get_specific_identifier(&self) -> String { + self.request_context.api_id.clone() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_api_gateway" + } +} + #[cfg(test)] mod tests { use super::*; @@ -240,7 +252,8 @@ mod tests { let event = APIGatewayRestEvent::new(payload).expect("Failed to deserialize APIGatewayRestEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.apigateway"); assert_eq!(span.service, "id.execute-api.us-east-1.amazonaws.com"); assert_eq!(span.resource, "GET /path"); @@ -298,7 +311,8 @@ mod tests { let event = APIGatewayRestEvent::new(payload).expect("Failed to deserialize APIGatewayRestEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.apigateway"); assert_eq!( span.service, @@ -368,4 +382,39 @@ mod tests { "arn:aws:apigateway:us-east-1::/restapis/id/stages/$default" ); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("api_gateway_rest_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = + APIGatewayRestEvent::new(payload).expect("Failed to deserialize APIGatewayRestEvent"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("id".to_string(), "specific-service".to_string()), + ( + "lambda_api_gateway".to_string(), + "generic-service".to_string(), + ), + ]); + + assert_eq!( + event.resolve_service_name( + &specific_service_mapping, + &event.request_context.domain_name + ), + "specific-service" + ); + + let generic_service_mapping = HashMap::from([( + "lambda_api_gateway".to_string(), + "generic-service".to_string(), + )]); + assert_eq!( + event + .resolve_service_name(&generic_service_mapping, &event.request_context.domain_name), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs b/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs index 026e74832..8503f46c5 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs @@ -6,7 +6,7 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::S_TO_NS, - triggers::{Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + triggers::{ServiceNameResolver, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -74,14 +74,14 @@ impl Trigger for DynamoDbRecord { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { debug!("Enriching an Inferred Span for a DynamoDB event"); - let table_name = self.event_source_arn.split('/').nth(1).unwrap_or_default(); + let table_name = self.get_specific_identifier(); let resource = format!("{} {}", self.event_name.clone(), table_name); let start_time = (self.dynamodb.approximate_creation_date_time * S_TO_NS) as i64; - // todo: service mapping and peer service - let service_name = "dynamodb"; + + let service_name = self.resolve_service_name(service_mapping, "dynamodb"); span.name = String::from("aws.dynamodb"); span.service = service_name.to_string(); @@ -129,6 +129,20 @@ impl Trigger for DynamoDbRecord { } } +impl ServiceNameResolver for DynamoDbRecord { + fn get_specific_identifier(&self) -> String { + self.event_source_arn + .split('/') + .nth(1) + .unwrap_or_default() + .to_string() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_dynamodb" + } +} + #[cfg(test)] mod tests { use super::*; @@ -176,7 +190,8 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let event = DynamoDbRecord::new(payload).expect("Failed to deserialize DynamoDbRecord"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.dynamodb"); assert_eq!(span.service, "dynamodb"); assert_eq!(span.resource, "INSERT ExampleTableWithStream"); @@ -237,4 +252,32 @@ mod tests { assert_eq!(carrier, expected); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("dynamodb_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = DynamoDbRecord::new(payload).expect("Failed to deserialize DynamoDbRecord"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ( + "ExampleTableWithStream".to_string(), + "specific-service".to_string(), + ), + ("lambda_dynamodb".to_string(), "generic-service".to_string()), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "dynamodb"), + "specific-service" + ); + + let generic_service_mapping = + HashMap::from([("lambda_dynamodb".to_string(), "generic-service".to_string())]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "dynamodb"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs b/bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs index ff7d174c6..f9b1e17b1 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs @@ -7,7 +7,9 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::{MS_TO_NS, S_TO_NS}, - triggers::{Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + triggers::{ + ServiceNameResolver, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, + }, }; const DATADOG_START_TIME_KEY: &str = "x-datadog-start-time"; @@ -49,7 +51,7 @@ impl Trigger for EventBridgeEvent { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { // EventBridge events have a timestamp resolution in seconds let start_time_seconds = self .time @@ -57,17 +59,13 @@ impl Trigger for EventBridgeEvent { .unwrap_or((self.time.timestamp_millis() as f64 * S_TO_NS) as i64); let carrier = self.get_carrier(); - let resource_name = carrier - .get(DATADOG_RESOURCE_NAME_KEY) - .unwrap_or(&self.source) - .clone(); + let resource_name = self.get_specific_identifier(); let start_time = carrier .get(DATADOG_START_TIME_KEY) .and_then(|s| s.parse::().ok()) .map_or(start_time_seconds, |s| (s * MS_TO_NS) as i64); - // todo: service mapping and peer service - let service_name = "eventbridge"; + let service_name = self.resolve_service_name(service_mapping, "eventbridge"); span.name = String::from("aws.eventbridge"); span.service = service_name.to_string(); @@ -105,6 +103,20 @@ impl Trigger for EventBridgeEvent { } } +impl ServiceNameResolver for EventBridgeEvent { + fn get_specific_identifier(&self) -> String { + let carrier = self.get_carrier(); + carrier + .get(DATADOG_RESOURCE_NAME_KEY) + .unwrap_or(&self.source) + .to_string() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_eventbridge" + } +} + #[cfg(test)] mod tests { use super::*; @@ -168,7 +180,8 @@ mod tests { EventBridgeEvent::new(payload).expect("Failed to deserialize into EventBridgeEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); let expected = serde_json::from_str(&read_json_file("eventbridge_span.json")) .expect("Failed to deserialize into Span"); @@ -183,7 +196,8 @@ mod tests { EventBridgeEvent::new(payload).expect("Failed to deserialize into EventBridgeEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.resource, "my.event"); } @@ -196,7 +210,8 @@ mod tests { EventBridgeEvent::new(payload).expect("Failed to deserialize into EventBridgeEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.resource, "testBus"); // Seconds resolution @@ -239,4 +254,34 @@ mod tests { assert_eq!(carrier, expected); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("eventbridge_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = EventBridgeEvent::new(payload).expect("Failed to deserialize EventBridgeEvent"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("testBus".to_string(), "specific-service".to_string()), + ( + "lambda_eventbridge".to_string(), + "generic-service".to_string(), + ), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "eventbridge"), + "specific-service" + ); + + let generic_service_mapping = HashMap::from([( + "lambda_eventbridge".to_string(), + "generic-service".to_string(), + )]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "eventbridge"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs b/bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs index c735d5439..ae55add0c 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs @@ -9,7 +9,9 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::S_TO_NS, - triggers::{Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + triggers::{ + ServiceNameResolver, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, + }, }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -69,20 +71,24 @@ impl Trigger for KinesisRecord { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { - let event_source_arn = &self.event_source_arn; - let parsed_stream_name = event_source_arn.split('/').last().unwrap_or_default(); - let parsed_shard_id = self.event_id.split(':').next().unwrap_or_default(); - span.name = "aws.kinesis".to_string(); - span.service = "kinesis".to_string(); + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { + let stream_name = self.get_specific_identifier(); + let shard_id = self.event_id.split(':').next().unwrap_or_default(); + let service_name = self.resolve_service_name(service_mapping, "kinesis"); + + span.name = String::from("aws.kinesis"); + span.service = service_name; span.start = (self.kinesis.approximate_arrival_timestamp * S_TO_NS) as i64; - span.resource = parsed_stream_name.to_string(); + span.resource.clone_from(&stream_name); span.r#type = "web".to_string(); span.meta = HashMap::from([ ("operation_name".to_string(), "aws.kinesis".to_string()), - ("stream_name".to_string(), parsed_stream_name.to_string()), - ("shard_id".to_string(), parsed_shard_id.to_string()), - ("event_source_arn".to_string(), event_source_arn.to_string()), + ("stream_name".to_string(), stream_name.to_string()), + ("shard_id".to_string(), shard_id.to_string()), + ( + "event_source_arn".to_string(), + self.event_source_arn.to_string(), + ), ("event_id".to_string(), self.event_id.to_string()), ("event_name".to_string(), self.event_name.to_string()), ("event_version".to_string(), self.event_version.to_string()), @@ -120,6 +126,20 @@ impl Trigger for KinesisRecord { } } +impl ServiceNameResolver for KinesisRecord { + fn get_specific_identifier(&self) -> String { + self.event_source_arn + .split('/') + .last() + .unwrap_or_default() + .to_string() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_kinesis" + } +} + #[cfg(test)] mod tests { use super::*; @@ -170,7 +190,8 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let event = KinesisRecord::new(payload).expect("Failed to deserialize S3Record"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.kinesis"); assert_eq!(span.service, "kinesis"); assert_eq!(span.resource, "kinesisStream"); @@ -245,4 +266,29 @@ mod tests { assert_eq!(carrier, expected); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("kinesis_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = KinesisRecord::new(payload).expect("Failed to deserialize KinesisRecord"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("kinesisStream".to_string(), "specific-service".to_string()), + ("lambda_kinesis".to_string(), "generic-service".to_string()), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "kinesis"), + "specific-service" + ); + + let generic_service_mapping = + HashMap::from([("lambda_kinesis".to_string(), "generic-service".to_string())]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "kinesis"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/lambda_function_url_event.rs b/bottlecap/src/lifecycle/invocation/triggers/lambda_function_url_event.rs index 087677a27..14a2eaa32 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/lambda_function_url_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/lambda_function_url_event.rs @@ -6,7 +6,7 @@ use serde_json::Value; use crate::lifecycle::invocation::{ processor::MS_TO_NS, - triggers::{lowercase_key, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + triggers::{lowercase_key, ServiceNameResolver, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -28,6 +28,8 @@ pub struct RequestContext { pub time_epoch: i64, #[serde(rename = "requestId")] pub request_id: String, + #[serde(rename = "apiId")] + pub api_id: String, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -61,7 +63,7 @@ impl Trigger for LambdaFunctionUrlEvent { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { let resource = format!( "{} {}", self.request_context.http.method, self.request_context.http.path @@ -74,8 +76,9 @@ impl Trigger for LambdaFunctionUrlEvent { ); let start_time = (self.request_context.time_epoch as f64 * MS_TO_NS) as i64; - // todo: service mapping and peer service - let service_name = self.request_context.domain_name.clone(); + + let service_name = + self.resolve_service_name(service_mapping, &self.request_context.domain_name); span.name = String::from("aws.lambda.url"); span.service = service_name; @@ -168,6 +171,16 @@ impl Trigger for LambdaFunctionUrlEvent { } } +impl ServiceNameResolver for LambdaFunctionUrlEvent { + fn get_specific_identifier(&self) -> String { + self.request_context.api_id.clone() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_url" + } +} + #[cfg(test)] mod tests { use super::*; @@ -222,6 +235,7 @@ mod tests { }, account_id: String::from("601427279990"), domain_name: String::from("a8hyhsshac.lambda-url.eu-south-1.amazonaws.com"), + api_id: String::from("a8hyhsshac"), }, }; @@ -252,7 +266,8 @@ mod tests { let event = LambdaFunctionUrlEvent::new(payload) .expect("Failed to deserialize LambdaFunctionUrlEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.lambda.url"); assert_eq!( span.service, @@ -306,4 +321,30 @@ mod tests { ); env::remove_var("AWS_LAMBDA_FUNCTION_NAME"); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("lambda_function_url_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = LambdaFunctionUrlEvent::new(payload) + .expect("Failed to deserialize LambdaFunctionUrlEvent"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("a8hyhsshac".to_string(), "specific-service".to_string()), + ("lambda_url".to_string(), "generic-service".to_string()), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "domain-name"), + "specific-service" + ); + + let generic_service_mapping = + HashMap::from([("lambda_url".to_string(), "generic-service".to_string())]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "domain-name"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs b/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs index 1e7fe5beb..edd2709ad 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs @@ -8,7 +8,7 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::MS_TO_NS, - triggers::{Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + triggers::{ServiceNameResolver, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -77,15 +77,15 @@ impl Trigger for S3Record { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { debug!("Enriching an InferredSpan span with S3 event"); - let bucket_name = self.s3.bucket.name.clone(); + let bucket_name = self.get_specific_identifier(); let start_time = self .event_time .timestamp_nanos_opt() .unwrap_or((self.event_time.timestamp_millis() as f64 * MS_TO_NS) as i64); - // todo: service mapping - let service_name = "s3"; + + let service_name = self.resolve_service_name(service_mapping, "s3"); span.name = String::from("aws.s3"); span.service = service_name.to_string(); @@ -123,6 +123,16 @@ impl Trigger for S3Record { } } +impl ServiceNameResolver for S3Record { + fn get_specific_identifier(&self) -> String { + self.s3.bucket.name.clone() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_s3" + } +} + #[cfg(test)] mod tests { use super::*; @@ -177,7 +187,8 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let event = S3Record::new(payload).expect("Failed to deserialize S3Record"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.s3"); assert_eq!(span.service, "s3"); assert_eq!(span.resource, "example-bucket"); @@ -237,4 +248,29 @@ mod tests { assert_eq!(carrier, expected); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("s3_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = S3Record::new(payload).expect("Failed to deserialize S3Record"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("example-bucket".to_string(), "specific-service".to_string()), + ("lambda_s3".to_string(), "generic-service".to_string()), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "s3"), + "specific-service" + ); + + let generic_service_mapping = + HashMap::from([("lambda_s3".to_string(), "generic-service".to_string())]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "s3"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs b/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs index 2b7514cf1..47091a9d6 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use chrono::{DateTime, Utc}; +use datadog_trace_protobuf::pb::Span; use serde::{Deserialize, Serialize}; use serde_json::Value; use tracing::debug; @@ -9,7 +10,7 @@ use crate::lifecycle::invocation::{ base64_to_string, processor::MS_TO_NS, triggers::{ - event_bridge_event::EventBridgeEvent, Trigger, DATADOG_CARRIER_KEY, + event_bridge_event::EventBridgeEvent, ServiceNameResolver, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, }, }; @@ -82,33 +83,26 @@ impl Trigger for SnsRecord { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut datadog_trace_protobuf::pb::Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { debug!("Enriching an Inferred Span for an SNS Event"); - let resource = self - .sns - .topic_arn - .clone() - .split(':') - .last() - .unwrap_or_default() - .to_string(); + let resource_name = self.get_specific_identifier(); let start_time = self .sns .timestamp .timestamp_nanos_opt() .unwrap_or((self.sns.timestamp.timestamp_millis() as f64 * MS_TO_NS) as i64); - // todo: service mapping - let service_name = "sns".to_string(); + + let service_name = self.resolve_service_name(service_mapping, "sns"); span.name = "aws.sns".to_string(); span.service = service_name.to_string(); - span.resource.clone_from(&resource); + span.resource.clone_from(&resource_name); span.r#type = "web".to_string(); span.start = start_time; span.meta.extend([ ("operation_name".to_string(), "aws.sns".to_string()), - ("topicname".to_string(), resource), + ("topicname".to_string(), resource_name), ("topic_arn".to_string(), self.sns.topic_arn.clone()), ("message_id".to_string(), self.sns.message_id.clone()), ("type".to_string(), self.sns.r#type.clone()), @@ -164,6 +158,21 @@ impl Trigger for SnsRecord { } } +impl ServiceNameResolver for SnsRecord { + fn get_specific_identifier(&self) -> String { + self.sns + .topic_arn + .split(':') + .last() + .unwrap_or_default() + .to_string() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_sns" + } +} + #[cfg(test)] mod tests { use datadog_trace_protobuf::pb::Span; @@ -224,7 +233,8 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.sns"); assert_eq!(span.service, "sns"); assert_eq!(span.resource, "serverlessTracingTopicPy"); @@ -341,4 +351,32 @@ mod tests { assert_eq!(carrier, expected); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("sns_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ( + "serverlessTracingTopicPy".to_string(), + "specific-service".to_string(), + ), + ("lambda_sns".to_string(), "generic-service".to_string()), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "sns"), + "specific-service" + ); + + let generic_service_mapping = + HashMap::from([("lambda_sns".to_string(), "generic-service".to_string())]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "sns"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs b/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs index 6e748d4d9..c9766a736 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs @@ -10,7 +10,7 @@ use crate::lifecycle::invocation::{ event_bridge_event::EventBridgeEvent, get_aws_partition_by_region, sns_event::{SnsEntity, SnsRecord}, - Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, + ServiceNameResolver, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, }, }; @@ -98,23 +98,17 @@ impl Trigger for SqsRecord { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { debug!("Enriching an Inferred Span for an SQS Event"); - let resource = self - .event_source_arn - .clone() - .split(':') - .last() - .unwrap_or_default() - .to_string(); + let resource = self.get_specific_identifier(); let start_time = (self .attributes .sent_timestamp .parse::() .unwrap_or_default() as f64 * MS_TO_NS) as i64; - // todo: service mapping - let service_name = "sqs"; + + let service_name = self.resolve_service_name(service_mapping, "sqs"); span.name = "aws.sqs".to_string(); span.service = service_name.to_string(); @@ -198,6 +192,20 @@ impl Trigger for SqsRecord { } } +impl ServiceNameResolver for SqsRecord { + fn get_specific_identifier(&self) -> String { + self.event_source_arn + .split(':') + .last() + .unwrap_or_default() + .to_string() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_sqs" + } +} + #[cfg(test)] mod tests { use super::*; @@ -260,7 +268,8 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let event = SqsRecord::new(payload).expect("Failed to deserialize SqsRecord"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.sqs"); assert_eq!(span.service, "sqs"); assert_eq!(span.resource, "MyQueue"); @@ -391,4 +400,29 @@ mod tests { assert_eq!(carrier, expected); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("sqs_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = SqsRecord::new(payload).expect("Failed to deserialize SqsRecord"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("MyQueue".to_string(), "specific-service".to_string()), + ("lambda_sqs".to_string(), "generic-service".to_string()), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "sqs"), + "specific-service" + ); + + let generic_service_mapping = + HashMap::from([("lambda_sqs".to_string(), "generic-service".to_string())]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "sqs"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs b/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs index 91eb2af54..ee77434bc 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs @@ -5,7 +5,9 @@ use serde_json::Value; use sha2::{Digest, Sha256}; use crate::{ - lifecycle::invocation::triggers::{Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + lifecycle::invocation::triggers::{ + ServiceNameResolver, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, + }, traces::{ context::{Sampling, SpanContext}, propagation::text_map_propagator::DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY, @@ -82,7 +84,12 @@ impl Trigger for StepFunctionEvent { execution_id.is_some() && name.is_some() && entered_time.is_some() } - fn enrich_span(&self, _span: &mut datadog_trace_protobuf::pb::Span) {} + fn enrich_span( + &self, + _span: &mut datadog_trace_protobuf::pb::Span, + _service_mapping: &HashMap, + ) { + } fn get_tags(&self) -> HashMap { HashMap::from([( @@ -182,6 +189,16 @@ impl StepFunctionEvent { } } +impl ServiceNameResolver for StepFunctionEvent { + fn get_specific_identifier(&self) -> String { + String::new() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_stepfunction" + } +} + #[cfg(test)] mod tests { use super::*; From b8d38bddcfcd3b47be7ce804f8883f3e3e584745 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 21:54:44 -0500 Subject: [PATCH 21/23] send `service_mapping` lookup table to span enricher --- .../src/lifecycle/invocation/span_inferrer.rs | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index f68134f46..9540ff7de 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -24,7 +24,9 @@ use crate::lifecycle::invocation::{ }; use crate::traces::{context::SpanContext, propagation::Propagator}; +#[derive(Default)] pub struct SpanInferrer { + service_mapping: HashMap, // Span inferred from the Lambda incoming request payload pub inferred_span: Option, // Nested span inferred from the Lambda incoming request payload @@ -39,16 +41,11 @@ pub struct SpanInferrer { trigger_tags: Option>, } -impl Default for SpanInferrer { - fn default() -> Self { - Self::new() - } -} - impl SpanInferrer { #[must_use] - pub fn new() -> Self { + pub fn new(service_mapping: HashMap) -> Self { Self { + service_mapping, inferred_span: None, wrapped_inferred_span: None, is_async_span: false, @@ -79,25 +76,25 @@ impl SpanInferrer { if APIGatewayHttpEvent::is_match(payload_value) { if let Some(t) = APIGatewayHttpEvent::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } } else if APIGatewayRestEvent::is_match(payload_value) { if let Some(t) = APIGatewayRestEvent::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } } else if LambdaFunctionUrlEvent::is_match(payload_value) { if let Some(t) = LambdaFunctionUrlEvent::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } } else if SqsRecord::is_match(payload_value) { if let Some(t) = SqsRecord::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); // Check for SNS event wrapped in the SQS body if let Ok(sns_entity) = serde_json::from_str::(&t.body) { @@ -111,7 +108,7 @@ impl SpanInferrer { sns: sns_entity, event_subscription_arn: None, }; - wt.enrich_span(&mut wrapped_inferred_span); + wt.enrich_span(&mut wrapped_inferred_span, &self.service_mapping); inferred_span.meta.extend(wt.get_tags()); wrapped_inferred_span.duration = @@ -126,7 +123,8 @@ impl SpanInferrer { ..Default::default() }; - event_bridge_entity.enrich_span(&mut wrapped_inferred_span); + event_bridge_entity + .enrich_span(&mut wrapped_inferred_span, &self.service_mapping); inferred_span.meta.extend(event_bridge_entity.get_tags()); wrapped_inferred_span.duration = @@ -139,7 +137,7 @@ impl SpanInferrer { } } else if SnsRecord::is_match(payload_value) { if let Some(t) = SnsRecord::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); if let Some(message) = &t.sns.message { if let Ok(event_bridge_wrapper_message) = @@ -150,7 +148,8 @@ impl SpanInferrer { ..Default::default() }; - event_bridge_wrapper_message.enrich_span(&mut wrapped_inferred_span); + event_bridge_wrapper_message + .enrich_span(&mut wrapped_inferred_span, &self.service_mapping); inferred_span .meta .extend(event_bridge_wrapper_message.get_tags()); @@ -166,25 +165,25 @@ impl SpanInferrer { } } else if DynamoDbRecord::is_match(payload_value) { if let Some(t) = DynamoDbRecord::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } } else if S3Record::is_match(payload_value) { if let Some(t) = S3Record::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } } else if EventBridgeEvent::is_match(payload_value) { if let Some(t) = EventBridgeEvent::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } } else if KinesisRecord::is_match(payload_value) { if let Some(t) = KinesisRecord::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } @@ -240,7 +239,6 @@ impl SpanInferrer { } // TODO: add status tag and other info from response - // TODO: add peer.service pub fn complete_inferred_spans(&mut self, invocation_span: &Span) { if let Some(s) = &mut self.inferred_span { if let Some(ws) = &mut self.wrapped_inferred_span { @@ -262,6 +260,7 @@ impl SpanInferrer { // Set error ws.error = invocation_span.error; + ws.meta.insert(String::from("peer.service"), s.service.clone()); ws.trace_id = invocation_span.trace_id; } @@ -279,6 +278,7 @@ impl SpanInferrer { // Set error s.error = invocation_span.error; + s.meta.insert(String::from("peer.service"), invocation_span.service.clone()); s.trace_id = invocation_span.trace_id; } From af6508c28639ef153a6261f051d6ef2c64f28dbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 21:55:01 -0500 Subject: [PATCH 22/23] create `SpanInferrer` with `service_mapping` config --- bottlecap/src/lifecycle/invocation/processor.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index eb9e00b6c..b8bd3e40b 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -83,7 +83,7 @@ impl Processor { Processor { context_buffer: ContextBuffer::default(), - inferrer: SpanInferrer::default(), + inferrer: SpanInferrer::new(config.service_mapping.clone()), span: create_empty_span(String::from("aws.lambda"), resource, service), cold_start_span: None, extracted_span_context: None, @@ -266,10 +266,7 @@ impl Processor { .meta .insert("request_id".to_string(), request_id.clone()); // todo(duncanista): add missing tags - // - cold start, proactive init // - language - // - function.request - capture lambda payload - // - function.response // - metrics tags (for asm) if let Some(offsets) = &context.enhanced_metric_data { From c32678ed136996f8efd34da190d5a641972c7da9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 22:00:31 -0500 Subject: [PATCH 23/23] fmt --- bottlecap/src/lifecycle/invocation/span_inferrer.rs | 8 ++++++-- bottlecap/src/lifecycle/invocation/triggers/s3_event.rs | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index 9540ff7de..3391bc689 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -260,7 +260,8 @@ impl SpanInferrer { // Set error ws.error = invocation_span.error; - ws.meta.insert(String::from("peer.service"), s.service.clone()); + ws.meta + .insert(String::from("peer.service"), s.service.clone()); ws.trace_id = invocation_span.trace_id; } @@ -278,7 +279,10 @@ impl SpanInferrer { // Set error s.error = invocation_span.error; - s.meta.insert(String::from("peer.service"), invocation_span.service.clone()); + s.meta.insert( + String::from("peer.service"), + invocation_span.service.clone(), + ); s.trace_id = invocation_span.trace_id; } diff --git a/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs b/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs index edd2709ad..d45dc1f50 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs @@ -85,7 +85,7 @@ impl Trigger for S3Record { .timestamp_nanos_opt() .unwrap_or((self.event_time.timestamp_millis() as f64 * MS_TO_NS) as i64); - let service_name = self.resolve_service_name(service_mapping, "s3"); + let service_name = self.resolve_service_name(service_mapping, "s3"); span.name = String::from("aws.s3"); span.service = service_name.to_string();