Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6618e05
wip: sqs
astuyve Oct 23, 2024
27c2ddb
Merge branch 'aj/sns-sqs' of ssh://github.com/DataDog/datadog-lambda-…
duncanista Oct 23, 2024
058059e
feat: sqs tests
astuyve Oct 23, 2024
94b7fc7
invert duration check
duncanista Oct 23, 2024
ea156bc
remove duration set
duncanista Oct 23, 2024
dd13114
Merge branch 'aj/sns-sqs' of ssh://github.com/DataDog/datadog-lambda-…
duncanista Oct 23, 2024
dad27a8
fmt and add `test_get_arn`
duncanista Oct 23, 2024
3ae55f3
remove unneeded reference
duncanista Oct 23, 2024
cc4c16c
remove unneeded comments
duncanista Oct 23, 2024
9cf0a2a
add `get_carrier` implementation for `SqsRecord`
duncanista Oct 23, 2024
e5ed400
add trace context to `sqs_event.json`
duncanista Oct 23, 2024
dec3bf9
fix: resource_names is not needed
astuyve Oct 24, 2024
7b99cd2
fix: don't deserialize body
astuyve Oct 24, 2024
14a1c08
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Nov 4, 2024
fd2864e
avoid `use super::...`
duncanista Nov 5, 2024
e3d6c3f
Merge branch 'aj/sns-sqs' of ssh://github.com/DataDog/datadog-lambda-…
duncanista Nov 5, 2024
7a403de
Merge branch 'jordan.gonzalez/bottlecap/universal-instrumentation' in…
duncanista Nov 5, 2024
b4f1dff
fix unit tests
duncanista Nov 5, 2024
da6c083
set carrier and trigger tags
duncanista Nov 5, 2024
29eaaa5
remove duplicate tag
duncanista Nov 6, 2024
5085bf4
fmt
duncanista Nov 6, 2024
f5fe9ab
pass headers to `on_invocation_end`
duncanista Nov 6, 2024
0cf4dbe
infer first, then extract
duncanista Nov 6, 2024
b136373
reset values on every infer
duncanista Nov 6, 2024
9fe1c49
move some constants
duncanista Nov 7, 2024
e8006f4
add missing trigger tags
duncanista Nov 7, 2024
d3e3670
missed one case
duncanista Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,15 @@ impl Processor {
self.span.trace_id = 0;
self.span.parent_id = 0;
self.span.span_id = 0;
self.extracted_span_context = None;

let payload_value = match serde_json::from_slice::<Value>(&payload) {
Ok(value) => value,
Err(_) => json!({}),
};

self.extracted_span_context = self.extract_span_context(&headers, &payload_value);
self.inferrer.infer_span(&payload_value, &self.aws_config);
self.extracted_span_context = self.extract_span_context(&headers, &payload_value);

if let Some(sc) = &self.extracted_span_context {
self.span.trace_id = sc.trace_id;
Expand Down Expand Up @@ -264,19 +265,46 @@ impl Processor {
///
pub fn on_invocation_end(
&mut self,
trace_id: u64,
span_id: u64,
parent_id: u64,
headers: HashMap<String, String>,
status_code: Option<String>,
) {
self.span.trace_id = trace_id;
self.span.span_id = span_id;

self.update_span_context(headers);
if self.inferrer.inferred_span.is_some() {
if let Some(status_code) = status_code {
self.inferrer.set_status_code(status_code);
}
} else {
}
}

fn update_span_context(&mut self, headers: HashMap<String, String>) {
// todo: fix this, code is a copy of the existing logic in Go, not accounting
// when a 128 bit trace id exist
let mut trace_id = 0;
let mut span_id = 0;
let mut parent_id = 0;

// If we have a trace context, update the span context
if let Some(sc) = &mut self.extracted_span_context {
trace_id = sc.trace_id;
span_id = sc.span_id;
}

if let Some(header) = headers.get("x-datadog-trace-id") {
trace_id = header.parse::<u64>().unwrap_or(0);
}

if let Some(header) = headers.get("x-datadog-span-id") {
span_id = header.parse::<u64>().unwrap_or(0);
}

if let Some(header) = headers.get("x-datadog-parent-id") {
parent_id = header.parse::<u64>().unwrap_or(0);
}

self.span.trace_id = trace_id;
self.span.span_id = span_id;

if self.inferrer.inferred_span.is_none() {
self.span.parent_id = parent_id;
}
}
Expand Down
67 changes: 39 additions & 28 deletions bottlecap/src/lifecycle/invocation/span_inferrer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@ use crate::config::AwsConfig;

use crate::lifecycle::invocation::triggers::{
api_gateway_http_event::APIGatewayHttpEvent, api_gateway_rest_event::APIGatewayRestEvent,
Trigger,
sqs_event::SqsRecord, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG,
};
use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE};

const FUNCTION_TRIGGER_EVENT_SOURCE_TAG: &str = "function_trigger.event_source";
const FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG: &str = "function_trigger.event_source_arn";

pub struct SpanInferrer {
pub inferred_span: Option<Span>,
is_async_span: bool,
Expand Down Expand Up @@ -46,6 +43,10 @@ impl SpanInferrer {
///
pub fn infer_span(&mut self, payload_value: &Value, aws_config: &AwsConfig) {
self.inferred_span = None;
self.is_async_span = false;
self.carrier = None;
self.trigger_tags = None;

if APIGatewayHttpEvent::is_match(payload_value) {
if let Some(t) = APIGatewayHttpEvent::new(payload_value.clone()) {
let mut span = Span {
Expand All @@ -54,19 +55,14 @@ impl SpanInferrer {
};

t.enrich_span(&mut span);
span.meta.extend([
(
FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(),
"api_gateway".to_string(),
),
(
FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(),
t.get_arn(&aws_config.region),
),
]);
let mut tt = t.get_tags();
tt.extend([(
FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(),
t.get_arn(&aws_config.region),
)]);

self.carrier = Some(t.get_carrier());
self.trigger_tags = Some(t.get_tags());
self.trigger_tags = Some(tt);
self.is_async_span = t.is_async();
self.inferred_span = Some(span);
}
Expand All @@ -78,24 +74,38 @@ impl SpanInferrer {
};

t.enrich_span(&mut span);
span.meta.extend([
(
FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(),
"api_gateway".to_string(),
),
(
FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(),
t.get_arn(&aws_config.region),
),
]);
let mut tt = t.get_tags();
tt.extend([(
FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(),
t.get_arn(&aws_config.region),
)]);

self.carrier = Some(t.get_carrier());
self.trigger_tags = Some(tt);
self.is_async_span = t.is_async();
self.inferred_span = Some(span);
}
} else if SqsRecord::is_match(payload_value) {
if let Some(t) = SqsRecord::new(payload_value.clone()) {
let mut span = Span {
span_id: Self::generate_span_id(),
..Default::default()
};

t.enrich_span(&mut span);
let mut tt = t.get_tags();
tt.extend([(
FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG.to_string(),
t.get_arn(&aws_config.region),
)]);

self.carrier = Some(t.get_carrier());
self.trigger_tags = Some(t.get_tags());
self.trigger_tags = Some(tt);
self.is_async_span = t.is_async();
self.inferred_span = Some(span);
}
} else {
debug!("Unable to infer span from payload");
debug!("Unable to infer span from payload: no matching trigger found");
}
}

Expand Down Expand Up @@ -124,7 +134,8 @@ impl SpanInferrer {
pub fn complete_inferred_span(&mut self, invocation_span: &Span) {
if let Some(s) = &mut self.inferred_span {
if self.is_async_span {
if s.duration != 0 {
// SNS to SQS span duration will be set
if s.duration == 0 {
let duration = invocation_span.start - s.start;
s.duration = duration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use tracing::debug;

use crate::lifecycle::invocation::{
processor::MS_TO_NS,
triggers::{get_aws_partition_by_region, lowercase_key, Trigger},
triggers::{
get_aws_partition_by_region, lowercase_key, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG,
},
};

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
Expand Down Expand Up @@ -114,10 +116,7 @@ impl Trigger for APIGatewayHttpEvent {
"request_id".to_string(),
self.request_context.request_id.clone(),
),
("resource_names".to_string(), resource),
]));

// todo: update global(? IsAsync if event payload is `Event`
}

fn get_tags(&self) -> HashMap<String, String> {
Expand All @@ -140,6 +139,10 @@ impl Trigger for APIGatewayHttpEvent {
"http.method".to_string(),
self.request_context.http.method.clone(),
),
(
FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(),
"api-gateway".to_string(),
),
]);
// route is parameterized
// /users/{id}/profile
Expand Down Expand Up @@ -287,7 +290,6 @@ mod tests {
("http.user_agent".to_string(), "curl/7.64.1".to_string()),
("operation_name".to_string(), "aws.httpapi".to_string()),
("request_id".to_string(), "FaHnXjKCGjQEJ7A=".to_string()),
("resource_names".to_string(), "GET /httpapi/get".to_string()),
])
);
}
Expand All @@ -311,6 +313,10 @@ mod tests {
("http.method".to_string(), "GET".to_string()),
("http.route".to_string(), "/httpapi/get".to_string()),
("http.user_agent".to_string(), "curl/7.64.1".to_string()),
(
"function_trigger.event_source".to_string(),
"api-gateway".to_string(),
),
]);

assert_eq!(tags, expected);
Expand Down Expand Up @@ -345,7 +351,6 @@ mod tests {
("http.user_agent".to_string(), "curl/8.1.2".to_string()),
("operation_name".to_string(), "aws.httpapi".to_string()),
("request_id".to_string(), "Ur2JtjEfGjQEPOg=".to_string()),
("resource_names".to_string(), "GET /user/{id}".to_string()),
])
);
}
Expand All @@ -367,6 +372,10 @@ mod tests {
("http.method".to_string(), "GET".to_string()),
("http.route".to_string(), "/user/{id}".to_string()),
("http.user_agent".to_string(), "curl/8.1.2".to_string()),
(
"function_trigger.event_source".to_string(),
"api-gateway".to_string(),
),
]);
assert_eq!(tags, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use tracing::debug;

use crate::lifecycle::invocation::{
processor::MS_TO_NS,
triggers::{get_aws_partition_by_region, lowercase_key, Trigger},
triggers::{
get_aws_partition_by_region, lowercase_key, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG,
},
};

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
Expand Down Expand Up @@ -109,7 +111,6 @@ impl Trigger for APIGatewayRestEvent {
"request_id".to_string(),
self.request_context.request_id.clone(),
),
("resource_names".to_string(), resource.clone()),
(
"http.route".to_string(),
self.request_context.resource_path.clone(),
Expand Down Expand Up @@ -143,6 +144,10 @@ impl Trigger for APIGatewayRestEvent {
"http.user_agent".to_string(),
self.request_context.identity.user_agent.to_string(),
),
(
FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(),
"api-gateway".to_string(),
),
]);

