diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index d6679bc2f..e37789202 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -149,16 +149,22 @@ impl Processor { self.span.meta.extend(trigger_tags); } - self.inferrer.complete_inferred_span(&self.span); + self.inferrer.complete_inferred_spans(&self.span); if self.tracer_detected { let mut body_size = std::mem::size_of_val(&self.span); let mut traces = vec![self.span.clone()]; + if let Some(inferred_span) = &self.inferrer.inferred_span { body_size += std::mem::size_of_val(inferred_span); traces.push(inferred_span.clone()); } + if let Some(ws) = &self.inferrer.wrapped_inferred_span { + body_size += std::mem::size_of_val(ws); + traces.push(ws.clone()); + } + // todo: figure out what to do here let header_tags = tracer_header_tags::TracerHeaderTags { lang: "", diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index bc6ac7eac..6c84cbe70 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -8,13 +8,17 @@ use tracing::debug; use crate::config::AwsConfig; use crate::lifecycle::invocation::triggers::{ - api_gateway_http_event::APIGatewayHttpEvent, api_gateway_rest_event::APIGatewayRestEvent, - sns_event::SnsRecord, sqs_event::SqsRecord, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, + api_gateway_http_event::APIGatewayHttpEvent, + api_gateway_rest_event::APIGatewayRestEvent, + sns_event::{SnsEntity, SnsRecord}, + sqs_event::SqsRecord, + Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, }; use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE}; pub struct SpanInferrer { pub inferred_span: Option, + pub wrapped_inferred_span: Option, is_async_span: bool, carrier: Option>, trigger_tags: Option>, @@ -31,6 +35,7 @@ impl SpanInferrer { pub fn new() -> Self { Self { inferred_span: None, + wrapped_inferred_span: None, is_async_span: false, carrier: None, trigger_tags: None, @@ -43,84 +48,79 @@ impl SpanInferrer { /// pub fn infer_span(&mut self, payload_value: &Value, aws_config: &AwsConfig) { self.inferred_span = None; + self.wrapped_inferred_span = None; self.is_async_span = false; self.carrier = None; self.trigger_tags = None; + let mut trigger: Option> = None; + let mut inferred_span = Span { + span_id: Self::generate_span_id(), + ..Default::default() + }; + if APIGatewayHttpEvent::is_match(payload_value) { if let Some(t) = APIGatewayHttpEvent::new(payload_value.clone()) { - let mut span = Span { - span_id: Self::generate_span_id(), - ..Default::default() - }; - - t.enrich_span(&mut span); - let mut tt = t.get_tags(); - tt.extend([( - FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(), - t.get_arn(&aws_config.region), - )]); - - self.carrier = Some(t.get_carrier()); - self.trigger_tags = Some(tt); - self.is_async_span = t.is_async(); - self.inferred_span = Some(span); + t.enrich_span(&mut inferred_span); + + trigger = Some(Box::new(t)); } } else if APIGatewayRestEvent::is_match(payload_value) { if let Some(t) = APIGatewayRestEvent::new(payload_value.clone()) { - let mut span = Span { - span_id: Self::generate_span_id(), - ..Default::default() - }; - - t.enrich_span(&mut span); - let mut tt = t.get_tags(); - tt.extend([( - FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(), - t.get_arn(&aws_config.region), - )]); - - self.carrier = Some(t.get_carrier()); - self.trigger_tags = Some(tt); - self.is_async_span = t.is_async(); - self.inferred_span = Some(span); + t.enrich_span(&mut inferred_span); + + trigger = Some(Box::new(t)); } } else if SqsRecord::is_match(payload_value) { if let Some(t) = SqsRecord::new(payload_value.clone()) { - let mut span = Span { - span_id: Self::generate_span_id(), - ..Default::default() - }; - - t.enrich_span(&mut span); - let mut tt = t.get_tags(); - tt.extend([( - FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(), - t.get_arn(&aws_config.region), - )]); - - self.carrier = Some(t.get_carrier()); - self.trigger_tags = Some(tt); - self.is_async_span = t.is_async(); - self.inferred_span = Some(span); + t.enrich_span(&mut inferred_span); + + // Check for SNS event wrapped in the SQS body + if let Ok(sns_entity) = serde_json::from_str::(&t.body) { + debug!("Found an SNS event wrapped in the SQS body"); + let mut wrapped_inferred_span = Span { + span_id: Self::generate_span_id(), + ..Default::default() + }; + + let wt = SnsRecord { + sns: sns_entity, + event_subscription_arn: None, + }; + wt.enrich_span(&mut wrapped_inferred_span); + inferred_span.meta.extend(wt.get_tags()); + + wrapped_inferred_span.duration = + inferred_span.start - wrapped_inferred_span.start; + + self.wrapped_inferred_span = Some(wrapped_inferred_span); + } + + trigger = Some(Box::new(t)); } } else if SnsRecord::is_match(payload_value) { if let Some(t) = SnsRecord::new(payload_value.clone()) { - let mut span = Span { - span_id: Self::generate_span_id(), - ..Default::default() - }; - - t.enrich_span(&mut span); + t.enrich_span(&mut inferred_span); - self.carrier = Some(t.get_carrier()); - self.trigger_tags = Some(t.get_tags()); - self.is_async_span = t.is_async(); - self.inferred_span = Some(span); + trigger = Some(Box::new(t)); } } else { debug!("Unable to infer span from payload: no matching trigger found"); } + + // Inferred a trigger + if let Some(t) = trigger { + let mut trigger_tags = t.get_tags(); + trigger_tags.extend([( + FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(), + t.get_arn(&aws_config.region), + )]); + + self.trigger_tags = Some(trigger_tags); + self.carrier = Some(t.get_carrier()); + self.is_async_span = t.is_async(); + self.inferred_span = Some(inferred_span); + } } /// If a `self.inferred_span` exist, set the `parent_id` to @@ -144,9 +144,30 @@ impl SpanInferrer { } } - // TODO add status tag and other info from response - pub fn complete_inferred_span(&mut self, invocation_span: &Span) { + // TODO: add status tag and other info from response + // TODO: add peer.service + pub fn complete_inferred_spans(&mut self, invocation_span: &Span) { if let Some(s) = &mut self.inferred_span { + if let Some(ws) = &mut self.wrapped_inferred_span { + // Set correct Parent ID for multiple inferred spans + ws.parent_id = s.parent_id; + s.parent_id = ws.span_id; + + // TODO: clean this logic + if self.is_async_span { + // SNS to SQS span duration will be set + if ws.duration == 0 { + let duration = s.start - ws.start; + ws.duration = duration; + } + } else { + let duration = s.start - ws.start; + ws.duration = duration; + } + + ws.trace_id = invocation_span.trace_id; + } + if self.is_async_span { // SNS to SQS span duration will be set if s.duration == 0 { diff --git a/bottlecap/src/lifecycle/invocation/triggers/mod.rs b/bottlecap/src/lifecycle/invocation/triggers/mod.rs index 863fb4019..6edbe5c07 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/mod.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/mod.rs @@ -14,9 +14,13 @@ pub const DATADOG_CARRIER_KEY: &str = "_datadog"; pub const FUNCTION_TRIGGER_EVENT_SOURCE_TAG: &str = "function_trigger.event_source"; pub const FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG: &str = "function_trigger.event_source_arn"; -pub trait Trigger: Sized { - fn new(payload: Value) -> Option; - fn is_match(payload: &Value) -> bool; +pub trait Trigger { + fn new(payload: Value) -> Option + where + Self: Sized; + fn is_match(payload: &Value) -> bool + where + Self: Sized; fn enrich_span(&self, span: &mut Span); fn get_tags(&self) -> HashMap; fn get_arn(&self, region: &str) -> String; diff --git a/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs b/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs index 443a6ada9..cbf313ff6 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs @@ -7,11 +7,9 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::MS_TO_NS, - triggers::{base64_to_string, Trigger, DATADOG_CARRIER_KEY}, + triggers::{base64_to_string, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, }; -use super::{FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}; - #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct SnsEvent { #[serde(rename = "Records")] @@ -123,16 +121,10 @@ impl Trigger for SnsRecord { } fn get_tags(&self) -> HashMap { - HashMap::from([ - ( - FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(), - "sns".to_string(), - ), - ( - FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(), - self.sns.topic_arn.clone(), - ), - ]) + HashMap::from([( + FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(), + "sns".to_string(), + )]) } fn get_arn(&self, _region: &str) -> String { @@ -246,16 +238,10 @@ mod tests { let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord"); let tags = event.get_tags(); - let expected = HashMap::from([ - ( - "function_trigger.event_source".to_string(), - "sns".to_string(), - ), - ( - "function_trigger.event_source_arn".to_string(), - "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy".to_string(), - ), - ]); + let expected = HashMap::from([( + "function_trigger.event_source".to_string(), + "sns".to_string(), + )]); assert_eq!(tags, expected); } diff --git a/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs b/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs index 47c4a6069..4f804c3de 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs @@ -7,8 +7,9 @@ use tracing::debug; use crate::lifecycle::invocation::{ processor::MS_TO_NS, triggers::{ - get_aws_partition_by_region, Trigger, DATADOG_CARRIER_KEY, - FUNCTION_TRIGGER_EVENT_SOURCE_TAG, + get_aws_partition_by_region, + sns_event::{SnsEntity, SnsRecord}, + Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG, }, }; @@ -35,6 +36,7 @@ pub struct SqsRecord { pub event_source_arn: String, #[serde(rename = "awsRegion")] pub aws_region: String, + pub body: String, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -178,8 +180,19 @@ impl Trigger for SqsRecord { } } - // TODO: AWSTraceHeader + // TODO: Check for EventBridge event sent through SQS + + // Check for SNS event sent through SQS + if let Ok(sns_entity) = serde_json::from_str::(&self.body) { + let sns_record = SnsRecord { + sns: sns_entity, + event_subscription_arn: None, + }; + + return sns_record.get_carrier(); + } + // TODO: AWSTraceHeader carrier } } @@ -219,6 +232,7 @@ mod tests { event_source: "aws:sqs".to_string(), event_source_arn: "arn:aws:sqs:us-east-1:123456789012:MyQueue".to_string(), aws_region: "us-east-1".to_string(), + body: "Hello from SQS!".to_string(), }; assert_eq!(result, expected); @@ -326,4 +340,26 @@ mod tests { assert_eq!(carrier, expected); } + + #[test] + fn test_get_carrier_from_sns() { + let json = read_json_file("sns_sqs_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = SqsRecord::new(payload).expect("Failed to deserialize SqsRecord"); + let carrier = event.get_carrier(); + + let expected = HashMap::from([ + ( + "x-datadog-trace-id".to_string(), + "2776434475358637757".to_string(), + ), + ( + "x-datadog-parent-id".to_string(), + "4493917105238181843".to_string(), + ), + ("x-datadog-sampling-priority".to_string(), "1".to_string()), + ]); + + assert_eq!(carrier, expected); + } } diff --git a/bottlecap/tests/payloads/sns_sqs_event.json b/bottlecap/tests/payloads/sns_sqs_event.json new file mode 100644 index 000000000..c1746d8fb --- /dev/null +++ b/bottlecap/tests/payloads/sns_sqs_event.json @@ -0,0 +1,20 @@ +{ + "Records": [ + { + "messageId": "64812b68-4d9b-4dca-b3fb-9b18f255ee51", + "receiptHandle": "AQEBER6aRkfG8092GvkL7FRwCwbQ7LLDW9Tlk/CembqHe+suS2kfFxXiukomvaIN61QoyQMoRgWuV52SDkiQno2u+5hP64BDbmw+e/KR9ayvIfHJ3M6RfyQLaWNWm3hDFBCKTnBMVIxtdx0N9epZZewyokjKcrNYtmCghFgTCvZzsQkowi5rnoHAVHJ3je1c3bDnQ1KLrZFgajDnootYXDwEPuMq5FIxrf4EzTe0S7S+rnRm+GaQfeBLBVAY6dASL9usV3/AFRqDtaI7GKI+0F2NCgLlqj49VlPRz4ldhkGknYlKTZTluAqALWLJS62/J1GQo53Cs3nneJcmu5ajB2zzmhhRXoXINEkLhCD5ujZfcsw9H4xqW69Or4ECvlqx14bUU2rtMIW0QM2p7pEeXnyocymQv6m1te113eYWTVmaJ4I=", + "body": "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"0a0ab23e-4861-5447-82b7-e8094ff3e332\",\n \"TopicArn\" : \"arn:aws:sns:eu-west-1:601427279990:js-library-test-dev-demoTopic-15WGUVRCBMPAA\",\n \"Message\" : \"{\\\"hello\\\":\\\"harv\\\",\\\"nice of you to join us\\\":\\\"david\\\",\\\"anotherThing\\\":{\\\"foo\\\":\\\"bar\\\",\\\"blah\\\":null,\\\"harv\\\":123},\\\"vals\\\":[{\\\"thingOne\\\":1},{\\\"thingTwo\\\":2}],\\\"ajTimestamp\\\":1639777617957}\",\n \"Timestamp\" : \"2021-12-17T21:46:58.040Z\",\n \"SignatureVersion\" : \"1\",\n \"Signature\" : \"FR35/7E8C3LHEVk/rC4XxXlXwV/5mNkFNPgDhHSnJ2I6hIoSrTROAm7h5xm1PuBkAeFDvq0zofw91ouk9zZyvhdrMLFIIgrjEyNayRmEffmoEAkzLFUsgtQX7MmTl644r4NuWiM0Oiz7jueRvIcKXcZr7Nc6GJcWV1ymec8oOmuHNMisnPMxI07LIQVYSyAfv6P9r2jEWMVIukRoCzwTnRk4bUUYhPSGHI7OC3AsxxXBbv8snqTrLM/4z2rXCf6jHCKNxWeLlm9/45PphCkEyx5BWS4/71KaoMWUWy8+6CCsy+uF3XTCVmvSEYLyEwTSzOY+vCUjazrRW93498i70g==\",\n \"SigningCertUrl\" : \"https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2a969abfda.pem\",\n \"UnsubscribeUrl\" : \"https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:601427279990:js-library-test-dev-demoTopic-15WGUVRCBMPAA:1290f550-9a8a-4e8f-a900-8f5f96dcddda\",\n \"MessageAttributes\" : {\n \"_datadog\" : {\"Type\":\"String\",\"Value\":\"{\\\"x-datadog-trace-id\\\":\\\"2776434475358637757\\\",\\\"x-datadog-parent-id\\\":\\\"4493917105238181843\\\",\\\"x-datadog-sampling-priority\\\":\\\"1\\\"}\"}\n }\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1639777618130", + "SenderId": "AIDAIOA2GYWSHW4E2VXIO", + "ApproximateFirstReceiveTimestamp": "1639777618132" + }, + "messageAttributes": {}, + "md5OfBody": "ee19d8b1377919239ad3fd5dabc33739", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-west-1:601427279990:aj-js-library-test-dev-demo-queue", + "awsRegion": "eu-west-1" + } + ] +}