Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions bottlecap/src/lifecycle/invocation/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl ContextBuffer {
/// Creates a new `Context` and adds it to the buffer.
///
pub fn create_context(&mut self, request_id: String) {
self.insert(Context::new(request_id, 0.0, 0.0, 0, None));
self.insert(Context::new(request_id, 0f64, 0f64, 0, None));
}

/// Adds the init duration to a `Context` in the buffer.
Expand Down Expand Up @@ -188,20 +188,20 @@ mod tests {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);

let request_id_2 = String::from("2");
let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0, None);
let context = Context::new(request_id_2.clone(), 0f64, 0f64, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 2);
assert_eq!(buffer.get(&request_id_2).unwrap(), &context);

// This should replace the first context
let request_id_3 = String::from("3");
let context = Context::new(request_id_3.clone(), 0.0, 0.0, 0, None);
let context = Context::new(request_id_3.clone(), 0f64, 0f64, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 2);
assert_eq!(buffer.get(&request_id_3).unwrap(), &context);
Expand All @@ -215,13 +215,13 @@ mod tests {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);

let request_id_2 = String::from("2");
let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0, None);
let context = Context::new(request_id_2.clone(), 0f64, 0f64, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 2);
assert_eq!(buffer.get(&request_id_2).unwrap(), &context);
Expand All @@ -242,13 +242,13 @@ mod tests {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);

let request_id_2 = String::from("2");
let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0, None);
let context = Context::new(request_id_2.clone(), 0f64, 0f64, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 2);
assert_eq!(buffer.get(&request_id_2).unwrap(), &context);
Expand All @@ -263,21 +263,21 @@ mod tests {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);

buffer.add_init_duration(&request_id, 100.0);
assert_eq!(buffer.get(&request_id).unwrap().init_duration_ms, 100.0);
buffer.add_init_duration(&request_id, 100f64);
assert!((buffer.get(&request_id).unwrap().init_duration_ms - 100f64).abs() < f64::EPSILON);
}

#[test]
fn test_add_start_time() {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);
Expand All @@ -291,41 +291,43 @@ mod tests {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);

buffer.add_runtime_duration(&request_id, 100.0);
assert_eq!(buffer.get(&request_id).unwrap().runtime_duration_ms, 100.0);
buffer.add_runtime_duration(&request_id, 100f64);
assert!(
(buffer.get(&request_id).unwrap().runtime_duration_ms - 100f64).abs() < f64::EPSILON
);
}

