From 3fdd523de88d0d26c19b5af5938770fdca025971 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:06:47 -0500 Subject: [PATCH 01/12] add step functions events payloads --- .../tests/payloads/step_function_event.json | 19 +++++++++++++++++ .../payloads/step_function_legacy_event.json | 21 +++++++++++++++++++ 2 files changed, 40 insertions(+) create mode 100644 bottlecap/tests/payloads/step_function_event.json create mode 100644 bottlecap/tests/payloads/step_function_legacy_event.json diff --git a/bottlecap/tests/payloads/step_function_event.json b/bottlecap/tests/payloads/step_function_event.json new file mode 100644 index 000000000..1461c7164 --- /dev/null +++ b/bottlecap/tests/payloads/step_function_event.json @@ -0,0 +1,19 @@ +{ + "Execution": { + "Id": "arn:aws:states:us-east-1:425362996713:execution:agocsTestSF:bc9f281c-3daa-4e5a-9a60-471a3810bf44", + "Input": {}, + "StartTime": "2024-07-30T19:55:52.976Z", + "Name": "bc9f281c-3daa-4e5a-9a60-471a3810bf44", + "RoleArn": "arn:aws:iam::425362996713:role/test-serverless-stepfunctions-dev-AgocsTestSFRole-tRkeFXScjyk4", + "RedriveCount": 0 + }, + "StateMachine": { + "Id": "arn:aws:states:us-east-1:425362996713:stateMachine:agocsTestSF", + "Name": "agocsTestSF" + }, + "State": { + "Name": "agocsTest1", + "EnteredTime": "2024-07-30T19:55:53.018Z", + "RetryCount": 0 + } +} diff --git a/bottlecap/tests/payloads/step_function_legacy_event.json b/bottlecap/tests/payloads/step_function_legacy_event.json new file mode 100644 index 000000000..74e4c010a --- /dev/null +++ b/bottlecap/tests/payloads/step_function_legacy_event.json @@ -0,0 +1,21 @@ +{ + "Payload": { + "Execution": { + "Id": "arn:aws:states:us-east-1:425362996713:execution:agocsTestSF:bc9f281c-3daa-4e5a-9a60-471a3810bf44", + "Input": {}, + "StartTime": "2024-07-30T19:55:52.976Z", + "Name": "bc9f281c-3daa-4e5a-9a60-471a3810bf44", + "RoleArn": "arn:aws:iam::425362996713:role/test-serverless-stepfunctions-dev-AgocsTestSFRole-tRkeFXScjyk4", + "RedriveCount": 0 + }, + "StateMachine": { + "Id": "arn:aws:states:us-east-1:425362996713:stateMachine:agocsTestSF", + "Name": "agocsTestSF" + }, + "State": { + "Name": "agocsTest1", + "EnteredTime": "2024-07-30T19:55:53.018Z", + "RetryCount": 0 + } + } +} From 145e03cacd855fd23727d68430b55bed8b7d7ea7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:06:59 -0500 Subject: [PATCH 02/12] make some methods public --- bottlecap/src/traces/propagation/text_map_propagator.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bottlecap/src/traces/propagation/text_map_propagator.rs b/bottlecap/src/traces/propagation/text_map_propagator.rs index 34b482643..f1c5cbcf4 100644 --- a/bottlecap/src/traces/propagation/text_map_propagator.rs +++ b/bottlecap/src/traces/propagation/text_map_propagator.rs @@ -13,7 +13,8 @@ use crate::traces::propagation::{ // Datadog Keys pub const DATADOG_TRACE_ID_KEY: &str = "x-datadog-trace-id"; -const DATADOG_PARENT_ID_KEY: &str = "x-datadog-parent-id"; +pub const DATADOG_PARENT_ID_KEY: &str = "x-datadog-parent-id"; +pub const DATADOG_SPAN_ID_KEY: &str = "x-datadog-span-id"; pub const DATADOG_SAMPLING_PRIORITY_KEY: &str = "x-datadog-sampling-priority"; const DATADOG_ORIGIN_KEY: &str = "x-datadog-origin"; pub const DATADOG_TAGS_KEY: &str = "x-datadog-tags"; @@ -148,7 +149,7 @@ impl DatadogHeaderPropagator { Some(origin.to_string()) } - fn extract_tags(carrier: &dyn Extractor) -> HashMap { + pub fn extract_tags(carrier: &dyn Extractor) -> HashMap { let carrier_tags = carrier.get(DATADOG_TAGS_KEY).unwrap_or_default(); let mut tags: HashMap = HashMap::new(); From 481f7a7de3bcb7a3695f2e1a006a7865dd3a41f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:45:00 -0500 Subject: [PATCH 03/12] add `StepFunctionEvent` --- .../triggers/step_function_event.rs | 369 ++++++++++++++++++ 1 file changed, 369 insertions(+) create mode 100644 bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs diff --git a/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs b/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs new file mode 100644 index 000000000..cac2d464f --- /dev/null +++ b/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs @@ -0,0 +1,369 @@ +use std::collections::HashMap; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use sha2::{Digest, Sha256}; + +use crate::{ + lifecycle::invocation::triggers::{Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + traces::{ + context::{Sampling, SpanContext}, + propagation::text_map_propagator::DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY, + }, +}; + +#[allow(clippy::module_name_repetitions)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct LegacyStepFunctionEvent { + #[serde(rename = "Payload")] + pub payload: StepFunctionEvent, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct StepFunctionEvent { + #[serde(rename = "Execution")] + pub execution: Execution, + #[serde(rename = "State")] + pub state: State, + #[serde(rename = "StateMachine")] + pub state_machine: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct Execution { + #[serde(rename = "Id")] + id: String, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct State { + #[serde(rename = "Name")] + name: String, + #[serde(rename = "EnteredTime")] + entered_time: DateTime, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct StateMachine { + #[serde(rename = "Id")] + id: String, +} + +impl Trigger for StepFunctionEvent { + fn new(payload: serde_json::Value) -> Option + where + Self: Sized, + { + let p = payload.get("Payload").unwrap_or(&payload); + match serde_json::from_value::(p.clone()) { + Ok(event) => Some(event), + Err(e) => { + tracing::debug!("Failed to deserialize Step Function Event: {e}"); + None + } + } + } + + fn is_match(payload: &serde_json::Value) -> bool + where + Self: Sized, + { + // Check first if the payload is a Legacy Step Function event + let p = payload.get("Payload").unwrap_or(payload); + + let execution_id = p + .get("Execution") + .and_then(Value::as_object) + .and_then(|e| e.get("Id")); + let state = p.get("State").and_then(Value::as_object); + let name = state.and_then(|s| s.get("Name")); + let entered_time = state.and_then(|s| s.get("EnteredTime")); + + execution_id.is_some() && name.is_some() && entered_time.is_some() + } + + fn enrich_span(&self, _span: &mut datadog_trace_protobuf::pb::Span) {} + + fn get_tags(&self) -> HashMap { + HashMap::from([( + FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(), + "states".to_string(), + )]) + } + + fn get_arn(&self, _region: &str) -> String { + if let Some(sm) = &self.state_machine { + return sm.id.clone(); + } + + String::new() + } + + fn get_carrier(&self) -> HashMap { + HashMap::new() + } + + fn is_async(&self) -> bool { + true + } +} + +impl StepFunctionEvent { + #[must_use] + pub fn get_span_context(&self) -> SpanContext { + let (lo_tid, hi_tid) = Self::generate_trace_id(self.execution.id.clone()); + let tags = HashMap::from([( + DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY.to_string(), + format!("{hi_tid:x}"), + )]); + + let parent_id = Self::generate_parent_id( + self.execution.id.clone(), + self.state.name.clone(), + self.state.entered_time.to_rfc3339(), + ); + + SpanContext { + trace_id: lo_tid, + span_id: parent_id, + // Priority Auto Keep + sampling: Some(Sampling { + priority: Some(1), + mechanism: None, + }), + origin: Some("states".to_string()), + tags, + links: vec![], + } + } + + /// Generates a random 64 bit ID from the formatted hash of the + /// Step Function Execution ARN, the State Name, and the State Entered Time + /// + fn generate_parent_id( + execution_id: String, + state_name: String, + state_entered_time: String, + ) -> u64 { + let unique_string = format!("{execution_id}#{state_name}#{state_entered_time}"); + + let hash = Sha256::digest(unique_string.as_bytes()); + Self::get_positive_u64(&hash[0..8]) + } + + /// Generates a random 128 bit ID from the Step Function Execution ARN + /// + fn generate_trace_id(execution_arn: String) -> (u64, u64) { + let hash = Sha256::digest(execution_arn.as_bytes()); + + let lower_order_bits = Self::get_positive_u64(&hash[8..16]); + let higher_order_bits = Self::get_positive_u64(&hash[0..8]); + + (lower_order_bits, higher_order_bits) + } + + /// Converts the first 8 bytes of a byte array to a positive `u64` + /// + fn get_positive_u64(hash_bytes: &[u8]) -> u64 { + let mut result: u64 = hash_bytes + .iter() + .take(8) + .fold(0, |acc, &byte| (acc << 8) + u64::from(byte)); + + // Ensure the highest bit is always 0 + result &= !(1u64 << 63); + + // Return 1 if result is 0 + if result == 0 { + 1 + } else { + result + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::lifecycle::invocation::triggers::test_utils::read_json_file; + + #[test] + fn test_new() { + let json = read_json_file("step_function_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let result = StepFunctionEvent::new(payload).expect("Failed to deserialize into Event"); + + let expected = StepFunctionEvent { + execution: Execution { + id: String::from("arn:aws:states:us-east-1:425362996713:execution:agocsTestSF:bc9f281c-3daa-4e5a-9a60-471a3810bf44"), + }, + state: State { + name: String::from("agocsTest1"), + entered_time: DateTime::parse_from_rfc3339("2024-07-30T19:55:53.018Z") + .unwrap() + .with_timezone(&Utc),}, + state_machine: Some(StateMachine { + id: String::from("arn:aws:states:us-east-1:425362996713:stateMachine:agocsTestSF"), + }), + }; + + assert_eq!(result, expected); + } + + #[test] + fn test_new_legacy_event() { + let json = read_json_file("step_function_legacy_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let result = StepFunctionEvent::new(payload).expect("Failed to deserialize into Event"); + + let expected = StepFunctionEvent { + execution: Execution { + id: String::from("arn:aws:states:us-east-1:425362996713:execution:agocsTestSF:bc9f281c-3daa-4e5a-9a60-471a3810bf44"), + }, + state: State { + name: String::from("agocsTest1"), + entered_time: DateTime::parse_from_rfc3339("2024-07-30T19:55:53.018Z") + .unwrap() + .with_timezone(&Utc),}, + state_machine: Some(StateMachine { + id: String::from("arn:aws:states:us-east-1:425362996713:stateMachine:agocsTestSF"), + }), + }; + + assert_eq!(result, expected); + } + + #[test] + fn test_is_match() { + let json = read_json_file("step_function_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize StepFunctionEvent"); + + assert!(StepFunctionEvent::is_match(&payload)); + } + + #[test] + fn test_is_match_legacy_event() { + let json = read_json_file("step_function_legacy_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize StepFunctionEvent"); + + assert!(StepFunctionEvent::is_match(&payload)); + } + + #[test] + fn test_is_not_match() { + let json = read_json_file("sqs_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize SqsRecord"); + assert!(!StepFunctionEvent::is_match(&payload)); + } + + #[test] + fn test_get_tags() { + let json = read_json_file("step_function_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = + StepFunctionEvent::new(payload).expect("Failed to deserialize StepFunctionEvent"); + let tags = event.get_tags(); + + let expected = HashMap::from([( + "function_trigger.event_source".to_string(), + "states".to_string(), + )]); + + assert_eq!(tags, expected); + } + + #[test] + fn test_get_arn() { + let json = read_json_file("step_function_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = + StepFunctionEvent::new(payload).expect("Failed to deserialize StepFunctionEvent"); + assert_eq!( + event.get_arn("us-east-1"), + "arn:aws:states:us-east-1:425362996713:stateMachine:agocsTestSF" + ); + } + + #[test] + fn test_get_carrier() { + let json = read_json_file("step_function_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = + StepFunctionEvent::new(payload).expect("Failed to deserialize StepFunctionEvent"); + let carrier = event.get_carrier(); + + let expected = HashMap::new(); + + assert_eq!(carrier, expected); + } + + #[test] + fn get_span_context() { + let json = read_json_file("step_function_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = + StepFunctionEvent::new(payload).expect("Failed to deserialize StepFunctionEvent"); + + let span_context = event.get_span_context(); + + let expected = SpanContext { + trace_id: 5744042798732701615, + span_id: 3347375825762253425, + sampling: Some(Sampling { + priority: Some(1), + mechanism: None, + }), + origin: Some("states".to_string()), + tags: HashMap::from([( + DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY.to_string(), + "1914fe7789eb32be".to_string(), + )]), + links: vec![], + }; + + assert_eq!(span_context, expected); + } + + #[test] + fn test_generate_parent_id() { + let parent_id = StepFunctionEvent::generate_parent_id( + String::from("arn:aws:states:sa-east-1:601427271234:express:DatadogStateMachine:acaf1a67-336a-e854-1599-2a627eb2dd8a:c8baf081-31f1-464d-971f-70cb17d01111"), + String::from("step-one"), + String::from("2022-12-08T21:08:19.224Z") + ); + + assert_eq!(parent_id, 4340734536022949921); + + let parent_id = StepFunctionEvent::generate_parent_id( + String::from("arn:aws:states:sa-east-1:601427271234:express:DatadogStateMachine:acaf1a67-336a-e854-1599-2a627eb2dd8a:c8baf081-31f1-464d-971f-70cb17d01111"), + String::from("step-one"), + String::from("2022-12-08T21:08:19.224Y") + ); + + assert_eq!(parent_id, 981693280319792699); + } + + #[test] + fn test_generate_trace_id() { + let (lo_tid, hi_tid) = StepFunctionEvent::generate_trace_id(String::from( + "arn:aws:states:sa-east-1:425362996713:stateMachine:MyStateMachine-b276uka1j", + )); + let hex_tid = format!("{:x}", hi_tid); + + assert_eq!(lo_tid, 1680583253837593461); + assert_eq!(hi_tid, 6984552746569958392); + + assert_eq!(hex_tid, "60ee1db79e4803f8"); + + let (lo_tid, hi_tid) = StepFunctionEvent::generate_trace_id( + String::from("arn:aws:states:us-east-1:425362996713:execution:agocsTestSF:bc9f281c-3daa-4e5a-9a60-471a3810bf44") + ); + let hex_tid = format!("{:x}", hi_tid); + + assert_eq!(lo_tid, 5744042798732701615); + assert_eq!(hi_tid, 1807349139850867390); + + assert_eq!(hex_tid, "1914fe7789eb32be"); + } +} From a64d5be290cdc9730b51df4dbe819631de513fce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:45:31 -0500 Subject: [PATCH 04/12] adapt `SpanInferrer` for generated `SpanContext` --- .../src/lifecycle/invocation/span_inferrer.rs | 46 ++++++++++++++++--- .../src/lifecycle/invocation/triggers/mod.rs | 1 + 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index edc3253c1..12f5cb3a0 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -12,19 +12,27 @@ use crate::lifecycle::invocation::triggers::{ api_gateway_rest_event::APIGatewayRestEvent, dynamodb_event::DynamoDbRecord, event_bridge_event::EventBridgeEvent, + s3_event::S3Record, sns_event::{SnsEntity, SnsRecord}, sqs_event::SqsRecord, + step_function_event::StepFunctionEvent, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, }; use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE}; - -use super::triggers::s3_event::S3Record; +use crate::traces::{context::SpanContext, propagation::Propagator}; pub struct SpanInferrer { + // Span inferred from the Lambda incoming request payload pub inferred_span: Option, + // Nested span inferred from the Lambda incoming request payload pub wrapped_inferred_span: Option, + // If the inferred span is async is_async_span: bool, + // Carrier to extract the span context from carrier: Option>, + // Generated Span Context from Step Functions + generated_span_context: Option, + // Tags generated from the trigger trigger_tags: Option>, } @@ -42,6 +50,7 @@ impl SpanInferrer { wrapped_inferred_span: None, is_async_span: false, carrier: None, + generated_span_context: None, trigger_tags: None, } } @@ -55,6 +64,7 @@ impl SpanInferrer { self.wrapped_inferred_span = None; self.is_async_span = false; self.carrier = None; + self.generated_span_context = None; self.trigger_tags = None; let mut trigger: Option> = None; @@ -124,6 +134,12 @@ impl SpanInferrer { if let Some(t) = EventBridgeEvent::new(payload_value.clone()) { t.enrich_span(&mut inferred_span); + trigger = Some(Box::new(t)); + } + } else if StepFunctionEvent::is_match(payload_value) { + if let Some(t) = StepFunctionEvent::new(payload_value.clone()) { + self.generated_span_context = Some(t.get_span_context()); + trigger = Some(Box::new(t)); } } else { @@ -141,7 +157,11 @@ impl SpanInferrer { self.trigger_tags = Some(trigger_tags); self.carrier = Some(t.get_carrier()); self.is_async_span = t.is_async(); - self.inferred_span = Some(inferred_span); + + // For Step Functions, there is no inferred span + if self.generated_span_context.is_none() { + self.inferred_span = Some(inferred_span); + } } } @@ -214,11 +234,23 @@ impl SpanInferrer { rng.gen() } - /// Returns a clone of the carrier associated with the inferred span + /// Returns the extracted span context /// - #[must_use] - pub fn get_carrier(&self) -> Option> { - self.carrier.clone() + /// If the carrier is set, it will try to extract the span context, + /// otherwise it will + /// + pub fn get_span_context(&self, propagator: &impl Propagator) -> Option { + // Step Functions `SpanContext` is deterministically generated + if let Some(sc) = &self.generated_span_context { + return Some(sc.clone()); + } + + if let Some(sc) = self.carrier.as_ref().and_then(|c| propagator.extract(c)) { + debug!("Extracted trace context from inferred span"); + return Some(sc); + } + + None } /// Returns a clone of the tags associated with the inferred span diff --git a/bottlecap/src/lifecycle/invocation/triggers/mod.rs b/bottlecap/src/lifecycle/invocation/triggers/mod.rs index e0d347f08..f06c68722 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/mod.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/mod.rs @@ -12,6 +12,7 @@ pub mod event_bridge_event; pub mod s3_event; pub mod sns_event; pub mod sqs_event; +pub mod step_function_event; pub const DATADOG_CARRIER_KEY: &str = "_datadog"; pub const FUNCTION_TRIGGER_EVENT_SOURCE_TAG: &str = "function_trigger.event_source"; From 72e9cc3e43d818c130695648ecac43fea10ec2e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:45:55 -0500 Subject: [PATCH 05/12] adapt `InvocationProcessor` for generated `SpanContext` --- .../src/lifecycle/invocation/processor.rs | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 467519e37..e6e2cfe0c 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -21,7 +21,13 @@ use crate::{ telemetry::events::{ReportMetrics, Status}, traces::{ context::SpanContext, - propagation::{DatadogCompositePropagator, Propagator}, + propagation::{ + text_map_propagator::{ + DatadogHeaderPropagator, DATADOG_PARENT_ID_KEY, DATADOG_SPAN_ID_KEY, + DATADOG_TRACE_ID_KEY, + }, + DatadogCompositePropagator, Propagator, + }, trace_processor, }, }; @@ -299,11 +305,8 @@ impl Processor { headers: &HashMap, payload_value: &Value, ) -> Option { - if let Some(carrier) = self.inferrer.get_carrier() { - if let Some(sc) = self.propagator.extract(&carrier) { - debug!("Extracted trace context from inferred span"); - return Some(sc); - } + if let Some(sc) = self.inferrer.get_span_context(&self.propagator) { + return Some(sc); } if let Some(payload_headers) = payload_value.get("headers") { @@ -342,30 +345,39 @@ impl Processor { let mut trace_id = 0; let mut span_id = 0; let mut parent_id = 0; + let mut tags: HashMap = HashMap::new(); // If we have a trace context, update the span context if let Some(sc) = &mut self.extracted_span_context { trace_id = sc.trace_id; span_id = sc.span_id; + tags.extend(sc.tags.clone()); } - if let Some(header) = headers.get("x-datadog-trace-id") { + // Extract trace context from headers manually + if let Some(header) = headers.get(DATADOG_TRACE_ID_KEY) { trace_id = header.parse::().unwrap_or(0); } - if let Some(header) = headers.get("x-datadog-span-id") { + if let Some(header) = headers.get(DATADOG_SPAN_ID_KEY) { span_id = header.parse::().unwrap_or(0); } - if let Some(header) = headers.get("x-datadog-parent-id") { + if let Some(header) = headers.get(DATADOG_PARENT_ID_KEY) { parent_id = header.parse::().unwrap_or(0); } + // Extract tags from headers + tags = DatadogHeaderPropagator::extract_tags(&headers); + self.span.trace_id = trace_id; self.span.span_id = span_id; - if self.inferrer.inferred_span.is_none() { + if self.inferrer.inferred_span.is_some() { + self.inferrer.extend_meta(tags); + } else { self.span.parent_id = parent_id; + self.span.meta.extend(tags); } } } From 2a536f6df7f408e170763ecb779152fedc95ad85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:49:15 -0500 Subject: [PATCH 06/12] resolve merge conflicts --- bottlecap/src/lifecycle/invocation/processor.rs | 2 +- bottlecap/src/lifecycle/invocation/span_inferrer.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 7b33c0d06..58b45697d 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -394,7 +394,7 @@ impl Processor { } // Extract tags from headers - tags = DatadogHeaderPropagator::extract_tags(&headers); + tags = DatadogHeaderPropagator::extract_tags(headers); self.span.trace_id = trace_id; self.span.span_id = span_id; diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index 4f8a29cf9..b417246a9 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -12,8 +12,8 @@ use crate::lifecycle::invocation::triggers::{ api_gateway_rest_event::APIGatewayRestEvent, dynamodb_event::DynamoDbRecord, event_bridge_event::EventBridgeEvent, - s3_event::S3Record, kinesis_event::KinesisRecord, + s3_event::S3Record, sns_event::{SnsEntity, SnsRecord}, sqs_event::SqsRecord, step_function_event::StepFunctionEvent, From 185228e2454b195c3f30dd466f539c4d030b07fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:50:47 -0500 Subject: [PATCH 07/12] resolve clippy issues --- bottlecap/src/lifecycle/invocation/span_inferrer.rs | 1 + bottlecap/src/metrics/enhanced/statfs.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index b417246a9..b25cbced7 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -60,6 +60,7 @@ impl SpanInferrer { /// and try matching it to a `Trigger` implementation, which will create /// an inferred span and set it to `self.inferred_span` /// + #[allow(clippy::too_many_lines)] pub fn infer_span(&mut self, payload_value: &Value, aws_config: &AwsConfig) { self.inferred_span = None; self.wrapped_inferred_span = None; diff --git a/bottlecap/src/metrics/enhanced/statfs.rs b/bottlecap/src/metrics/enhanced/statfs.rs index 0da8a1828..bdfc9e941 100644 --- a/bottlecap/src/metrics/enhanced/statfs.rs +++ b/bottlecap/src/metrics/enhanced/statfs.rs @@ -10,7 +10,7 @@ use std::path::Path; pub fn statfs_info(path: &str) -> Result<(f64, f64, f64), io::Error> { let stat = statfs(Path::new(path)).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; Ok(( - stat.block_size() as f64, + f64::from(stat.block_size()), stat.blocks() as f64, stat.blocks_available() as f64, )) From 9f42c9efb09e5f795583f1f6f0bccdbfe84e1762 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:55:37 -0500 Subject: [PATCH 08/12] add allow clippy --- bottlecap/src/metrics/enhanced/statfs.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/metrics/enhanced/statfs.rs b/bottlecap/src/metrics/enhanced/statfs.rs index bdfc9e941..84e7412f1 100644 --- a/bottlecap/src/metrics/enhanced/statfs.rs +++ b/bottlecap/src/metrics/enhanced/statfs.rs @@ -5,12 +5,13 @@ use std::io; use std::path::Path; #[cfg(not(target_os = "windows"))] +#[allow(clippy::cast_lossless)] /// Returns the block size, total number of blocks, and number of blocks available for the specified directory path. /// pub fn statfs_info(path: &str) -> Result<(f64, f64, f64), io::Error> { let stat = statfs(Path::new(path)).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; Ok(( - f64::from(stat.block_size()), + stat.block_size() as f64, stat.blocks() as f64, stat.blocks_available() as f64, )) From a7e2483b8ee58ba439acf70781b3828c9d96af19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 16:23:35 -0500 Subject: [PATCH 09/12] do not serialize the `entered_time` --- .../invocation/triggers/step_function_event.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs b/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs index cac2d464f..2eb571865 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value; use sha2::{Digest, Sha256}; @@ -41,7 +40,7 @@ pub struct State { #[serde(rename = "Name")] name: String, #[serde(rename = "EnteredTime")] - entered_time: DateTime, + entered_time: String, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -121,7 +120,7 @@ impl StepFunctionEvent { let parent_id = Self::generate_parent_id( self.execution.id.clone(), self.state.name.clone(), - self.state.entered_time.to_rfc3339(), + self.state.entered_time.clone(), ); SpanContext { @@ -200,9 +199,8 @@ mod tests { }, state: State { name: String::from("agocsTest1"), - entered_time: DateTime::parse_from_rfc3339("2024-07-30T19:55:53.018Z") - .unwrap() - .with_timezone(&Utc),}, + entered_time: String::from("2024-07-30T19:55:53.018Z"), + }, state_machine: Some(StateMachine { id: String::from("arn:aws:states:us-east-1:425362996713:stateMachine:agocsTestSF"), }), @@ -223,9 +221,8 @@ mod tests { }, state: State { name: String::from("agocsTest1"), - entered_time: DateTime::parse_from_rfc3339("2024-07-30T19:55:53.018Z") - .unwrap() - .with_timezone(&Utc),}, + entered_time: String::from("2024-07-30T19:55:53.018Z"), + }, state_machine: Some(StateMachine { id: String::from("arn:aws:states:us-east-1:425362996713:stateMachine:agocsTestSF"), }), From e6ac807316f782c12907de9271b9b3a44180de74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 16:24:08 -0500 Subject: [PATCH 10/12] set `None` for inferred span when `generated_span_context` exists --- bottlecap/src/lifecycle/invocation/span_inferrer.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index b25cbced7..a4a3a5600 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -202,7 +202,9 @@ impl SpanInferrer { self.is_async_span = t.is_async(); // For Step Functions, there is no inferred span - if self.generated_span_context.is_none() { + if self.generated_span_context.is_some() { + self.inferred_span = None; + } else { self.inferred_span = Some(inferred_span); } } From 8b87348850ea17512552168d135f3bde67d8391e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 16:24:33 -0500 Subject: [PATCH 11/12] tidy code for last trace context update --- .../src/lifecycle/invocation/processor.rs | 46 ++++++++++++------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 58b45697d..64a8582b1 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -284,6 +284,8 @@ impl Processor { self.span.trace_id = 0; self.span.parent_id = 0; self.span.span_id = 0; + self.span.error = 0; + self.span.meta.clear(); self.extracted_span_context = None; let payload_value = match serde_json::from_slice::(&payload) { @@ -294,6 +296,7 @@ impl Processor { self.inferrer.infer_span(&payload_value, &self.aws_config); self.extracted_span_context = self.extract_span_context(&headers, &payload_value); + // Set the extracted trace context to the spans if let Some(sc) = &self.extracted_span_context { self.span.trace_id = sc.trace_id; self.span.parent_id = sc.span_id; @@ -308,6 +311,8 @@ impl Processor { } } + // If we have an inferred span, set the invocation span parent id + // to be the inferred span id, even if we don't have an extracted trace context if let Some(inferred_span) = &self.inferrer.inferred_span { self.span.parent_id = inferred_span.span_id; } @@ -366,38 +371,45 @@ impl Processor { } fn update_span_context_from_headers(&mut self, headers: &HashMap) { - // todo: fix this, code is a copy of the existing logic in Go, not accounting - // when a 128 bit trace id exist let mut trace_id = 0; - let mut span_id = 0; let mut parent_id = 0; let mut tags: HashMap = HashMap::new(); - // If we have a trace context, update the span context + // If we have a trace context, this means we got it from + // distributed tracing if let Some(sc) = &mut self.extracted_span_context { + debug!("Trace context was found, not extracting it from incoming headers"); trace_id = sc.trace_id; - span_id = sc.span_id; + parent_id = sc.span_id; tags.extend(sc.tags.clone()); } - // Extract trace context from headers manually - if let Some(header) = headers.get(DATADOG_TRACE_ID_KEY) { - trace_id = header.parse::().unwrap_or(0); - } + // We are the root span, so we should extract the trace context + // from the tracer, which has sent it through end invocation headers + if trace_id == 0 { + debug!("No trace context found, extracting it from headers"); + // Extract trace context from headers manually + if let Some(header) = headers.get(DATADOG_TRACE_ID_KEY) { + trace_id = header.parse::().unwrap_or(0); + } - if let Some(header) = headers.get(DATADOG_SPAN_ID_KEY) { - span_id = header.parse::().unwrap_or(0); - } + if let Some(header) = headers.get(DATADOG_PARENT_ID_KEY) { + parent_id = header.parse::().unwrap_or(0); + } + + // TODO: sampling priority extraction - if let Some(header) = headers.get(DATADOG_PARENT_ID_KEY) { - parent_id = header.parse::().unwrap_or(0); + // Extract tags from headers + // Used for 128 bit trace ids + tags = DatadogHeaderPropagator::extract_tags(headers); } - // Extract tags from headers - tags = DatadogHeaderPropagator::extract_tags(headers); + // We should always use the generated trace id from the tracer + if let Some(header) = headers.get(DATADOG_SPAN_ID_KEY) { + self.span.span_id = header.parse::().unwrap_or(0); + } self.span.trace_id = trace_id; - self.span.span_id = span_id; if self.inferrer.inferred_span.is_some() { self.inferrer.extend_meta(tags); From 9a329c9c4dd754d60c00cf8fb1c5da999c7a13e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 12 Nov 2024 16:29:31 -0500 Subject: [PATCH 12/12] fix unit test --- .../src/lifecycle/invocation/triggers/step_function_event.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs b/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs index 2eb571865..91eb2af54 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/step_function_event.rs @@ -306,7 +306,7 @@ mod tests { let expected = SpanContext { trace_id: 5744042798732701615, - span_id: 3347375825762253425, + span_id: 2902498116043018663, sampling: Some(Sampling { priority: Some(1), mechanism: None,