Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 44 additions & 21 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ use crate::{
telemetry::events::{ReportMetrics, Status},
traces::{
context::SpanContext,
propagation::{DatadogCompositePropagator, Propagator},
propagation::{
text_map_propagator::{
DatadogHeaderPropagator, DATADOG_PARENT_ID_KEY, DATADOG_SPAN_ID_KEY,
DATADOG_TRACE_ID_KEY,
},
DatadogCompositePropagator, Propagator,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At point in the invocation is this happening? Is this extracting context from the inbound event? Or is it getting it from the start invocation request headers?

Just asking cuz it doesn't look like we have support for w3c headers. Is that correct?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure which line you're talking about, you left the comment in the import

Copy link
Copy Markdown
Contributor Author

@duncanista duncanista Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just asking cuz it doesn't look like we have support for w3c headers. Is that correct?

We do, the composite propagator does that

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhh, didn't realize this is just the import. Thanks for answering the question!

},
trace_processor,
},
};
Expand Down Expand Up @@ -278,6 +284,8 @@ impl Processor {
self.span.trace_id = 0;
self.span.parent_id = 0;
self.span.span_id = 0;
self.span.error = 0;
self.span.meta.clear();
self.extracted_span_context = None;

let payload_value = match serde_json::from_slice::<Value>(&payload) {
Expand All @@ -288,6 +296,7 @@ impl Processor {
self.inferrer.infer_span(&payload_value, &self.aws_config);
self.extracted_span_context = self.extract_span_context(&headers, &payload_value);

// Set the extracted trace context to the spans
if let Some(sc) = &self.extracted_span_context {
self.span.trace_id = sc.trace_id;
self.span.parent_id = sc.span_id;
Expand All @@ -302,6 +311,8 @@ impl Processor {
}
}

// If we have an inferred span, set the invocation span parent id
// to be the inferred span id, even if we don't have an extracted trace context
if let Some(inferred_span) = &self.inferrer.inferred_span {
self.span.parent_id = inferred_span.span_id;
}
Expand All @@ -312,11 +323,8 @@ impl Processor {
headers: &HashMap<String, String>,
payload_value: &Value,
) -> Option<SpanContext> {
if let Some(carrier) = self.inferrer.get_carrier() {
if let Some(sc) = self.propagator.extract(&carrier) {
debug!("Extracted trace context from inferred span");
return Some(sc);
}
if let Some(sc) = self.inferrer.get_span_context(&self.propagator) {
return Some(sc);
}

if let Some(payload_headers) = payload_value.get("headers") {
Expand Down Expand Up @@ -363,36 +371,51 @@ impl Processor {
}

fn update_span_context_from_headers(&mut self, headers: &HashMap<String, String>) {
// todo: fix this, code is a copy of the existing logic in Go, not accounting
// when a 128 bit trace id exist
let mut trace_id = 0;
let mut span_id = 0;
let mut parent_id = 0;
let mut tags: HashMap<String, String> = HashMap::new();

// If we have a trace context, update the span context
// If we have a trace context, this means we got it from
// distributed tracing
if let Some(sc) = &mut self.extracted_span_context {
debug!("Trace context was found, not extracting it from incoming headers");
trace_id = sc.trace_id;
span_id = sc.span_id;
parent_id = sc.span_id;
tags.extend(sc.tags.clone());
}

if let Some(header) = headers.get("x-datadog-trace-id") {
trace_id = header.parse::<u64>().unwrap_or(0);
}
// We are the root span, so we should extract the trace context
// from the tracer, which has sent it through end invocation headers
if trace_id == 0 {
debug!("No trace context found, extracting it from headers");
// Extract trace context from headers manually
if let Some(header) = headers.get(DATADOG_TRACE_ID_KEY) {
trace_id = header.parse::<u64>().unwrap_or(0);
}

if let Some(header) = headers.get(DATADOG_PARENT_ID_KEY) {
parent_id = header.parse::<u64>().unwrap_or(0);
}

// TODO: sampling priority extraction

if let Some(header) = headers.get("x-datadog-span-id") {
span_id = header.parse::<u64>().unwrap_or(0);
// Extract tags from headers
// Used for 128 bit trace ids
tags = DatadogHeaderPropagator::extract_tags(headers);
}

if let Some(header) = headers.get("x-datadog-parent-id") {
parent_id = header.parse::<u64>().unwrap_or(0);
// We should always use the generated trace id from the tracer
if let Some(header) = headers.get(DATADOG_SPAN_ID_KEY) {
self.span.span_id = header.parse::<u64>().unwrap_or(0);
}

self.span.trace_id = trace_id;
self.span.span_id = span_id;

// If no inferred span, set the parent id right away
if self.inferrer.inferred_span.is_none() {
if self.inferrer.inferred_span.is_some() {
self.inferrer.extend_meta(tags);
} else {
self.span.parent_id = parent_id;
self.span.meta.extend(tags);
}
}

Expand Down
48 changes: 41 additions & 7 deletions bottlecap/src/lifecycle/invocation/span_inferrer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,27 @@ use crate::lifecycle::invocation::triggers::{
dynamodb_event::DynamoDbRecord,
event_bridge_event::EventBridgeEvent,
kinesis_event::KinesisRecord,
s3_event::S3Record,
sns_event::{SnsEntity, SnsRecord},
sqs_event::SqsRecord,
step_function_event::StepFunctionEvent,
Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG,
};
use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE};

use super::triggers::s3_event::S3Record;
use crate::traces::{context::SpanContext, propagation::Propagator};

pub struct SpanInferrer {
// Span inferred from the Lambda incoming request payload
pub inferred_span: Option<Span>,
// Nested span inferred from the Lambda incoming request payload
pub wrapped_inferred_span: Option<Span>,
// If the inferred span is async
is_async_span: bool,
// Carrier to extract the span context from
carrier: Option<HashMap<String, String>>,
// Generated Span Context from Step Functions
generated_span_context: Option<SpanContext>,
// Tags generated from the trigger
trigger_tags: Option<HashMap<String, String>>,
}

Expand All @@ -43,6 +51,7 @@ impl SpanInferrer {
wrapped_inferred_span: None,
is_async_span: false,
carrier: None,
generated_span_context: None,
trigger_tags: None,
}
}
Expand All @@ -51,11 +60,13 @@ impl SpanInferrer {
/// and try matching it to a `Trigger` implementation, which will create
/// an inferred span and set it to `self.inferred_span`
///
#[allow(clippy::too_many_lines)]
pub fn infer_span(&mut self, payload_value: &Value, aws_config: &AwsConfig) {
self.inferred_span = None;
self.wrapped_inferred_span = None;
self.is_async_span = false;
self.carrier = None;
self.generated_span_context = None;
self.trigger_tags = None;

let mut trigger: Option<Box<dyn Trigger>> = None;
Expand Down Expand Up @@ -167,6 +178,11 @@ impl SpanInferrer {
if let Some(t) = KinesisRecord::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span);

trigger = Some(Box::new(t));
}
} else if StepFunctionEvent::is_match(payload_value) {
if let Some(t) = StepFunctionEvent::new(payload_value.clone()) {
self.generated_span_context = Some(t.get_span_context());
trigger = Some(Box::new(t));
}
} else {
Expand All @@ -184,7 +200,13 @@ impl SpanInferrer {
self.trigger_tags = Some(trigger_tags);
self.carrier = Some(t.get_carrier());
self.is_async_span = t.is_async();
self.inferred_span = Some(inferred_span);

// For Step Functions, there is no inferred span
if self.generated_span_context.is_some() {
self.inferred_span = None;
} else {
self.inferred_span = Some(inferred_span);
}
}
}

Expand Down Expand Up @@ -263,11 +285,23 @@ impl SpanInferrer {
rng.gen()
}

/// Returns a clone of the carrier associated with the inferred span
/// Returns the extracted span context
///
#[must_use]
pub fn get_carrier(&self) -> Option<HashMap<String, String>> {
self.carrier.clone()
/// If the carrier is set, it will try to extract the span context,
/// otherwise it will
///
pub fn get_span_context(&self, propagator: &impl Propagator) -> Option<SpanContext> {
// Step Functions `SpanContext` is deterministically generated
if let Some(sc) = &self.generated_span_context {
return Some(sc.clone());
}

if let Some(sc) = self.carrier.as_ref().and_then(|c| propagator.extract(c)) {
debug!("Extracted trace context from inferred span");
return Some(sc);
}

None
}

/// Returns a clone of the tags associated with the inferred span
Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/lifecycle/invocation/triggers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod kinesis_event;
pub mod s3_event;
pub mod sns_event;
pub mod sqs_event;
pub mod step_function_event;

pub const DATADOG_CARRIER_KEY: &str = "_datadog";
pub const FUNCTION_TRIGGER_EVENT_SOURCE_TAG: &str = "function_trigger.event_source";
Expand Down
Loading