diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index 4ad90dffa..bc6ac7eac 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -9,7 +9,7 @@ use crate::config::AwsConfig; use crate::lifecycle::invocation::triggers::{ api_gateway_http_event::APIGatewayHttpEvent, api_gateway_rest_event::APIGatewayRestEvent, - sqs_event::SqsRecord, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, + sns_event::SnsRecord, sqs_event::SqsRecord, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, }; use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE}; @@ -104,6 +104,20 @@ impl SpanInferrer { self.is_async_span = t.is_async(); self.inferred_span = Some(span); } + } else if SnsRecord::is_match(payload_value) { + if let Some(t) = SnsRecord::new(payload_value.clone()) { + let mut span = Span { + span_id: Self::generate_span_id(), + ..Default::default() + }; + + t.enrich_span(&mut span); + + self.carrier = Some(t.get_carrier()); + self.trigger_tags = Some(t.get_tags()); + self.is_async_span = t.is_async(); + self.inferred_span = Some(span); + } } else { debug!("Unable to infer span from payload: no matching trigger found"); } diff --git a/bottlecap/src/lifecycle/invocation/triggers/mod.rs b/bottlecap/src/lifecycle/invocation/triggers/mod.rs index a989ce009..eed14a00d 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/mod.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/mod.rs @@ -1,11 +1,13 @@ use std::{collections::HashMap, hash::BuildHasher}; +use base64::{engine::general_purpose, Engine}; use datadog_trace_protobuf::pb::Span; use serde::{ser::SerializeMap, Serializer}; use serde_json::Value; pub mod api_gateway_http_event; pub mod api_gateway_rest_event; +pub mod sns_event; pub mod sqs_event; pub const DATADOG_CARRIER_KEY: &str = "_datadog"; @@ -31,6 +33,14 @@ pub fn get_aws_partition_by_region(region: &str) -> String { } } +#[must_use] +pub fn base64_to_string(base64_string: &str) -> String { + let bytes = general_purpose::STANDARD + .decode(base64_string) + .unwrap_or_default(); + String::from_utf8_lossy(&bytes).to_string() +} + /// Serialize a `HashMap` with lowercase keys /// pub fn lowercase_key( diff --git a/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs b/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs new file mode 100644 index 000000000..443a6ada9 --- /dev/null +++ b/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs @@ -0,0 +1,317 @@ +use std::collections::HashMap; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tracing::debug; + +use crate::lifecycle::invocation::{ + processor::MS_TO_NS, + triggers::{base64_to_string, Trigger, DATADOG_CARRIER_KEY}, +}; + +use super::{FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct SnsEvent { + #[serde(rename = "Records")] + pub records: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct SnsRecord { + #[serde(rename = "Sns")] + pub sns: SnsEntity, + #[serde(rename = "EventSubscriptionArn")] + pub event_subscription_arn: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct SnsEntity { + #[serde(rename = "MessageId")] + pub message_id: String, + #[serde(rename = "Type")] + pub r#type: String, + #[serde(rename = "TopicArn")] + pub topic_arn: String, + #[serde(rename = "MessageAttributes")] + pub message_attributes: HashMap, + #[serde(rename = "Timestamp")] + pub timestamp: DateTime, + #[serde(rename = "Subject")] + pub subject: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct MessageAttribute { + #[serde(rename = "Type")] + pub r#type: String, + #[serde(rename = "Value")] + pub value: String, +} + +impl Trigger for SnsRecord { + fn new(payload: serde_json::Value) -> Option { + match payload.get("Records").and_then(Value::as_array) { + Some(records) => match serde_json::from_value::(records[0].clone()) { + Ok(record) => Some(record), + Err(e) => { + debug!("Failed to deserialize SNS Record: {e}"); + None + } + }, + None => None, + } + } + + fn is_match(payload: &serde_json::Value) -> bool { + if let Some(first_record) = payload + .get("Records") + .and_then(Value::as_array) + .and_then(|r| r.first()) + .take() + { + return first_record.get("Sns").is_some(); + } + + false + } + + #[allow(clippy::cast_possible_truncation)] + fn enrich_span(&self, span: &mut datadog_trace_protobuf::pb::Span) { + debug!("Enriching an Inferred Span for an SNS Event"); + let resource = self + .sns + .topic_arn + .clone() + .split(':') + .last() + .unwrap_or_default() + .to_string(); + + let start_time = self + .sns + .timestamp + .timestamp_nanos_opt() + .unwrap_or((self.sns.timestamp.timestamp_millis() as f64 * MS_TO_NS) as i64); + // todo: service mapping + let service_name = "sns".to_string(); + + span.name = "aws.sns".to_string(); + span.service = service_name.to_string(); + span.resource.clone_from(&resource); + span.r#type = "web".to_string(); + span.start = start_time; + span.meta.extend([ + ("operation_name".to_string(), "aws.sns".to_string()), + ("topicname".to_string(), resource), + ("topic_arn".to_string(), self.sns.topic_arn.clone()), + ("message_id".to_string(), self.sns.message_id.clone()), + ("type".to_string(), self.sns.r#type.clone()), + ]); + + if let Some(subject) = &self.sns.subject { + span.meta.insert("subject".to_string(), subject.clone()); + } + + if let Some(event_subscription_arn) = &self.event_subscription_arn { + span.meta.insert( + "event_subscription_arn".to_string(), + event_subscription_arn.clone(), + ); + } + } + + fn get_tags(&self) -> HashMap { + HashMap::from([ + ( + FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(), + "sns".to_string(), + ), + ( + FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(), + self.sns.topic_arn.clone(), + ), + ]) + } + + fn get_arn(&self, _region: &str) -> String { + self.sns.topic_arn.clone() + } + + fn get_carrier(&self) -> HashMap { + let carrier = HashMap::new(); + if let Some(ma) = self.sns.message_attributes.get(DATADOG_CARRIER_KEY) { + match ma.r#type.as_str() { + "String" => return serde_json::from_str(&ma.value).unwrap_or_default(), + "Binary" => { + return serde_json::from_str(&base64_to_string(&ma.value)).unwrap_or_default() + } + _ => { + debug!("Unsupported type in SNS message attribute"); + } + } + } + + carrier + } + + fn is_async(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use datadog_trace_protobuf::pb::Span; + + use super::*; + use crate::lifecycle::invocation::triggers::test_utils::read_json_file; + + #[test] + fn test_new() { + let json = read_json_file("sns_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let result = SnsRecord::new(payload).expect("Failed to deserialize into SnsRecord"); + + let message_attributes = HashMap::::from([ + ("_datadog".to_string(), MessageAttribute { + r#type: "String".to_string(), + value: "{\"x-datadog-trace-id\": \"4948377316357291421\", \"x-datadog-parent-id\": \"6746998015037429512\", \"x-datadog-sampling-priority\": \"1\"}".to_string(), + }) + ]); + + let expected = SnsRecord { + event_subscription_arn: Some("arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy:224b60ba-befc-4830-ad96-f1f0ac94eb04".to_string()), + sns: SnsEntity { + message_id: "87056a47-f506-5d77-908b-303605d3b197".to_string(), + r#type: "Notification".to_string(), + topic_arn: "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy" + .to_string(), + message_attributes, + timestamp: DateTime::parse_from_rfc3339("2022-01-31T14:13:41.637Z") + .unwrap() + .with_timezone(&Utc), + subject: None, + }, + }; + + assert_eq!(result, expected); + } + + #[test] + fn test_is_match() { + let json = read_json_file("sns_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize SnsRecord"); + + assert!(SnsRecord::is_match(&payload)); + } + + #[test] + fn test_is_not_match() { + let json = read_json_file("sqs_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize SqsRecord"); + assert!(!SnsRecord::is_match(&payload)); + } + + #[test] + fn test_enrich_span() { + let json = read_json_file("sns_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord"); + let mut span = Span::default(); + event.enrich_span(&mut span); + assert_eq!(span.name, "aws.sns"); + assert_eq!(span.service, "sns"); + assert_eq!(span.resource, "serverlessTracingTopicPy"); + assert_eq!(span.r#type, "web"); + + assert_eq!( + span.meta, + HashMap::from([ + ("operation_name".to_string(), "aws.sns".to_string()), + ("topicname".to_string(), "serverlessTracingTopicPy".to_string()), + ("topic_arn".to_string(), "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy".to_string()), + ("message_id".to_string(), "87056a47-f506-5d77-908b-303605d3b197".to_string()), + ("type".to_string(), "Notification".to_string()), + ("event_subscription_arn".to_string(), "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy:224b60ba-befc-4830-ad96-f1f0ac94eb04".to_string()) + ]) + ); + } + + #[test] + fn test_get_tags() { + let json = read_json_file("sns_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord"); + let tags = event.get_tags(); + + let expected = HashMap::from([ + ( + "function_trigger.event_source".to_string(), + "sns".to_string(), + ), + ( + "function_trigger.event_source_arn".to_string(), + "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy".to_string(), + ), + ]); + + assert_eq!(tags, expected); + } + + #[test] + fn test_get_arn() { + let json = read_json_file("sns_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord"); + assert_eq!( + event.get_arn("us-east-1"), + "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy" + ); + } + + #[test] + fn test_get_carrier() { + let json = read_json_file("sns_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord"); + let carrier = event.get_carrier(); + + let expected = HashMap::from([ + ( + "x-datadog-trace-id".to_string(), + "4948377316357291421".to_string(), + ), + ( + "x-datadog-parent-id".to_string(), + "6746998015037429512".to_string(), + ), + ("x-datadog-sampling-priority".to_string(), "1".to_string()), + ]); + + assert_eq!(carrier, expected); + } + + #[test] + fn test_get_carrier_from_binary_value() { + let json = read_json_file("sns_event_binary.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord"); + let carrier = event.get_carrier(); + + let expected = HashMap::from([ + ( + "x-datadog-trace-id".to_string(), + "4948377316357291421".to_string(), + ), + ( + "x-datadog-parent-id".to_string(), + "6746998015037429512".to_string(), + ), + ("x-datadog-sampling-priority".to_string(), "1".to_string()), + ]); + + assert_eq!(carrier, expected); + } +} diff --git a/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs b/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs index 0daed4385..47c4a6069 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs @@ -115,7 +115,7 @@ impl Trigger for SqsRecord { span.name = "aws.sqs".to_string(); span.service = service_name.to_string(); - span.resource.clone_from(&resource); + span.resource = resource; span.r#type = "web".to_string(); span.start = start_time; span.meta.extend(HashMap::from([ diff --git a/bottlecap/tests/payloads/sns_event.json b/bottlecap/tests/payloads/sns_event.json new file mode 100644 index 000000000..ef8062a0e --- /dev/null +++ b/bottlecap/tests/payloads/sns_event.json @@ -0,0 +1,50 @@ +{ + "Records": [ + { + "EventSource": "aws:sns", + "EventVersion": "1.0", + "EventSubscriptionArn": "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy:224b60ba-befc-4830-ad96-f1f0ac94eb04", + "Sns": { + "Type": "Notification", + "MessageId": "87056a47-f506-5d77-908b-303605d3b197", + "TopicArn": "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy", + "Subject": null, + "Message": "Asynchronously invoking a Lambda function with SNS.", + "Timestamp": "2022-01-31T14:13:41.637Z", + "SignatureVersion": "1", + "Signature": "BmwnJb0Ku2KgQef9QOgaSSTwLyUsbkRq90lzD5Vn4mAcRUOq2ForfMOYbxMB6idljWIWy9t/jK4AIMxPGk/eOGiRcENx3BvAcGcoDayBRFY13+xUGaPn5Lfoht/ZJ7/hmCgFWKRa8ooATZL+AwGAw6Id8qzf0R3M3k2asy5Vxa4ODKiFW9OzWY/zFgsYJhddR3JrQl9YOMRyIobNNHT96o1TwjGsSUTEemrxA6jQtb3QbardEKO+2SuataLEZki7gE2D2sA300WqZecumI339q7la+OIj6VDGDwFoppE2sh8hzJYXAH7oo11giwltE0V3/eLFCVhsE8Y1KD/yDPPsA==", + "SigningCertUrl": "https://sns.sa-east-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2a969abfda.pem", + "UnsubscribeUrl": "https://sns.sa-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy:224b60ba-befc-4830-ad96-f1f0ac94eb04", + "MessageAttributes": { + "_datadog": { + "Type": "String", + "Value": "{\"x-datadog-trace-id\": \"4948377316357291421\", \"x-datadog-parent-id\": \"6746998015037429512\", \"x-datadog-sampling-priority\": \"1\"}" + } + } + } + }, + { + "EventSource": "aws:sns", + "EventVersion": "1.0", + "EventSubscriptionArn": "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy:224b60ba-befc-4830-ad96-f1f0ac94eb04", + "Sns": { + "Type": "Notification", + "MessageId": "87056a47-f506-5d77-908b-303605d3b197", + "TopicArn": "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy", + "Subject": null, + "Message": "Asynchronously invoking a Lambda function with SNS.", + "Timestamp": "2022-01-31T14:13:41.637Z", + "SignatureVersion": "1", + "Signature": "BmwnJb0Ku2KgQef9QOgaSSTwLyUsbkRq90lzD5Vn4mAcRUOq2ForfMOYbxMB6idljWIWy9t/jK4AIMxPGk/eOGiRcENx3BvAcGcoDayBRFY13+xUGaPn5Lfoht/ZJ7/hmCgFWKRa8ooATZL+AwGAw6Id8qzf0R3M3k2asy5Vxa4ODKiFW9OzWY/zFgsYJhddR3JrQl9YOMRyIobNNHT96o1TwjGsSUTEemrxA6jQtb3QbardEKO+2SuataLEZki7gE2D2sA300WqZecumI339q7la+OIj6VDGDwFoppE2sh8hzJYXAH7oo11giwltE0V3/eLFCVhsE8Y1KD/yDPPsA==", + "SigningCertUrl": "https://sns.sa-east-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2a969abfda.pem", + "UnsubscribeUrl": "https://sns.sa-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy:224b60ba-befc-4830-ad96-f1f0ac94eb04", + "MessageAttributes": { + "_datadog": { + "Type": "String", + "Value": "{\"x-datadog-trace-id\": \"4948377316357291421\", \"x-datadog-parent-id\": \"6746998015037429512\", \"x-datadog-sampling-priority\": \"1\"}" + } + } + } + } + ] +} diff --git a/bottlecap/tests/payloads/sns_event_binary.json b/bottlecap/tests/payloads/sns_event_binary.json new file mode 100644 index 000000000..4a9a2b500 --- /dev/null +++ b/bottlecap/tests/payloads/sns_event_binary.json @@ -0,0 +1,27 @@ +{ + "Records": [ + { + "EventSource": "aws:sns", + "EventVersion": "1.0", + "EventSubscriptionArn": "arn:aws:sns:eu-west-1:601427279990:serverlessTracingTopicPy:224b60ba-befc-4830-ad96-f1f0ac94eb04", + "Sns": { + "Type": "Notification", + "MessageId": "87056a47-f506-5d77-908b-303605d3b197", + "TopicArn": "arn:aws:sns:eu-west-1:601427279990:serverlessTracingTopicPy", + "Subject": null, + "Message": "Asynchronously invoking a Lambda function with SNS.", + "Timestamp": "2022-01-31T14:13:41.637Z", + "SignatureVersion": "1", + "Signature": "BmwnJb0Ku2KgQef9QOgaSSTwLyUsbkRq90lzD5Vn4mAcRUOq2ForfMOYbxMB6idljWIWy9t/jK4AIMxPGk/eOGiRcENx3BvAcGcoDayBRFY13+xUGaPn5Lfoht/ZJ7/hmCgFWKRa8ooATZL+AwGAw6Id8qzf0R3M3k2asy5Vxa4ODKiFW9OzWY/zFgsYJhddR3JrQl9YOMRyIobNNHT96o1TwjGsSUTEemrxA6jQtb3QbardEKO+2SuataLEZki7gE2D2sA300WqZecumI339q7la+OIj6VDGDwFoppE2sh8hzJYXAH7oo11giwltE0V3/eLFCVhsE8Y1KD/yDPPsA==", + "SigningCertUrl": "https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2a969abfda.pem", + "UnsubscribeUrl": "https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:601427279990:serverlessTracingTopicPy:224b60ba-befc-4830-ad96-f1f0ac94eb04", + "MessageAttributes": { + "_datadog": { + "Type": "Binary", + "Value": "eyJ4LWRhdGFkb2ctdHJhY2UtaWQiOiI0OTQ4Mzc3MzE2MzU3MjkxNDIxIiwieC1kYXRhZG9nLXBhcmVudC1pZCI6IjY3NDY5OTgwMTUwMzc0Mjk1MTIiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIxIn0=" + } + } + } + } + ] +}