diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 8acc9feac..f9fb467e9 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -1,8 +1,10 @@ pub mod flush_strategy; pub mod log_level; pub mod processing_rule; +pub mod service_mapping; pub mod trace_propagation_style; +use std::collections::HashMap; use std::path::Path; use std::time::Instant; use std::vec; @@ -15,6 +17,7 @@ use trace_propagation_style::{deserialize_trace_propagation_style, TracePropagat use crate::config::flush_strategy::FlushStrategy; use crate::config::log_level::{deserialize_log_level, LogLevel}; use crate::config::processing_rule::{deserialize_processing_rules, ProcessingRule}; +use crate::config::service_mapping::deserialize_service_mapping; /// `FailoverConfig` is a struct that represents fields that are not supported in the extension yet. /// @@ -68,6 +71,8 @@ pub struct Config { pub https_proxy: Option, pub capture_lambda_payload: bool, pub capture_lambda_payload_max_depth: u32, + #[serde(deserialize_with = "deserialize_service_mapping")] + pub service_mapping: HashMap, // Trace Propagation #[serde(deserialize_with = "deserialize_trace_propagation_style")] pub trace_propagation_style: Vec, @@ -99,6 +104,7 @@ impl Default for Config { https_proxy: None, capture_lambda_payload: false, capture_lambda_payload_max_depth: 10, + service_mapping: HashMap::new(), // Trace Propagation trace_propagation_style: vec![ TracePropagationStyle::Datadog, diff --git a/bottlecap/src/config/service_mapping.rs b/bottlecap/src/config/service_mapping.rs new file mode 100644 index 000000000..4deda11fd --- /dev/null +++ b/bottlecap/src/config/service_mapping.rs @@ -0,0 +1,35 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Deserializer}; +use tracing::debug; + +#[allow(clippy::module_name_repetitions)] +pub fn deserialize_service_mapping<'de, D>( + deserializer: D, +) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let s: String = String::deserialize(deserializer)?; + + let map = s + .split(',') + .map(|pair| { + let mut split = pair.split(':'); + + let service = split.next(); + let to_map = split.next(); + + if let (Some(service), Some(to_map)) = (service, to_map) { + Ok((service.trim().to_string(), to_map.trim().to_string())) + } else { + debug!("Ignoring invalid service mapping pair: {pair}"); + Err(serde::de::Error::custom(format!( + "Failed to deserialize service mapping for pair: {pair}" + ))) + } + }) + .collect(); + + map +} diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index eb9e00b6c..b8bd3e40b 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -83,7 +83,7 @@ impl Processor { Processor { context_buffer: ContextBuffer::default(), - inferrer: SpanInferrer::default(), + inferrer: SpanInferrer::new(config.service_mapping.clone()), span: create_empty_span(String::from("aws.lambda"), resource, service), cold_start_span: None, extracted_span_context: None, @@ -266,10 +266,7 @@ impl Processor { .meta .insert("request_id".to_string(), request_id.clone()); // todo(duncanista): add missing tags - // - cold start, proactive init // - language - // - function.request - capture lambda payload - // - function.response // - metrics tags (for asm) if let Some(offsets) = &context.enhanced_metric_data { diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index f68134f46..3391bc689 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -24,7 +24,9 @@ use crate::lifecycle::invocation::{ }; use crate::traces::{context::SpanContext, propagation::Propagator}; +#[derive(Default)] pub struct SpanInferrer { + service_mapping: HashMap, // Span inferred from the Lambda incoming request payload pub inferred_span: Option, // Nested span inferred from the Lambda incoming request payload @@ -39,16 +41,11 @@ pub struct SpanInferrer { trigger_tags: Option>, } -impl Default for SpanInferrer { - fn default() -> Self { - Self::new() - } -} - impl SpanInferrer { #[must_use] - pub fn new() -> Self { + pub fn new(service_mapping: HashMap) -> Self { Self { + service_mapping, inferred_span: None, wrapped_inferred_span: None, is_async_span: false, @@ -79,25 +76,25 @@ impl SpanInferrer { if APIGatewayHttpEvent::is_match(payload_value) { if let Some(t) = APIGatewayHttpEvent::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } } else if APIGatewayRestEvent::is_match(payload_value) { if let Some(t) = APIGatewayRestEvent::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } } else if LambdaFunctionUrlEvent::is_match(payload_value) { if let Some(t) = LambdaFunctionUrlEvent::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } } else if SqsRecord::is_match(payload_value) { if let Some(t) = SqsRecord::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); // Check for SNS event wrapped in the SQS body if let Ok(sns_entity) = serde_json::from_str::(&t.body) { @@ -111,7 +108,7 @@ impl SpanInferrer { sns: sns_entity, event_subscription_arn: None, }; - wt.enrich_span(&mut wrapped_inferred_span); + wt.enrich_span(&mut wrapped_inferred_span, &self.service_mapping); inferred_span.meta.extend(wt.get_tags()); wrapped_inferred_span.duration = @@ -126,7 +123,8 @@ impl SpanInferrer { ..Default::default() }; - event_bridge_entity.enrich_span(&mut wrapped_inferred_span); + event_bridge_entity + .enrich_span(&mut wrapped_inferred_span, &self.service_mapping); inferred_span.meta.extend(event_bridge_entity.get_tags()); wrapped_inferred_span.duration = @@ -139,7 +137,7 @@ impl SpanInferrer { } } else if SnsRecord::is_match(payload_value) { if let Some(t) = SnsRecord::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); if let Some(message) = &t.sns.message { if let Ok(event_bridge_wrapper_message) = @@ -150,7 +148,8 @@ impl SpanInferrer { ..Default::default() }; - event_bridge_wrapper_message.enrich_span(&mut wrapped_inferred_span); + event_bridge_wrapper_message + .enrich_span(&mut wrapped_inferred_span, &self.service_mapping); inferred_span .meta .extend(event_bridge_wrapper_message.get_tags()); @@ -166,25 +165,25 @@ impl SpanInferrer { } } else if DynamoDbRecord::is_match(payload_value) { if let Some(t) = DynamoDbRecord::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } } else if S3Record::is_match(payload_value) { if let Some(t) = S3Record::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } } else if EventBridgeEvent::is_match(payload_value) { if let Some(t) = EventBridgeEvent::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } } else if KinesisRecord::is_match(payload_value) { if let Some(t) = KinesisRecord::new(payload_value.clone()) { - t.enrich_span(&mut inferred_span); + t.enrich_span(&mut inferred_span, &self.service_mapping); trigger = Some(Box::new(t)); } @@ -240,7 +239,6 @@ impl SpanInferrer { } // TODO: add status tag and other info from response - // TODO: add peer.service pub fn complete_inferred_spans(&mut self, invocation_span: &Span) { if let Some(s) = &mut self.inferred_span { if let Some(ws) = &mut self.wrapped_inferred_span { @@ -262,6 +260,8 @@ impl SpanInferrer { // Set error ws.error = invocation_span.error; + ws.meta + .insert(String::from("peer.service"), s.service.clone()); ws.trace_id = invocation_span.trace_id; } @@ -279,6 +279,10 @@ impl SpanInferrer { // Set error s.error = invocation_span.error; + s.meta.insert( + String::from("peer.service"), + invocation_span.service.clone(), + ); s.trace_id = invocation_span.trace_id; } diff --git a/bottlecap/src/lifecycle/invocation/triggers/api_gateway_http_event.rs b/bottlecap/src/lifecycle/invocation/triggers/api_gateway_http_event.rs index db8077257..cdf372001 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/api_gateway_http_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/api_gateway_http_event.rs @@ -7,7 +7,8 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::MS_TO_NS, triggers::{ - get_aws_partition_by_region, lowercase_key, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, + get_aws_partition_by_region, lowercase_key, ServiceNameResolver, Trigger, + FUNCTION_TRIGGER_EVENT_SOURCE_TAG, }, }; @@ -63,7 +64,7 @@ impl Trigger for APIGatewayHttpEvent { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { debug!("Enriching an Inferred Span for an API Gateway HTTP Event"); let resource = if self.route_key.is_empty() { format!( @@ -81,8 +82,9 @@ impl Trigger for APIGatewayHttpEvent { path = self.request_context.http.path ); let start_time = (self.request_context.time_epoch as f64 * MS_TO_NS) as i64; - // todo: service mapping - let service_name = self.request_context.domain_name.clone(); + + let service_name = + self.resolve_service_name(service_mapping, &self.request_context.domain_name); span.name = "aws.httpapi".to_string(); span.service = service_name; @@ -191,6 +193,15 @@ impl Trigger for APIGatewayHttpEvent { } } +impl ServiceNameResolver for APIGatewayHttpEvent { + fn get_specific_identifier(&self) -> String { + self.request_context.api_id.clone() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_api_gateway" + } +} #[cfg(test)] mod tests { use super::*; @@ -268,7 +279,8 @@ mod tests { let event = APIGatewayHttpEvent::new(payload).expect("Failed to deserialize APIGatewayHttpEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.httpapi"); assert_eq!( span.service, @@ -331,7 +343,8 @@ mod tests { let event = APIGatewayHttpEvent::new(payload).expect("Failed to deserialize APIGatewayHttpEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.httpapi"); assert_eq!( span.service, @@ -393,4 +406,39 @@ mod tests { "arn:aws:apigateway:sa-east-1::/restapis/x02yirxc7a/stages/$default" ); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("api_gateway_http_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = + APIGatewayHttpEvent::new(payload).expect("Failed to deserialize APIGatewayHttpEvent"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("x02yirxc7a".to_string(), "specific-service".to_string()), + ( + "lambda_api_gateway".to_string(), + "generic-service".to_string(), + ), + ]); + + assert_eq!( + event.resolve_service_name( + &specific_service_mapping, + &event.request_context.domain_name + ), + "specific-service" + ); + + let generic_service_mapping = HashMap::from([( + "lambda_api_gateway".to_string(), + "generic-service".to_string(), + )]); + assert_eq!( + event + .resolve_service_name(&generic_service_mapping, &event.request_context.domain_name), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/api_gateway_rest_event.rs b/bottlecap/src/lifecycle/invocation/triggers/api_gateway_rest_event.rs index e8fc443dd..67a1180be 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/api_gateway_rest_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/api_gateway_rest_event.rs @@ -7,7 +7,8 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::MS_TO_NS, triggers::{ - get_aws_partition_by_region, lowercase_key, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, + get_aws_partition_by_region, lowercase_key, ServiceNameResolver, Trigger, + FUNCTION_TRIGGER_EVENT_SOURCE_TAG, }, }; @@ -66,7 +67,7 @@ impl Trigger for APIGatewayRestEvent { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { debug!("Enriching an Inferred Span for an API Gateway REST Event"); let resource = format!( "{http_method} {path}", @@ -79,8 +80,9 @@ impl Trigger for APIGatewayRestEvent { path = self.request_context.path ); let start_time = (self.request_context.time_epoch as f64 * MS_TO_NS) as i64; - // todo: service mapping - let service_name = self.request_context.domain_name.clone(); + + let service_name = + self.resolve_service_name(service_mapping, &self.request_context.domain_name); span.name = "aws.apigateway".to_string(); span.service = service_name; @@ -179,6 +181,16 @@ impl Trigger for APIGatewayRestEvent { } } +impl ServiceNameResolver for APIGatewayRestEvent { + fn get_specific_identifier(&self) -> String { + self.request_context.api_id.clone() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_api_gateway" + } +} + #[cfg(test)] mod tests { use super::*; @@ -240,7 +252,8 @@ mod tests { let event = APIGatewayRestEvent::new(payload).expect("Failed to deserialize APIGatewayRestEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.apigateway"); assert_eq!(span.service, "id.execute-api.us-east-1.amazonaws.com"); assert_eq!(span.resource, "GET /path"); @@ -298,7 +311,8 @@ mod tests { let event = APIGatewayRestEvent::new(payload).expect("Failed to deserialize APIGatewayRestEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.apigateway"); assert_eq!( span.service, @@ -368,4 +382,39 @@ mod tests { "arn:aws:apigateway:us-east-1::/restapis/id/stages/$default" ); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("api_gateway_rest_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = + APIGatewayRestEvent::new(payload).expect("Failed to deserialize APIGatewayRestEvent"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("id".to_string(), "specific-service".to_string()), + ( + "lambda_api_gateway".to_string(), + "generic-service".to_string(), + ), + ]); + + assert_eq!( + event.resolve_service_name( + &specific_service_mapping, + &event.request_context.domain_name + ), + "specific-service" + ); + + let generic_service_mapping = HashMap::from([( + "lambda_api_gateway".to_string(), + "generic-service".to_string(), + )]); + assert_eq!( + event + .resolve_service_name(&generic_service_mapping, &event.request_context.domain_name), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs b/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs index 026e74832..8503f46c5 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs @@ -6,7 +6,7 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::S_TO_NS, - triggers::{Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + triggers::{ServiceNameResolver, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -74,14 +74,14 @@ impl Trigger for DynamoDbRecord { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { debug!("Enriching an Inferred Span for a DynamoDB event"); - let table_name = self.event_source_arn.split('/').nth(1).unwrap_or_default(); + let table_name = self.get_specific_identifier(); let resource = format!("{} {}", self.event_name.clone(), table_name); let start_time = (self.dynamodb.approximate_creation_date_time * S_TO_NS) as i64; - // todo: service mapping and peer service - let service_name = "dynamodb"; + + let service_name = self.resolve_service_name(service_mapping, "dynamodb"); span.name = String::from("aws.dynamodb"); span.service = service_name.to_string(); @@ -129,6 +129,20 @@ impl Trigger for DynamoDbRecord { } } +impl ServiceNameResolver for DynamoDbRecord { + fn get_specific_identifier(&self) -> String { + self.event_source_arn + .split('/') + .nth(1) + .unwrap_or_default() + .to_string() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_dynamodb" + } +} + #[cfg(test)] mod tests { use super::*; @@ -176,7 +190,8 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let event = DynamoDbRecord::new(payload).expect("Failed to deserialize DynamoDbRecord"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.dynamodb"); assert_eq!(span.service, "dynamodb"); assert_eq!(span.resource, "INSERT ExampleTableWithStream"); @@ -237,4 +252,32 @@ mod tests { assert_eq!(carrier, expected); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("dynamodb_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = DynamoDbRecord::new(payload).expect("Failed to deserialize DynamoDbRecord"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ( + "ExampleTableWithStream".to_string(), + "specific-service".to_string(), + ), + ("lambda_dynamodb".to_string(), "generic-service".to_string()), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "dynamodb"), + "specific-service" + ); + + let generic_service_mapping = + HashMap::from([("lambda_dynamodb".to_string(), "generic-service".to_string())]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "dynamodb"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs b/bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs index ff7d174c6..f9b1e17b1 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs @@ -7,7 +7,9 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::{MS_TO_NS, S_TO_NS}, - triggers::{Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + triggers::{ + ServiceNameResolver, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, + }, }; const DATADOG_START_TIME_KEY: &str = "x-datadog-start-time"; @@ -49,7 +51,7 @@ impl Trigger for EventBridgeEvent { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { // EventBridge events have a timestamp resolution in seconds let start_time_seconds = self .time @@ -57,17 +59,13 @@ impl Trigger for EventBridgeEvent { .unwrap_or((self.time.timestamp_millis() as f64 * S_TO_NS) as i64); let carrier = self.get_carrier(); - let resource_name = carrier - .get(DATADOG_RESOURCE_NAME_KEY) - .unwrap_or(&self.source) - .clone(); + let resource_name = self.get_specific_identifier(); let start_time = carrier .get(DATADOG_START_TIME_KEY) .and_then(|s| s.parse::().ok()) .map_or(start_time_seconds, |s| (s * MS_TO_NS) as i64); - // todo: service mapping and peer service - let service_name = "eventbridge"; + let service_name = self.resolve_service_name(service_mapping, "eventbridge"); span.name = String::from("aws.eventbridge"); span.service = service_name.to_string(); @@ -105,6 +103,20 @@ impl Trigger for EventBridgeEvent { } } +impl ServiceNameResolver for EventBridgeEvent { + fn get_specific_identifier(&self) -> String { + let carrier = self.get_carrier(); + carrier + .get(DATADOG_RESOURCE_NAME_KEY) + .unwrap_or(&self.source) + .to_string() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_eventbridge" + } +} + #[cfg(test)] mod tests { use super::*; @@ -168,7 +180,8 @@ mod tests { EventBridgeEvent::new(payload).expect("Failed to deserialize into EventBridgeEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); let expected = serde_json::from_str(&read_json_file("eventbridge_span.json")) .expect("Failed to deserialize into Span"); @@ -183,7 +196,8 @@ mod tests { EventBridgeEvent::new(payload).expect("Failed to deserialize into EventBridgeEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.resource, "my.event"); } @@ -196,7 +210,8 @@ mod tests { EventBridgeEvent::new(payload).expect("Failed to deserialize into EventBridgeEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.resource, "testBus"); // Seconds resolution @@ -239,4 +254,34 @@ mod tests { assert_eq!(carrier, expected); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("eventbridge_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = EventBridgeEvent::new(payload).expect("Failed to deserialize EventBridgeEvent"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("testBus".to_string(), "specific-service".to_string()), + ( + "lambda_eventbridge".to_string(), + "generic-service".to_string(), + ), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "eventbridge"), + "specific-service" + ); + + let generic_service_mapping = HashMap::from([( + "lambda_eventbridge".to_string(), + "generic-service".to_string(), + )]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "eventbridge"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs b/bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs index c735d5439..ae55add0c 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs @@ -9,7 +9,9 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::S_TO_NS, - triggers::{Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + triggers::{ + ServiceNameResolver, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, + }, }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -69,20 +71,24 @@ impl Trigger for KinesisRecord { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { - let event_source_arn = &self.event_source_arn; - let parsed_stream_name = event_source_arn.split('/').last().unwrap_or_default(); - let parsed_shard_id = self.event_id.split(':').next().unwrap_or_default(); - span.name = "aws.kinesis".to_string(); - span.service = "kinesis".to_string(); + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { + let stream_name = self.get_specific_identifier(); + let shard_id = self.event_id.split(':').next().unwrap_or_default(); + let service_name = self.resolve_service_name(service_mapping, "kinesis"); + + span.name = String::from("aws.kinesis"); + span.service = service_name; span.start = (self.kinesis.approximate_arrival_timestamp * S_TO_NS) as i64; - span.resource = parsed_stream_name.to_string(); + span.resource.clone_from(&stream_name); span.r#type = "web".to_string(); span.meta = HashMap::from([ ("operation_name".to_string(), "aws.kinesis".to_string()), - ("stream_name".to_string(), parsed_stream_name.to_string()), - ("shard_id".to_string(), parsed_shard_id.to_string()), - ("event_source_arn".to_string(), event_source_arn.to_string()), + ("stream_name".to_string(), stream_name.to_string()), + ("shard_id".to_string(), shard_id.to_string()), + ( + "event_source_arn".to_string(), + self.event_source_arn.to_string(), + ), ("event_id".to_string(), self.event_id.to_string()), ("event_name".to_string(), self.event_name.to_string()), ("event_version".to_string(), self.event_version.to_string()), @@ -120,6 +126,20 @@ impl Trigger for KinesisRecord { } } +impl ServiceNameResolver for KinesisRecord { + fn get_specific_identifier(&self) -> String { + self.event_source_arn + .split('/') + .last() + .unwrap_or_default() + .to_string() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_kinesis" + } +} + #[cfg(test)] mod tests { use super::*; @@ -170,7 +190,8 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let event = KinesisRecord::new(payload).expect("Failed to deserialize S3Record"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.kinesis"); assert_eq!(span.service, "kinesis"); assert_eq!(span.resource, "kinesisStream"); @@ -245,4 +266,29 @@ mod tests { assert_eq!(carrier, expected); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("kinesis_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = KinesisRecord::new(payload).expect("Failed to deserialize KinesisRecord"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("kinesisStream".to_string(), "specific-service".to_string()), + ("lambda_kinesis".to_string(), "generic-service".to_string()), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "kinesis"), + "specific-service" + ); + + let generic_service_mapping = + HashMap::from([("lambda_kinesis".to_string(), "generic-service".to_string())]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "kinesis"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/lambda_function_url_event.rs b/bottlecap/src/lifecycle/invocation/triggers/lambda_function_url_event.rs index 087677a27..14a2eaa32 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/lambda_function_url_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/lambda_function_url_event.rs @@ -6,7 +6,7 @@ use serde_json::Value; use crate::lifecycle::invocation::{ processor::MS_TO_NS, - triggers::{lowercase_key, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + triggers::{lowercase_key, ServiceNameResolver, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -28,6 +28,8 @@ pub struct RequestContext { pub time_epoch: i64, #[serde(rename = "requestId")] pub request_id: String, + #[serde(rename = "apiId")] + pub api_id: String, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -61,7 +63,7 @@ impl Trigger for LambdaFunctionUrlEvent { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { let resource = format!( "{} {}", self.request_context.http.method, self.request_context.http.path @@ -74,8 +76,9 @@ impl Trigger for LambdaFunctionUrlEvent { ); let start_time = (self.request_context.time_epoch as f64 * MS_TO_NS) as i64; - // todo: service mapping and peer service - let service_name = self.request_context.domain_name.clone(); + + let service_name = + self.resolve_service_name(service_mapping, &self.request_context.domain_name); span.name = String::from("aws.lambda.url"); span.service = service_name; @@ -168,6 +171,16 @@ impl Trigger for LambdaFunctionUrlEvent { } } +impl ServiceNameResolver for LambdaFunctionUrlEvent { + fn get_specific_identifier(&self) -> String { + self.request_context.api_id.clone() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_url" + } +} + #[cfg(test)] mod tests { use super::*; @@ -222,6 +235,7 @@ mod tests { }, account_id: String::from("601427279990"), domain_name: String::from("a8hyhsshac.lambda-url.eu-south-1.amazonaws.com"), + api_id: String::from("a8hyhsshac"), }, }; @@ -252,7 +266,8 @@ mod tests { let event = LambdaFunctionUrlEvent::new(payload) .expect("Failed to deserialize LambdaFunctionUrlEvent"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.lambda.url"); assert_eq!( span.service, @@ -306,4 +321,30 @@ mod tests { ); env::remove_var("AWS_LAMBDA_FUNCTION_NAME"); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("lambda_function_url_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = LambdaFunctionUrlEvent::new(payload) + .expect("Failed to deserialize LambdaFunctionUrlEvent"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("a8hyhsshac".to_string(), "specific-service".to_string()), + ("lambda_url".to_string(), "generic-service".to_string()), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "domain-name"), + "specific-service" + ); + + let generic_service_mapping = + HashMap::from([("lambda_url".to_string(), "generic-service".to_string())]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "domain-name"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/mod.rs b/bottlecap/src/lifecycle/invocation/triggers/mod.rs index 6704a459d..2f9a0100a 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/mod.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/mod.rs @@ -19,18 +19,42 @@ pub const DATADOG_CARRIER_KEY: &str = "_datadog"; pub const FUNCTION_TRIGGER_EVENT_SOURCE_TAG: &str = "function_trigger.event_source"; pub const FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG: &str = "function_trigger.event_source_arn"; -pub trait Trigger { +/// Resolves the service name for a given trigger depending on +/// service mapping configuration. +pub trait ServiceNameResolver { + /// Get the specific service name for this trigger type, it will + /// be used as a key to resolve the service name + fn get_specific_identifier(&self) -> String; + + /// Get the generic service mapping key for the trigger + fn get_generic_identifier(&self) -> &'static str; +} + +pub trait Trigger: ServiceNameResolver { fn new(payload: Value) -> Option where Self: Sized; fn is_match(payload: &Value) -> bool where Self: Sized; - fn enrich_span(&self, span: &mut Span); + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap); fn get_tags(&self) -> HashMap; fn get_arn(&self, region: &str) -> String; fn get_carrier(&self) -> HashMap; fn is_async(&self) -> bool; + + /// Default implementation for service name resolution + fn resolve_service_name( + &self, + service_mapping: &HashMap, + fallback: &str, + ) -> String { + service_mapping + .get(&self.get_specific_identifier()) + .or_else(|| service_mapping.get(self.get_generic_identifier())) + .unwrap_or(&fallback.to_string()) + .to_string() + } } #[must_use] diff --git a/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs b/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs index 1e7fe5beb..d45dc1f50 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/s3_event.rs @@ -8,7 +8,7 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::MS_TO_NS, - triggers::{Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + triggers::{ServiceNameResolver, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -77,15 +77,15 @@ impl Trigger for S3Record { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { debug!("Enriching an InferredSpan span with S3 event"); - let bucket_name = self.s3.bucket.name.clone(); + let bucket_name = self.get_specific_identifier(); let start_time = self .event_time .timestamp_nanos_opt() .unwrap_or((self.event_time.timestamp_millis() as f64 * MS_TO_NS) as i64); - // todo: service mapping - let service_name = "s3"; + + let service_name = self.resolve_service_name(service_mapping, "s3"); span.name = String::from("aws.s3"); span.service = service_name.to_string(); @@ -123,6 +123,16 @@ impl Trigger for S3Record { } } +impl ServiceNameResolver for S3Record { + fn get_specific_identifier(&self) -> String { + self.s3.bucket.name.clone() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_s3" + } +} + #[cfg(test)] mod tests { use super::*; @@ -177,7 +187,8 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let event = S3Record::new(payload).expect("Failed to deserialize S3Record"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.s3"); assert_eq!(span.service, "s3"); assert_eq!(span.resource, "example-bucket"); @@ -237,4 +248,29 @@ mod tests { assert_eq!(carrier, expected); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("s3_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = S3Record::new(payload).expect("Failed to deserialize S3Record"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("example-bucket".to_string(), "specific-service".to_string()), + ("lambda_s3".to_string(), "generic-service".to_string()), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "s3"), + "specific-service" + ); + + let generic_service_mapping = + HashMap::from([("lambda_s3".to_string(), "generic-service".to_string())]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "s3"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs b/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs index 2b7514cf1..47091a9d6 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use chrono::{DateTime, Utc}; +use datadog_trace_protobuf::pb::Span; use serde::{Deserialize, Serialize}; use serde_json::Value; use tracing::debug; @@ -9,7 +10,7 @@ use crate::lifecycle::invocation::{ base64_to_string, processor::MS_TO_NS, triggers::{ - event_bridge_event::EventBridgeEvent, Trigger, DATADOG_CARRIER_KEY, + event_bridge_event::EventBridgeEvent, ServiceNameResolver, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, }, }; @@ -82,33 +83,26 @@ impl Trigger for SnsRecord { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut datadog_trace_protobuf::pb::Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { debug!("Enriching an Inferred Span for an SNS Event"); - let resource = self - .sns - .topic_arn - .clone() - .split(':') - .last() - .unwrap_or_default() - .to_string(); + let resource_name = self.get_specific_identifier(); let start_time = self .sns .timestamp .timestamp_nanos_opt() .unwrap_or((self.sns.timestamp.timestamp_millis() as f64 * MS_TO_NS) as i64); - // todo: service mapping - let service_name = "sns".to_string(); + + let service_name = self.resolve_service_name(service_mapping, "sns"); span.name = "aws.sns".to_string(); span.service = service_name.to_string(); - span.resource.clone_from(&resource); + span.resource.clone_from(&resource_name); span.r#type = "web".to_string(); span.start = start_time; span.meta.extend([ ("operation_name".to_string(), "aws.sns".to_string()), - ("topicname".to_string(), resource), + ("topicname".to_string(), resource_name), ("topic_arn".to_string(), self.sns.topic_arn.clone()), ("message_id".to_string(), self.sns.message_id.clone()), ("type".to_string(), self.sns.r#type.clone()), @@ -164,6 +158,21 @@ impl Trigger for SnsRecord { } } +impl ServiceNameResolver for SnsRecord { + fn get_specific_identifier(&self) -> String { + self.sns + .topic_arn + .split(':') + .last() + .unwrap_or_default() + .to_string() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_sns" + } +} + #[cfg(test)] mod tests { use datadog_trace_protobuf::pb::Span; @@ -224,7 +233,8 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.sns"); assert_eq!(span.service, "sns"); assert_eq!(span.resource, "serverlessTracingTopicPy"); @@ -341,4 +351,32 @@ mod tests { assert_eq!(carrier, expected); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("sns_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ( + "serverlessTracingTopicPy".to_string(), + "specific-service".to_string(), + ), + ("lambda_sns".to_string(), "generic-service".to_string()), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "sns"), + "specific-service" + ); + + let generic_service_mapping = + HashMap::from([("lambda_sns".to_string(), "generic-service".to_string())]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "sns"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs b/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs index 6e748d4d9..c9766a736 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs @@ -10,7 +10,7 @@ use crate::lifecycle::invocation::{ event_bridge_event::EventBridgeEvent, get_aws_partition_by_region, sns_event::{SnsEntity, SnsRecord}, - Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, + ServiceNameResolver, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, }, }; @@ -98,23 +98,17 @@ impl Trigger for SqsRecord { } #[allow(clippy::cast_possible_truncation)] - fn enrich_span(&self, span: &mut Span) { + fn enrich_span(&self, span: &mut Span, service_mapping: &HashMap) { debug!("Enriching an Inferred Span for an SQS Event"); - let resource = self - .event_source_arn - .clone() - .split(':') - .last() - .unwrap_or_default() - .to_string(); + let resource = self.get_specific_identifier(); let start_time = (self .attributes .sent_timestamp .parse::() .unwrap_or_default() as f64 * MS_TO_NS) as i64; - // todo: service mapping - let service_name = "sqs"; + + let service_name = self.resolve_service_name(service_mapping, "sqs"); span.name = "aws.sqs".to_string(); span.service = service_name.to_string(); @@ -198,6 +192,20 @@ impl Trigger for SqsRecord { } } +impl ServiceNameResolver for SqsRecord { + fn get_specific_identifier(&self) -> String { + self.event_source_arn + .split(':') + .last() + .unwrap_or_default() + .to_string() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_sqs" + } +} + #[cfg(test)] mod tests { use super::*; @@ -260,7 +268,8 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let event = SqsRecord::new(payload).expect("Failed to deserialize SqsRecord"); let mut span = Span::default(); - event.enrich_span(&mut span); + let service_mapping = HashMap::new(); + event.enrich_span(&mut span, &service_mapping); assert_eq!(span.name, "aws.sqs"); assert_eq!(span.service, "sqs"); assert_eq!(span.resource, "MyQueue"); @@ -391,4 +400,29 @@ mod tests { assert_eq!(carrier, expected); } + + #[test] + fn test_resolve_service_name() { + let json = read_json_file("sqs_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = SqsRecord::new(payload).expect("Failed to deserialize SqsRecord"); + + // Priority is given to the specific key + let specific_service_mapping = HashMap::from([ + ("MyQueue".to_string(), "specific-service".to_string()), + ("lambda_sqs".to_string(), "generic-service".to_string()), + ]); + + assert_eq!( + event.resolve_service_name(&specific_service_mapping, "sqs"), + "specific-service" + ); + + let generic_service_mapping = + HashMap::from([("lambda_sqs".to_string(), "generic-service".to_string())]); + assert_eq!( + event.resolve_service_name(&generic_service_mapping, "sqs"), + "generic-service" + ); + } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs b/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs index 91eb2af54..ee77434bc 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs @@ -5,7 +5,9 @@ use serde_json::Value; use sha2::{Digest, Sha256}; use crate::{ - lifecycle::invocation::triggers::{Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + lifecycle::invocation::triggers::{ + ServiceNameResolver, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, + }, traces::{ context::{Sampling, SpanContext}, propagation::text_map_propagator::DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY, @@ -82,7 +84,12 @@ impl Trigger for StepFunctionEvent { execution_id.is_some() && name.is_some() && entered_time.is_some() } - fn enrich_span(&self, _span: &mut datadog_trace_protobuf::pb::Span) {} + fn enrich_span( + &self, + _span: &mut datadog_trace_protobuf::pb::Span, + _service_mapping: &HashMap, + ) { + } fn get_tags(&self) -> HashMap { HashMap::from([( @@ -182,6 +189,16 @@ impl StepFunctionEvent { } } +impl ServiceNameResolver for StepFunctionEvent { + fn get_specific_identifier(&self) -> String { + String::new() + } + + fn get_generic_identifier(&self) -> &'static str { + "lambda_stepfunction" + } +} + #[cfg(test)] mod tests { use super::*;