diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index edc3253c1..636f9736e 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -12,6 +12,7 @@ use crate::lifecycle::invocation::triggers::{ api_gateway_rest_event::APIGatewayRestEvent, dynamodb_event::DynamoDbRecord, event_bridge_event::EventBridgeEvent, + kinesis_event::KinesisRecord, sns_event::{SnsEntity, SnsRecord}, sqs_event::SqsRecord, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, @@ -124,6 +125,12 @@ impl SpanInferrer { if let Some(t) = EventBridgeEvent::new(payload_value.clone()) { t.enrich_span(&mut inferred_span); + trigger = Some(Box::new(t)); + } + } else if KinesisRecord::is_match(payload_value) { + if let Some(t) = KinesisRecord::new(payload_value.clone()) { + t.enrich_span(&mut inferred_span); + trigger = Some(Box::new(t)); } } else { diff --git a/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs b/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs index 46c20fe27..026e74832 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs @@ -142,7 +142,7 @@ mod tests { let expected = DynamoDbRecord { dynamodb: DynamoDbEntity { - approximate_creation_date_time: 1428537600.0, + approximate_creation_date_time: 1_428_537_600.0, size_bytes: 26, stream_view_type: String::from("NEW_AND_OLD_IMAGES"), }, diff --git a/bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs b/bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs index f51b111c6..ff7d174c6 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs @@ -200,7 +200,7 @@ mod tests { assert_eq!(span.resource, "testBus"); // Seconds resolution - assert_eq!(span.start, 1731140535000000000); + assert_eq!(span.start, 1_731_140_535_000_000_000); } #[test] diff --git a/bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs b/bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs new file mode 100644 index 000000000..c735d5439 --- /dev/null +++ b/bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs @@ -0,0 +1,248 @@ +#![allow(clippy::module_name_repetitions)] +use base64::engine::general_purpose; +use base64::Engine; +use datadog_trace_protobuf::pb::Span; +use serde::{Deserialize, Serialize}; +use serde_json::{from_slice, Value}; +use std::collections::HashMap; +use tracing::debug; + +use crate::lifecycle::invocation::{ + processor::S_TO_NS, + triggers::{Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, +}; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct KinesisEvent { + #[serde(rename = "Records")] + pub records: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct KinesisRecord { + #[serde(rename = "eventID")] + pub event_id: String, + #[serde(rename = "eventName")] + pub event_name: String, + #[serde(rename = "eventSourceARN")] + pub event_source_arn: String, + #[serde(rename = "eventVersion")] + pub event_version: String, + pub kinesis: KinesisEntity, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct KinesisEntity { + #[serde(rename = "approximateArrivalTimestamp")] + pub approximate_arrival_timestamp: f64, + #[serde(rename = "partitionKey")] + pub partition_key: String, + pub data: String, +} + +impl Trigger for KinesisRecord { + fn new(payload: Value) -> Option { + let records = payload.get("Records").and_then(Value::as_array); + match records { + Some(records) => match serde_json::from_value::(records[0].clone()) { + Ok(event) => Some(event), + Err(e) => { + debug!("Failed to deserialize Kinesis Record: {e}"); + None + } + }, + None => None, + } + } + + fn is_match(payload: &Value) -> bool { + if let Some(first_record) = payload + .get("Records") + .and_then(Value::as_array) + .and_then(|r| r.first()) + .take() + { + first_record.get("kinesis").is_some() + } else { + false + } + } + + #[allow(clippy::cast_possible_truncation)] + fn enrich_span(&self, span: &mut Span) { + let event_source_arn = &self.event_source_arn; + let parsed_stream_name = event_source_arn.split('/').last().unwrap_or_default(); + let parsed_shard_id = self.event_id.split(':').next().unwrap_or_default(); + span.name = "aws.kinesis".to_string(); + span.service = "kinesis".to_string(); + span.start = (self.kinesis.approximate_arrival_timestamp * S_TO_NS) as i64; + span.resource = parsed_stream_name.to_string(); + span.r#type = "web".to_string(); + span.meta = HashMap::from([ + ("operation_name".to_string(), "aws.kinesis".to_string()), + ("stream_name".to_string(), parsed_stream_name.to_string()), + ("shard_id".to_string(), parsed_shard_id.to_string()), + ("event_source_arn".to_string(), event_source_arn.to_string()), + ("event_id".to_string(), self.event_id.to_string()), + ("event_name".to_string(), self.event_name.to_string()), + ("event_version".to_string(), self.event_version.to_string()), + ( + "partition_key".to_string(), + self.kinesis.partition_key.to_string(), + ), + ]); + } + + fn get_tags(&self) -> HashMap { + HashMap::from([( + FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(), + "kinesis".to_string(), + )]) + } + + fn get_arn(&self, _region: &str) -> String { + self.event_source_arn.clone() + } + + fn get_carrier(&self) -> HashMap { + if let Ok(decoded_base64) = general_purpose::STANDARD.decode(&self.kinesis.data) { + if let Ok(as_json_map) = from_slice::>(&decoded_base64) { + if let Some(carrier) = as_json_map.get(DATADOG_CARRIER_KEY) { + return serde_json::from_value(carrier.clone()).unwrap_or_default(); + } + } + }; + HashMap::new() + } + + fn is_async(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::lifecycle::invocation::triggers::test_utils::read_json_file; + + #[test] + fn test_new() { + let json = read_json_file("kinesis_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let result = KinesisRecord::new(payload).expect("Failed to deserialize into Record"); + + let expected = KinesisRecord { + event_id: + "shardId-000000000002:49624230154685806402418173680709770494154422022871973922" + .to_string(), + event_name: "aws:kinesis:record".to_string(), + event_source_arn: "arn:aws:kinesis:sa-east-1:425362996713:stream/kinesisStream" + .to_string(), + event_version: "1.0".to_string(), + kinesis: KinesisEntity { + approximate_arrival_timestamp: 1_643_638_425.163, + partition_key: "partitionkey".to_string(), + data: "eyJmb28iOiAiYmFyIiwgIl9kYXRhZG9nIjogeyJ4LWRhdGFkb2ctdHJhY2UtaWQiOiAiNDk0ODM3NzMxNjM1NzI5MTQyMSIsICJ4LWRhdGFkb2ctcGFyZW50LWlkIjogIjI4NzYyNTMzODAwMTg2ODEwMjYiLCAieC1kYXRhZG9nLXNhbXBsaW5nLXByaW9yaXR5IjogIjEifX0=".to_string(), + }, + }; + + assert_eq!(result, expected); + } + + #[test] + fn test_is_match() { + let json = read_json_file("kinesis_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize S3Record"); + + assert!(KinesisRecord::is_match(&payload)); + } + + #[test] + fn test_is_not_match() { + let json = read_json_file("sqs_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize SqsRecord"); + assert!(!KinesisRecord::is_match(&payload)); + } + + #[test] + fn test_enrich_span() { + let json = read_json_file("kinesis_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = KinesisRecord::new(payload).expect("Failed to deserialize S3Record"); + let mut span = Span::default(); + event.enrich_span(&mut span); + assert_eq!(span.name, "aws.kinesis"); + assert_eq!(span.service, "kinesis"); + assert_eq!(span.resource, "kinesisStream"); + assert_eq!(span.r#type, "web"); + + assert_eq!( + span.meta, + HashMap::from([ + ("operation_name".to_string(), "aws.kinesis".to_string()), + ("stream_name".to_string(), "kinesisStream".to_string()), + ("shard_id".to_string(), "shardId-000000000002".to_string()), + ( + "event_source_arn".to_string(), + "arn:aws:kinesis:sa-east-1:425362996713:stream/kinesisStream".to_string() + ), + ( + "event_id".to_string(), + "shardId-000000000002:49624230154685806402418173680709770494154422022871973922" + .to_string() + ), + ("event_name".to_string(), "aws:kinesis:record".to_string()), + ("event_version".to_string(), "1.0".to_string()), + ("partition_key".to_string(), "partitionkey".to_string()), + ]) + ); + } + + #[test] + fn test_get_tags() { + let json = read_json_file("kinesis_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = KinesisRecord::new(payload).expect("Failed to deserialize KinesisRecord"); + let tags = event.get_tags(); + + let expected = HashMap::from([( + "function_trigger.event_source".to_string(), + "kinesis".to_string(), + )]); + + assert_eq!(tags, expected); + } + + #[test] + fn test_get_arn() { + let json = read_json_file("kinesis_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = KinesisRecord::new(payload).expect("Failed to deserialize KinesisRecord"); + assert_eq!( + event.get_arn("us-east-1"), + "arn:aws:kinesis:sa-east-1:425362996713:stream/kinesisStream".to_string() + ); + } + + #[test] + fn test_get_carrier() { + let json = read_json_file("kinesis_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = KinesisRecord::new(payload).expect("Failed to deserialize KinesisRecord"); + let carrier = event.get_carrier(); + + let expected = HashMap::from([ + ( + "x-datadog-trace-id".to_string(), + "4948377316357291421".to_string(), + ), + ( + "x-datadog-parent-id".to_string(), + "2876253380018681026".to_string(), + ), + ("x-datadog-sampling-priority".to_string(), "1".to_string()), + ]); + + assert_eq!(carrier, expected); + } +} diff --git a/bottlecap/src/lifecycle/invocation/triggers/mod.rs b/bottlecap/src/lifecycle/invocation/triggers/mod.rs index e0d347f08..5d9ba07e0 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/mod.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/mod.rs @@ -9,6 +9,7 @@ pub mod api_gateway_http_event; pub mod api_gateway_rest_event; pub mod dynamodb_event; pub mod event_bridge_event; +pub mod kinesis_event; pub mod s3_event; pub mod sns_event; pub mod sqs_event; diff --git a/bottlecap/tests/payloads/kinesis_event.json b/bottlecap/tests/payloads/kinesis_event.json new file mode 100644 index 000000000..822530822 --- /dev/null +++ b/bottlecap/tests/payloads/kinesis_event.json @@ -0,0 +1,20 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "partitionkey", + "sequenceNumber": "49624230154685806402418173680709770494154422022871973922", + "data": "eyJmb28iOiAiYmFyIiwgIl9kYXRhZG9nIjogeyJ4LWRhdGFkb2ctdHJhY2UtaWQiOiAiNDk0ODM3NzMxNjM1NzI5MTQyMSIsICJ4LWRhdGFkb2ctcGFyZW50LWlkIjogIjI4NzYyNTMzODAwMTg2ODEwMjYiLCAieC1kYXRhZG9nLXNhbXBsaW5nLXByaW9yaXR5IjogIjEifX0=", + "approximateArrivalTimestamp": 1643638425.163 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000002:49624230154685806402418173680709770494154422022871973922", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::425362996713:role/inferred-spans-python-dev-sa-east-1-lambdaRole", + "awsRegion": "sa-east-1", + "eventSourceARN": "arn:aws:kinesis:sa-east-1:425362996713:stream/kinesisStream" + } + ] +}