Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
6618e05
wip: sqs
astuyve Oct 23, 2024
27c2ddb
Merge branch 'aj/sns-sqs' of ssh://github.com/DataDog/datadog-lambda-…
duncanista Oct 23, 2024
058059e
feat: sqs tests
astuyve Oct 23, 2024
94b7fc7
invert duration check
duncanista Oct 23, 2024
ea156bc
remove duration set
duncanista Oct 23, 2024
dd13114
Merge branch 'aj/sns-sqs' of ssh://github.com/DataDog/datadog-lambda-…
duncanista Oct 23, 2024
dad27a8
fmt and add `test_get_arn`
duncanista Oct 23, 2024
3ae55f3
remove unneeded reference
duncanista Oct 23, 2024
cc4c16c
remove unneeded comments
duncanista Oct 23, 2024
9cf0a2a
add `get_carrier` implementation for `SqsRecord`
duncanista Oct 23, 2024
e5ed400
add trace context to `sqs_event.json`
duncanista Oct 23, 2024
dec3bf9
fix: resource_names is not needed
astuyve Oct 24, 2024
7b99cd2
fix: don't deserialize body
astuyve Oct 24, 2024
14a1c08
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Nov 4, 2024
fd2864e
avoid `use super::...`
duncanista Nov 5, 2024
e3d6c3f
Merge branch 'aj/sns-sqs' of ssh://github.com/DataDog/datadog-lambda-…
duncanista Nov 5, 2024
7a403de
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Nov 5, 2024
b4f1dff
fix unit tests
duncanista Nov 5, 2024
da6c083
set carrier and trigger tags
duncanista Nov 5, 2024
29eaaa5
remove duplicate tag
duncanista Nov 6, 2024
5085bf4
fmt
duncanista Nov 6, 2024
f5fe9ab
pass headers to `on_invocation_end`
duncanista Nov 6, 2024
0cf4dbe
infer first, then extract
duncanista Nov 6, 2024
b136373
reset values on every infer
duncanista Nov 6, 2024
b9edb96
add `sns_event.rs`
duncanista Nov 7, 2024
8232e6a
add `sns_event*.json` payloads
duncanista Nov 7, 2024
1daeb75
add `base64_to_string` method
duncanista Nov 7, 2024
fcc148e
surrender resource
duncanista Nov 7, 2024
0e17bae
use `SnsRecord` for inferred spans
duncanista Nov 7, 2024
9fe1c49
move some constants
duncanista Nov 7, 2024
e8006f4
add missing trigger tags
duncanista Nov 7, 2024
d3e3670
missed one case
duncanista Nov 7, 2024
9ddf64a
Merge branch 'aj/sns-sqs' into jordan.gonzalez/bottlecap/sns
duncanista Nov 7, 2024
792697f
update unit tests
duncanista Nov 7, 2024
b32e864
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Nov 7, 2024
22f20ed
update `tt` to `t.get_tags()`
duncanista Nov 7, 2024
d0860fd
fmt
duncanista Nov 7, 2024
bd3830f
typo
duncanista Nov 7, 2024
e29f088
update tags
duncanista Nov 8, 2024
b942ad8
SQS event can contain SNS carrier
duncanista Nov 8, 2024
c4ce7b5
make some `Trigger` methods to be `Sized`
duncanista Nov 8, 2024
026d7e0
add `sns_sqs_event.json`
duncanista Nov 8, 2024
f9c7247
account for wrapped inferred span in processor
duncanista Nov 8, 2024
227699a
simplify code in `span_inferrer.rs`
duncanista Nov 8, 2024
3ec6c92
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Nov 8, 2024
3a215cf
remove duplicated condition
duncanista Nov 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,22 @@ impl Processor {
self.span.meta.extend(trigger_tags);
}

self.inferrer.complete_inferred_span(&self.span);
self.inferrer.complete_inferred_spans(&self.span);