#[test]
fn test_add_enhanced_metric_data() {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
let context = Context::new(request_id.clone(), 0f64, 0f64, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);

let network_offset = Some(NetworkData {
rx_bytes: 180.0,
rx_bytes: 180f64,
tx_bytes: 254.0,
});

let mut individual_cpu_idle_times = HashMap::new();
individual_cpu_idle_times.insert("cpu0".to_string(), 10.0);
individual_cpu_idle_times.insert("cpu1".to_string(), 20.0);
individual_cpu_idle_times.insert("cpu0".to_string(), 10f64);
individual_cpu_idle_times.insert("cpu1".to_string(), 20f64);
let cpu_offset = Some(CPUData {
total_user_time_ms: 100.0,
total_user_time_ms: 100f64,
total_system_time_ms: 53.0,
total_idle_time_ms: 20.0,
total_idle_time_ms: 20f64,
individual_cpu_idle_times,
});

let uptime_offset = Some(50.0);
let uptime_offset = Some(50f64);
let (tmp_chan_tx, _) = watch::channel(());
let (process_chan_tx, _) = watch::channel(());

Expand Down
17 changes: 12 additions & 5 deletions bottlecap/src/lifecycle/invocation/span_inferrer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::lifecycle::invocation::{
lambda_function_url_event::LambdaFunctionUrlEvent,
s3_event::S3Record,
sns_event::{SnsEntity, SnsRecord},
sqs_event::SqsRecord,
sqs_event::{extract_trace_context_from_aws_trace_header, SqsRecord},
step_function_event::StepFunctionEvent,
Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG,
},
Expand All @@ -35,7 +35,7 @@ pub struct SpanInferrer {
is_async_span: bool,
// Carrier to extract the span context from
carrier: Option<HashMap<String, String>>,
// Generated Span Context from Step Functions
// Generated Span Context from Step Functions or context taken from `AWSTraceHeader` when java->sqs->java
generated_span_context: Option<SpanContext>,
// Tags generated from the trigger
trigger_tags: Option<HashMap<String, String>>,
Expand Down Expand Up @@ -74,6 +74,8 @@ impl SpanInferrer {
..Default::default()
};

let mut is_step_function = false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just downcast into the StepFunctions event instead of having a boolean? That way at the very bottom you can just check if the trigger is StepFunction

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of having a boolean, where you have this thing, just try downcasting the boxed trigger into a stepfunctions event, if it succeeds, then its guaranteed to be a stepfunction event

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you give me an example? I don't think it's doable or considered a good approach in rust


if APIGatewayHttpEvent::is_match(payload_value) {
if let Some(t) = APIGatewayHttpEvent::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span, &self.service_mapping);
Expand All @@ -96,6 +98,10 @@ impl SpanInferrer {
if let Some(t) = SqsRecord::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span, &self.service_mapping);

self.generated_span_context = extract_trace_context_from_aws_trace_header(
t.attributes.aws_trace_header.clone(),
);

// Check for SNS event wrapped in the SQS body
if let Ok(sns_entity) = serde_json::from_str::<SnsEntity>(&t.body) {
debug!("Found an SNS event wrapped in the SQS body");
Expand Down Expand Up @@ -191,6 +197,7 @@ impl SpanInferrer {
if let Some(t) = StepFunctionEvent::new(payload_value.clone()) {
self.generated_span_context = Some(t.get_span_context());
trigger = Some(Box::new(t));
is_step_function = true;
}
} else {
debug!("Unable to infer span from payload: no matching trigger found");
Expand All @@ -209,7 +216,7 @@ impl SpanInferrer {
self.is_async_span = t.is_async();

// For Step Functions, there is no inferred span
if self.generated_span_context.is_some() {
if is_step_function && self.generated_span_context.is_some() {
self.inferred_span = None;
} else {
self.inferred_span = Some(inferred_span);
Expand Down Expand Up @@ -295,8 +302,8 @@ impl SpanInferrer {
///
pub fn get_span_context(&self, propagator: &impl Propagator) -> Option<SpanContext> {
// Step Functions `SpanContext` is deterministically generated
if let Some(sc) = &self.generated_span_context {
return Some(sc.clone());
if self.generated_span_context.is_some() {
return self.generated_span_context.clone();
}

if let Some(sc) = self.carrier.as_ref().and_then(|c| propagator.extract(c)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ mod tests {
]),
request_context: RequestContext {
request_id: String::from("ec4d58f8-2b8b-4ceb-a1d5-2be7bff58505"),
time_epoch: 1637169449721,
time_epoch: 1_637_169_449_721,
http: Http {
method: String::from("GET"),
path: String::from("/"),
Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/lifecycle/invocation/triggers/s3_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl ServiceNameResolver for S3Record {
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::lifecycle::invocation::triggers::test_utils::read_json_file;
Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/lifecycle/invocation/triggers/sns_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ impl ServiceNameResolver for SnsRecord {
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use datadog_trace_protobuf::pb::Span;

Expand Down
105 changes: 101 additions & 4 deletions bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::lifecycle::invocation::{
ServiceNameResolver, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG,
},
};
use crate::traces::context::{Sampling, SpanContext};

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct SqsEvent {
Expand Down Expand Up @@ -64,6 +65,8 @@ pub struct Attributes {
pub sent_timestamp: String,
#[serde(rename = "SenderId")]
pub sender_id: String,
#[serde(rename = "AWSTraceHeader")]
pub aws_trace_header: Option<String>,
}

impl Trigger for SqsRecord {
Expand Down Expand Up @@ -163,12 +166,9 @@ impl Trigger for SqsRecord {
}
}

fn is_async(&self) -> bool {
true
}

fn get_carrier(&self) -> HashMap<String, String> {
let carrier = HashMap::new();

if let Some(ma) = self.message_attributes.get(DATADOG_CARRIER_KEY) {
if let Some(string_value) = &ma.string_value {
return serde_json::from_str(string_value).unwrap_or_default();
Expand All @@ -190,6 +190,10 @@ impl Trigger for SqsRecord {
// TODO: AWSTraceHeader
carrier
}

fn is_async(&self) -> bool {
true
}
}

impl ServiceNameResolver for SqsRecord {
Expand All @@ -206,7 +210,74 @@ impl ServiceNameResolver for SqsRecord {
}
}

// extractTraceContextfromAWSTraceHeader extracts trace context from the
// AWSTraceHeader directly. Unlike the other carriers in this file, it should
// not be passed to the tracer.Propagator, instead extracting context directly.
pub(crate) fn extract_trace_context_from_aws_trace_header(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a crate method and not just a method for the struct? We are not using it anywhere else

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the flexibility and not adding extra restrictions that aren't needed. It makes testing and changing code simpler.
I also think a lot of the structs and traits should go away, so would rather avoid adding code and complexity to keep doing that

headers_string: Option<String>,
) -> Option<SpanContext> {
let value = headers_string?;
if !value.starts_with("Root=") {
return None;
}

let mut start_part = 0;
let mut trace_id = String::new();
let mut parent_id = String::new();
let mut sampled = String::new();

let length = value.len();
while start_part < length {
let end_part = value[start_part..]
.find(';')
.map_or(length, |i| i + start_part);
let part = &value[start_part..end_part];

if part.starts_with("Root=") {
if trace_id.is_empty() {
trace_id = part[24..].to_string();
}
} else if let Some(parent_part) = part.strip_prefix("Parent=") {
if parent_id.is_empty() {
parent_id = parent_part.to_string();
}
} else if part.starts_with("Sampled=") && sampled.is_empty() {
sampled = part[8..].to_string();
}

if !trace_id.is_empty() && !parent_id.is_empty() && !sampled.is_empty() {
break;
}
start_part = end_part + 1;
}

let trace_id = u64::from_str_radix(&trace_id, 16).ok()?;
let parent_id = u64::from_str_radix(&parent_id, 16).ok()?;

if trace_id == 0 || parent_id == 0 {
debug!("awstrace_header contains empty trace or parent ID");
return None;
}

let sampling_priority = i8::from(sampled == "1");

Some(SpanContext {
// the context from AWS Header is used by Datadog only and does not contain the upper
// 64 bits like other 128 w3c compliant trace ids
trace_id,
span_id: parent_id,
sampling: Some(Sampling {
priority: Some(sampling_priority),
mechanism: None,
}),
origin: None,
tags: HashMap::new(),
Comment thread
alexgallotta marked this conversation as resolved.
links: Vec::new(),
})
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::lifecycle::invocation::triggers::test_utils::read_json_file;
Expand Down Expand Up @@ -235,6 +306,7 @@ mod tests {
approximate_receive_count: "1".to_string(),
sent_timestamp: "1523232000000".to_string(),
sender_id: "123456789012".to_string(),
aws_trace_header: None,
},
message_attributes,
md5_of_body: "{{{md5_of_body}}}".to_string(),
Expand Down Expand Up @@ -425,4 +497,29 @@ mod tests {
"generic-service"
);
}

#[test]
fn extract_java_sqs_header_context() {
let json = read_json_file("eventbridge_sqs_java_header_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event = SqsRecord::new(payload).expect("Failed to deserialize EventBridgeEvent");

assert_eq!(
extract_trace_context_from_aws_trace_header(Some(
event.attributes.aws_trace_header.unwrap().to_string()
))
.unwrap(),
SpanContext {
trace_id: 130_944_522_478_755_159,
span_id: 9_032_698_535_745_367_362,
sampling: Some(Sampling {
priority: Some("0".parse().unwrap()),
mechanism: None,
}),
origin: None,
tags: HashMap::new(),
links: Vec::new(),
}
);
}
}
Loading