Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
6618e05
wip: sqs
astuyve Oct 23, 2024
27c2ddb
Merge branch 'aj/sns-sqs' of ssh://github.com/DataDog/datadog-lambda-…
duncanista Oct 23, 2024
058059e
feat: sqs tests
astuyve Oct 23, 2024
94b7fc7
invert duration check
duncanista Oct 23, 2024
ea156bc
remove duration set
duncanista Oct 23, 2024
dd13114
Merge branch 'aj/sns-sqs' of ssh://github.com/DataDog/datadog-lambda-…
duncanista Oct 23, 2024
dad27a8
fmt and add `test_get_arn`
duncanista Oct 23, 2024
3ae55f3
remove unneeded reference
duncanista Oct 23, 2024
cc4c16c
remove unneeded comments
duncanista Oct 23, 2024
9cf0a2a
add `get_carrier` implementation for `SqsRecord`
duncanista Oct 23, 2024
e5ed400
add trace context to `sqs_event.json`
duncanista Oct 23, 2024
dec3bf9
fix: resource_names is not needed
astuyve Oct 24, 2024
7b99cd2
fix: don't deserialize body
astuyve Oct 24, 2024
14a1c08
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Nov 4, 2024
fd2864e
avoid `use super::...`
duncanista Nov 5, 2024
e3d6c3f
Merge branch 'aj/sns-sqs' of ssh://github.com/DataDog/datadog-lambda-…
duncanista Nov 5, 2024
7a403de
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Nov 5, 2024
b4f1dff
fix unit tests
duncanista Nov 5, 2024
da6c083
set carrier and trigger tags
duncanista Nov 5, 2024
29eaaa5
remove duplicate tag
duncanista Nov 6, 2024
5085bf4
fmt
duncanista Nov 6, 2024
f5fe9ab
pass headers to `on_invocation_end`
duncanista Nov 6, 2024
0cf4dbe
infer first, then extract
duncanista Nov 6, 2024
b136373
reset values on every infer
duncanista Nov 6, 2024
b9edb96
add `sns_event.rs`
duncanista Nov 7, 2024
8232e6a
add `sns_event*.json` payloads
duncanista Nov 7, 2024
1daeb75
add `base64_to_string` method
duncanista Nov 7, 2024
fcc148e
surrender resource
duncanista Nov 7, 2024
0e17bae
use `SnsRecord` for inferred spans
duncanista Nov 7, 2024
9fe1c49
move some constants
duncanista Nov 7, 2024
e8006f4
add missing trigger tags
duncanista Nov 7, 2024
d3e3670
missed one case
duncanista Nov 7, 2024
9ddf64a
Merge branch 'aj/sns-sqs' into jordan.gonzalez/bottlecap/sns
duncanista Nov 7, 2024
792697f
update unit tests
duncanista Nov 7, 2024
b32e864
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Nov 7, 2024
22f20ed
update `tt` to `t.get_tags()`
duncanista Nov 7, 2024
d0860fd
fmt
duncanista Nov 7, 2024
bd3830f
typo
duncanista Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion bottlecap/src/lifecycle/invocation/span_inferrer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::config::AwsConfig;

use crate::lifecycle::invocation::triggers::{
api_gateway_http_event::APIGatewayHttpEvent, api_gateway_rest_event::APIGatewayRestEvent,
sqs_event::SqsRecord, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG,
sns_event::SnsRecord, sqs_event::SqsRecord, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG,
};
use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE};

Expand Down Expand Up @@ -104,6 +104,20 @@ impl SpanInferrer {
self.is_async_span = t.is_async();
self.inferred_span = Some(span);
}
} else if SnsRecord::is_match(payload_value) {
if let Some(t) = SnsRecord::new(payload_value.clone()) {
let mut span = Span {
span_id: Self::generate_span_id(),
..Default::default()
};

t.enrich_span(&mut span);

self.carrier = Some(t.get_carrier());
self.trigger_tags = Some(t.get_tags());
self.is_async_span = t.is_async();
self.inferred_span = Some(span);
}
} else {
debug!("Unable to infer span from payload: no matching trigger found");
}
Expand Down
10 changes: 10 additions & 0 deletions bottlecap/src/lifecycle/invocation/triggers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::{collections::HashMap, hash::BuildHasher};

use base64::{engine::general_purpose, Engine};
use datadog_trace_protobuf::pb::Span;
use serde::{ser::SerializeMap, Serializer};
use serde_json::Value;

pub mod api_gateway_http_event;
pub mod api_gateway_rest_event;
pub mod sns_event;
pub mod sqs_event;