if let Some(referer) = self.headers.get("referer") {
Expand Down Expand Up @@ -256,7 +261,6 @@ mod tests {
("http.route".to_string(), "/path".to_string()),
("operation_name".to_string(), "aws.apigateway".to_string()),
("request_id".to_string(), "id=".to_string()),
("resource_names".to_string(), "GET /path".to_string()),
])
);
}
Expand All @@ -278,6 +282,10 @@ mod tests {
("http.method".to_string(), "GET".to_string()),
("http.route".to_string(), "/path".to_string()),
("http.user_agent".to_string(), "user-agent".to_string()),
(
"function_trigger.event_source".to_string(),
"api-gateway".to_string(),
),
]);

assert_eq!(tags, expected);
Expand Down Expand Up @@ -314,7 +322,6 @@ mod tests {
"request_id".to_string(),
"e16399f7-e984-463a-9931-745ba021a27f".to_string(),
),
("resource_names".to_string(), "GET /user/{id}".to_string()),
]);
assert_eq!(span.meta, expected);
}
Expand Down Expand Up @@ -342,6 +349,10 @@ mod tests {
("http.method".to_string(), "GET".to_string()),
("http.route".to_string(), "/user/{id}".to_string()),
("http.user_agent".to_string(), "curl/8.1.2".to_string()),
(
"function_trigger.event_source".to_string(),
"api-gateway".to_string()
),
])
);
}
Expand Down
5 changes: 5 additions & 0 deletions bottlecap/src/lifecycle/invocation/triggers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ use serde_json::Value;

pub mod api_gateway_http_event;
pub mod api_gateway_rest_event;
pub mod sqs_event;

pub const DATADOG_CARRIER_KEY: &str = "_datadog";
pub const FUNCTION_TRIGGER_EVENT_SOURCE_TAG: &str = "function_trigger.event_source";
pub const FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG: &str = "function_trigger.event_source_arn";

pub trait Trigger: Sized {
fn new(payload: Value) -> Option<Self>;
Expand Down
Loading