if self.tracer_detected {
let mut body_size = std::mem::size_of_val(&self.span);
let mut traces = vec![self.span.clone()];

if let Some(inferred_span) = &self.inferrer.inferred_span {
body_size += std::mem::size_of_val(inferred_span);
traces.push(inferred_span.clone());
}

if let Some(ws) = &self.inferrer.wrapped_inferred_span {
body_size += std::mem::size_of_val(ws);
traces.push(ws.clone());
}

// todo: figure out what to do here
let header_tags = tracer_header_tags::TracerHeaderTags {
lang: "",
Expand Down
145 changes: 83 additions & 62 deletions bottlecap/src/lifecycle/invocation/span_inferrer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ use tracing::debug;
use crate::config::AwsConfig;

use crate::lifecycle::invocation::triggers::{
api_gateway_http_event::APIGatewayHttpEvent, api_gateway_rest_event::APIGatewayRestEvent,
sns_event::SnsRecord, sqs_event::SqsRecord, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG,
api_gateway_http_event::APIGatewayHttpEvent,
api_gateway_rest_event::APIGatewayRestEvent,
sns_event::{SnsEntity, SnsRecord},
sqs_event::SqsRecord,
Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG,
};
use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE};

pub struct SpanInferrer {
pub inferred_span: Option<Span>,
pub wrapped_inferred_span: Option<Span>,
is_async_span: bool,
carrier: Option<HashMap<String, String>>,
trigger_tags: Option<HashMap<String, String>>,
Expand All @@ -31,6 +35,7 @@ impl SpanInferrer {
pub fn new() -> Self {
Self {
inferred_span: None,
wrapped_inferred_span: None,
is_async_span: false,
carrier: None,
trigger_tags: None,
Expand All @@ -43,84 +48,79 @@ impl SpanInferrer {
///
pub fn infer_span(&mut self, payload_value: &Value, aws_config: &AwsConfig) {
self.inferred_span = None;
self.wrapped_inferred_span = None;
self.is_async_span = false;
self.carrier = None;
self.trigger_tags = None;

let mut trigger: Option<Box<dyn Trigger>> = None;
let mut inferred_span = Span {
span_id: Self::generate_span_id(),
..Default::default()
};

if APIGatewayHttpEvent::is_match(payload_value) {
if let Some(t) = APIGatewayHttpEvent::new(payload_value.clone()) {
let mut span = Span {
span_id: Self::generate_span_id(),
..Default::default()
};

t.enrich_span(&mut span);
let mut tt = t.get_tags();
tt.extend([(
FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(),
t.get_arn(&aws_config.region),
)]);

self.carrier = Some(t.get_carrier());
self.trigger_tags = Some(tt);
self.is_async_span = t.is_async();
self.inferred_span = Some(span);
t.enrich_span(&mut inferred_span);

trigger = Some(Box::new(t));
}
} else if APIGatewayRestEvent::is_match(payload_value) {
if let Some(t) = APIGatewayRestEvent::new(payload_value.clone()) {
let mut span = Span {
span_id: Self::generate_span_id(),
..Default::default()
};

t.enrich_span(&mut span);
let mut tt = t.get_tags();
tt.extend([(
FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(),
t.get_arn(&aws_config.region),
)]);

self.carrier = Some(t.get_carrier());
self.trigger_tags = Some(tt);
self.is_async_span = t.is_async();
self.inferred_span = Some(span);
t.enrich_span(&mut inferred_span);

trigger = Some(Box::new(t));
}
} else if SqsRecord::is_match(payload_value) {
if let Some(t) = SqsRecord::new(payload_value.clone()) {
let mut span = Span {
span_id: Self::generate_span_id(),
..Default::default()
};

t.enrich_span(&mut span);
let mut tt = t.get_tags();
tt.extend([(
FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(),
t.get_arn(&aws_config.region),
)]);

self.carrier = Some(t.get_carrier());
self.trigger_tags = Some(tt);
self.is_async_span = t.is_async();
self.inferred_span = Some(span);
t.enrich_span(&mut inferred_span);

// Check for SNS event wrapped in the SQS body
if let Ok(sns_entity) = serde_json::from_str::<SnsEntity>(&t.body) {
debug!("Found an SNS event wrapped in the SQS body");
let mut wrapped_inferred_span = Span {
span_id: Self::generate_span_id(),
..Default::default()
};

let wt = SnsRecord {
sns: sns_entity,
event_subscription_arn: None,
};
wt.enrich_span(&mut wrapped_inferred_span);
inferred_span.meta.extend(wt.get_tags());

wrapped_inferred_span.duration =
inferred_span.start - wrapped_inferred_span.start;

self.wrapped_inferred_span = Some(wrapped_inferred_span);
}

trigger = Some(Box::new(t));
}
} else if SnsRecord::is_match(payload_value) {
if let Some(t) = SnsRecord::new(payload_value.clone()) {
let mut span = Span {
span_id: Self::generate_span_id(),
..Default::default()
};

t.enrich_span(&mut span);
t.enrich_span(&mut inferred_span);

self.carrier = Some(t.get_carrier());
self.trigger_tags = Some(t.get_tags());
self.is_async_span = t.is_async();
self.inferred_span = Some(span);
trigger = Some(Box::new(t));
}
} else {
debug!("Unable to infer span from payload: no matching trigger found");
}

// Inferred a trigger
if let Some(t) = trigger {
let mut trigger_tags = t.get_tags();
trigger_tags.extend([(
FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(),
t.get_arn(&aws_config.region),
)]);

self.trigger_tags = Some(trigger_tags);
self.carrier = Some(t.get_carrier());
self.is_async_span = t.is_async();
self.inferred_span = Some(inferred_span);
}
}

/// If a `self.inferred_span` exist, set the `parent_id` to
Expand All @@ -144,9 +144,30 @@ impl SpanInferrer {
}
}

// TODO add status tag and other info from response
pub fn complete_inferred_span(&mut self, invocation_span: &Span) {
// TODO: add status tag and other info from response
// TODO: add peer.service
pub fn complete_inferred_spans(&mut self, invocation_span: &Span) {
if let Some(s) = &mut self.inferred_span {
if let Some(ws) = &mut self.wrapped_inferred_span {
// Set correct Parent ID for multiple inferred spans
ws.parent_id = s.parent_id;
s.parent_id = ws.span_id;

// TODO: clean this logic
if self.is_async_span {
// SNS to SQS span duration will be set
if ws.duration == 0 {
let duration = s.start - ws.start;
ws.duration = duration;
}
} else {
let duration = s.start - ws.start;
ws.duration = duration;
}

ws.trace_id = invocation_span.trace_id;
}

if self.is_async_span {
// SNS to SQS span duration will be set
if s.duration == 0 {
Expand Down
10 changes: 7 additions & 3 deletions bottlecap/src/lifecycle/invocation/triggers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@ pub const DATADOG_CARRIER_KEY: &str = "_datadog";
pub const FUNCTION_TRIGGER_EVENT_SOURCE_TAG: &str = "function_trigger.event_source";
pub const FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG: &str = "function_trigger.event_source_arn";

pub trait Trigger: Sized {
fn new(payload: Value) -> Option<Self>;
fn is_match(payload: &Value) -> bool;
pub trait Trigger {
fn new(payload: Value) -> Option<Self>
where
Self: Sized;
fn is_match(payload: &Value) -> bool
where
Self: Sized;
fn enrich_span(&self, span: &mut Span);
fn get_tags(&self) -> HashMap<String, String>;
fn get_arn(&self, region: &str) -> String;
Expand Down
32 changes: 9 additions & 23 deletions bottlecap/src/lifecycle/invocation/triggers/sns_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ use tracing::debug;

use crate::lifecycle::invocation::{
processor::MS_TO_NS,
triggers::{base64_to_string, Trigger, DATADOG_CARRIER_KEY},
triggers::{base64_to_string, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG},
};

use super::{FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, FUNCTION_TRIGGER_EVENT_SOURCE_TAG};

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct SnsEvent {
#[serde(rename = "Records")]
Expand Down Expand Up @@ -123,16 +121,10 @@ impl Trigger for SnsRecord {
}

fn get_tags(&self) -> HashMap<String, String> {
HashMap::from([
(
FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(),
"sns".to_string(),
),
(
FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(),
self.sns.topic_arn.clone(),
),
])
HashMap::from([(
FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(),
"sns".to_string(),
)])
}

fn get_arn(&self, _region: &str) -> String {
Expand Down Expand Up @@ -246,16 +238,10 @@ mod tests {
let event = SnsRecord::new(payload).expect("Failed to deserialize SnsRecord");
let tags = event.get_tags();

let expected = HashMap::from([
(
"function_trigger.event_source".to_string(),
"sns".to_string(),
),
(
"function_trigger.event_source_arn".to_string(),
"arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy".to_string(),
),
]);
let expected = HashMap::from([(
"function_trigger.event_source".to_string(),
"sns".to_string(),
)]);

assert_eq!(tags, expected);
}
Expand Down
42 changes: 39 additions & 3 deletions bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use tracing::debug;
use crate::lifecycle::invocation::{
processor::MS_TO_NS,
triggers::{
get_aws_partition_by_region, Trigger, DATADOG_CARRIER_KEY,
FUNCTION_TRIGGER_EVENT_SOURCE_TAG,
get_aws_partition_by_region,
sns_event::{SnsEntity, SnsRecord},
Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG,
},
};

Expand All @@ -35,6 +36,7 @@ pub struct SqsRecord {
pub event_source_arn: String,
#[serde(rename = "awsRegion")]
pub aws_region: String,
pub body: String,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
Expand Down Expand Up @@ -178,8 +180,19 @@ impl Trigger for SqsRecord {
}
}

// TODO: AWSTraceHeader
// TODO: Check for EventBridge event sent through SQS

// Check for SNS event sent through SQS
if let Ok(sns_entity) = serde_json::from_str::<SnsEntity>(&self.body) {
let sns_record = SnsRecord {
sns: sns_entity,
event_subscription_arn: None,
};

return sns_record.get_carrier();
}

// TODO: AWSTraceHeader
carrier
}
}
Expand Down Expand Up @@ -219,6 +232,7 @@ mod tests {
event_source: "aws:sqs".to_string(),
event_source_arn: "arn:aws:sqs:us-east-1:123456789012:MyQueue".to_string(),
aws_region: "us-east-1".to_string(),
body: "Hello from SQS!".to_string(),
};

assert_eq!(result, expected);
Expand Down Expand Up @@ -326,4 +340,26 @@ mod tests {

assert_eq!(carrier, expected);
}

#[test]
fn test_get_carrier_from_sns() {
let json = read_json_file("sns_sqs_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event = SqsRecord::new(payload).expect("Failed to deserialize SqsRecord");
let carrier = event.get_carrier();

let expected = HashMap::from([
(
"x-datadog-trace-id".to_string(),
"2776434475358637757".to_string(),
),
(
"x-datadog-parent-id".to_string(),
"4493917105238181843".to_string(),
),
("x-datadog-sampling-priority".to_string(), "1".to_string()),
]);

assert_eq!(carrier, expected);
}
}
20 changes: 20 additions & 0 deletions bottlecap/tests/payloads/sns_sqs_event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"Records": [
{
"messageId": "64812b68-4d9b-4dca-b3fb-9b18f255ee51",
"receiptHandle": "AQEBER6aRkfG8092GvkL7FRwCwbQ7LLDW9Tlk/CembqHe+suS2kfFxXiukomvaIN61QoyQMoRgWuV52SDkiQno2u+5hP64BDbmw+e/KR9ayvIfHJ3M6RfyQLaWNWm3hDFBCKTnBMVIxtdx0N9epZZewyokjKcrNYtmCghFgTCvZzsQkowi5rnoHAVHJ3je1c3bDnQ1KLrZFgajDnootYXDwEPuMq5FIxrf4EzTe0S7S+rnRm+GaQfeBLBVAY6dASL9usV3/AFRqDtaI7GKI+0F2NCgLlqj49VlPRz4ldhkGknYlKTZTluAqALWLJS62/J1GQo53Cs3nneJcmu5ajB2zzmhhRXoXINEkLhCD5ujZfcsw9H4xqW69Or4ECvlqx14bUU2rtMIW0QM2p7pEeXnyocymQv6m1te113eYWTVmaJ4I=",
"body": "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"0a0ab23e-4861-5447-82b7-e8094ff3e332\",\n \"TopicArn\" : \"arn:aws:sns:eu-west-1:601427279990:js-library-test-dev-demoTopic-15WGUVRCBMPAA\",\n \"Message\" : \"{\\\"hello\\\":\\\"harv\\\",\\\"nice of you to join us\\\":\\\"david\\\",\\\"anotherThing\\\":{\\\"foo\\\":\\\"bar\\\",\\\"blah\\\":null,\\\"harv\\\":123},\\\"vals\\\":[{\\\"thingOne\\\":1},{\\\"thingTwo\\\":2}],\\\"ajTimestamp\\\":1639777617957}\",\n \"Timestamp\" : \"2021-12-17T21:46:58.040Z\",\n \"SignatureVersion\" : \"1\",\n \"Signature\" : \"FR35/7E8C3LHEVk/rC4XxXlXwV/5mNkFNPgDhHSnJ2I6hIoSrTROAm7h5xm1PuBkAeFDvq0zofw91ouk9zZyvhdrMLFIIgrjEyNayRmEffmoEAkzLFUsgtQX7MmTl644r4NuWiM0Oiz7jueRvIcKXcZr7Nc6GJcWV1ymec8oOmuHNMisnPMxI07LIQVYSyAfv6P9r2jEWMVIukRoCzwTnRk4bUUYhPSGHI7OC3AsxxXBbv8snqTrLM/4z2rXCf6jHCKNxWeLlm9/45PphCkEyx5BWS4/71KaoMWUWy8+6CCsy+uF3XTCVmvSEYLyEwTSzOY+vCUjazrRW93498i70g==\",\n \"SigningCertUrl\" : \"https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2a969abfda.pem\",\n \"UnsubscribeUrl\" : \"https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:601427279990:js-library-test-dev-demoTopic-15WGUVRCBMPAA:1290f550-9a8a-4e8f-a900-8f5f96dcddda\",\n \"MessageAttributes\" : {\n \"_datadog\" : {\"Type\":\"String\",\"Value\":\"{\\\"x-datadog-trace-id\\\":\\\"2776434475358637757\\\",\\\"x-datadog-parent-id\\\":\\\"4493917105238181843\\\",\\\"x-datadog-sampling-priority\\\":\\\"1\\\"}\"}\n }\n}",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1639777618130",
"SenderId": "AIDAIOA2GYWSHW4E2VXIO",
"ApproximateFirstReceiveTimestamp": "1639777618132"
},
"messageAttributes": {},
"md5OfBody": "ee19d8b1377919239ad3fd5dabc33739",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:eu-west-1:601427279990:aj-js-library-test-dev-demo-queue",
"awsRegion": "eu-west-1"
}
]
}