pub const DATADOG_CARRIER_KEY: &str = "_datadog";
Expand All @@ -31,6 +33,14 @@ pub fn get_aws_partition_by_region(region: &str) -> String {
}
}

#[must_use]
pub fn base64_to_string(base64_string: &str) -> String {
let bytes = general_purpose::STANDARD
.decode(base64_string)
.unwrap_or_default();
String::from_utf8_lossy(&bytes).to_string()
}

/// Serialize a `HashMap` with lowercase keys
///
pub fn lowercase_key<S, H>(
Expand Down
317 changes: 317 additions & 0 deletions bottlecap/src/lifecycle/invocation/triggers/sns_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::debug;

use crate::lifecycle::invocation::{
processor::MS_TO_NS,
triggers::{base64_to_string, Trigger, DATADOG_CARRIER_KEY},
};

use super::{FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, FUNCTION_TRIGGER_EVENT_SOURCE_TAG};

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct SnsEvent {
#[serde(rename = "Records")]
pub records: Vec<SnsRecord>,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct SnsRecord {
#[serde(rename = "Sns")]
pub sns: SnsEntity,
#[serde(rename = "EventSubscriptionArn")]
pub event_subscription_arn: Option<String>,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct SnsEntity {
#[serde(rename = "MessageId")]
pub message_id: String,
#[serde(rename = "Type")]
pub r#type: String,
#[serde(rename = "TopicArn")]
pub topic_arn: String,
#[serde(rename = "MessageAttributes")]
pub message_attributes: HashMap<String, MessageAttribute>,
#[serde(rename = "Timestamp")]
pub timestamp: DateTime<Utc>,
#[serde(rename = "Subject")]
pub subject: Option<String>,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct MessageAttribute {
#[serde(rename = "Type")]
pub r#type: String,
#[serde(rename = "Value")]
pub value: String,
}

impl Trigger for SnsRecord {
fn new(payload: serde_json::Value) -> Option<Self> {
match payload.get("Records").and_then(Value::as_array) {
Some(records) => match serde_json::from_value::<SnsRecord>(records[0].clone()) {
Ok(record) => Some(record),
Err(e) => {
debug!("Failed to deserialize SNS Record: {e}");
None
}
},
None => None,
}
}

fn is_match(payload: &serde_json::Value) -> bool {
if let Some(first_record) = payload
.get("Records")
.and_then(Value::as_array)
.and_then(|r| r.first())
.take()
{
return first_record.get("Sns").is_some();
}

false
}

#[allow(clippy::cast_possible_truncation)]
fn enrich_span(&self, span: &mut datadog_trace_protobuf::pb::Span) {
debug!("Enriching an Inferred Span for an SNS Event");
let resource = self
.sns
.topic_arn
.clone()
.split(':')
.last()
.unwrap_or_default()
.to_string();

let start_time = self
.sns
.timestamp
.timestamp_nanos_opt()
.unwrap_or((self.sns.timestamp.timestamp_millis() as f64 * MS_TO_NS) as i64);
// todo: service mapping
let service_name = "sns".to_string();

span.name = "aws.sns".to_string();
span.service = service_name.to_string();
span.resource.clone_from(&resource);
span.r#type = "web".to_string();
span.start = start_time;
span.meta.extend([
("operation_name".to_string(), "aws.sns".to_string()),
("topicname".to_string(), resource),
("topic_arn".to_string(), self.sns.topic_arn.clone()),
("message_id".to_string(), self.sns.message_id.clone()),
("type".to_string(), self.sns.r#type.clone()),
]);

if let Some(subject) = &self.sns.subject {
span.meta.insert("subject".to_string(), subject.clone());
}

if let Some(event_subscription_arn) = &self.event_subscription_arn {
span.meta.insert(
"event_subscription_arn".to_string(),
event_subscription_arn.clone(),
);
}
}

fn get_tags(&self) -> HashMap<String, String> {
HashMap::from([
(
FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(),
"sns".to_string(),
),
(
FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(),
self.sns.topic_arn.clone(),
),
])
}

fn get_arn(&self, _region: &str) -> String {
self.sns.topic_arn.clone()
}

fn get_carrier(&self) -> HashMap<String, String> {
let carrier = HashMap::new();
if let Some(ma) = self.sns.message_attributes.get(DATADOG_CARRIER_KEY) {
match ma.r#type.as_str() {
"String" => return serde_json::from_str(&ma.value).unwrap_or_default(),
"Binary" => {
return serde_json::from_str(&base64_to_string(&ma.value)).unwrap_or_default()
}
_ => {
debug!("Unsupported type in SNS message attribute");
}
}
}

carrier
}

fn is_async(&self) -> bool {
true
}
}

#[cfg(test)]
mod tests {
use datadog_trace_protobuf::pb::Span;

use super::*;
use crate::lifecycle::invocation::triggers::test_utils::read_json_file;

#[test]
fn test_new() {
let json = read_json_file("sns_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let result = SnsRecord::new(payload).expect("Failed to deserialize into SnsRecord");

let message_attributes = HashMap::<String, MessageAttribute>::from([
("_datadog".to_string(), MessageAttribute {
r#type: "String".to_string(),
value: "{\"x-datadog-trace-id\": \"4948377316357291421\", \"x-datadog-parent-id\": \"6746998015037429512\", \"x-datadog-sampling-priority\": \"1\"}".to_string(),
})
]);

let expected = SnsRecord {
event_subscription_arn: Some("arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy:224b60ba-befc-4830-ad96-f1f0ac94eb04".to_string()),
sns: SnsEntity {
message_id: "87056a47-f506-5d77-908b-303605d3b197".to_string(),
r#type: "Notification".to_string(),
topic_arn: "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy"
.to_string(),
message_attributes,
timestamp: DateTime::parse_from_rfc3339("2022-01-31T14:13:41.637Z")
.unwrap()
.with_timezone(&Utc),
subject: None,
},
};

assert_eq!(result, expected);
}

#[test]
fn test_is_match() {
let json = read_json_file("sns_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize SnsRecord");

assert!(SnsRecord::is_match(&payload));
}

#[test]
fn test_is_not_match() {
let json = read_json_file("sqs_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize SqsRecord");
assert!(!SnsRecord::is_match(&payload));
}

#[test]
fn test_enrich_span() {
let json = read_json_file("sns_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord");
let mut span = Span::default();
event.enrich_span(&mut span);
assert_eq!(span.name, "aws.sns");
assert_eq!(span.service, "sns");
assert_eq!(span.resource, "serverlessTracingTopicPy");
assert_eq!(span.r#type, "web");

assert_eq!(
span.meta,
HashMap::from([
("operation_name".to_string(), "aws.sns".to_string()),
("topicname".to_string(), "serverlessTracingTopicPy".to_string()),
("topic_arn".to_string(), "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy".to_string()),
("message_id".to_string(), "87056a47-f506-5d77-908b-303605d3b197".to_string()),
("type".to_string(), "Notification".to_string()),
("event_subscription_arn".to_string(), "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy:224b60ba-befc-4830-ad96-f1f0ac94eb04".to_string())
])
);
}

#[test]
fn test_get_tags() {
let json = read_json_file("sns_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord");
let tags = event.get_tags();

let expected = HashMap::from([
(
"function_trigger.event_source".to_string(),
"sns".to_string(),
),
(
"function_trigger.event_source_arn".to_string(),
"arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy".to_string(),
),
]);

assert_eq!(tags, expected);
}

#[test]
fn test_get_arn() {
let json = read_json_file("sns_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord");
assert_eq!(
event.get_arn("us-east-1"),
"arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy"
);
}

#[test]
fn test_get_carrier() {
let json = read_json_file("sns_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord");
let carrier = event.get_carrier();

let expected = HashMap::from([
(
"x-datadog-trace-id".to_string(),
"4948377316357291421".to_string(),
),
(
"x-datadog-parent-id".to_string(),
"6746998015037429512".to_string(),
),
("x-datadog-sampling-priority".to_string(), "1".to_string()),
]);

assert_eq!(carrier, expected);
}

#[test]
fn test_get_carrier_from_binary_value() {
let json = read_json_file("sns_event_binary.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord");
let carrier = event.get_carrier();

let expected = HashMap::from([
(
"x-datadog-trace-id".to_string(),
"4948377316357291421".to_string(),
),
(
"x-datadog-parent-id".to_string(),
"6746998015037429512".to_string(),
),
("x-datadog-sampling-priority".to_string(), "1".to_string()),
]);

assert_eq!(carrier, expected);
}
}
2 changes: 1 addition & 1 deletion bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl Trigger for SqsRecord {

span.name = "aws.sqs".to_string();
span.service = service_name.to_string();
span.resource.clone_from(&resource);
span.resource = resource;
span.r#type = "web".to_string();
span.start = start_time;
span.meta.extend(HashMap::from([
Expand Down
Loading