From a386c6a440b7ae5f8d46a2332e597c87d204d4d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 8 Nov 2024 17:05:02 -0500 Subject: [PATCH 1/4] add `S_TO_NS` --- bottlecap/src/lifecycle/invocation/processor.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index e37789202..471c00f69 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -25,6 +25,7 @@ use crate::{ }; pub const MS_TO_NS: f64 = 1_000_000.0; +pub const S_TO_NS: f64 = 1_000_000_000.0; pub struct Processor { pub context_buffer: ContextBuffer, From de554367081bac9eb5ae7ad903ce2627f6f9ade8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 8 Nov 2024 17:05:29 -0500 Subject: [PATCH 2/4] add `DynamoDbEvent` --- .../invocation/triggers/dynamodb_event.rs | 240 ++++++++++++++++++ .../src/lifecycle/invocation/triggers/mod.rs | 1 + bottlecap/tests/payloads/dynamodb_event.json | 93 +++++++ 3 files changed, 334 insertions(+) create mode 100644 bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs create mode 100644 bottlecap/tests/payloads/dynamodb_event.json diff --git a/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs b/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs new file mode 100644 index 000000000..f463ba6a9 --- /dev/null +++ b/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs @@ -0,0 +1,240 @@ +use datadog_trace_protobuf::pb::Span; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::HashMap; +use tracing::debug; + +use crate::lifecycle::invocation::{ + processor::S_TO_NS, + triggers::{Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, +}; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct DynamoDbEvent { + #[serde(rename = "Records")] + pub records: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct DynamoDbRecord { + #[serde(rename = "dynamodb")] + pub dynamodb: DynamoDbEntity, + #[serde(rename = "eventID")] + pub event_id: String, + #[serde(rename = "eventName")] + pub event_name: String, + #[serde(rename = "eventVersion")] + pub event_version: String, + #[serde(rename = "eventSourceARN")] + pub event_source_arn: String, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct DynamoDbEntity { + #[serde(rename = "ApproximateCreationDateTime")] + pub approximate_creation_date_time: i64, + #[serde(rename = "SizeBytes")] + pub size_bytes: i64, + #[serde(rename = "StreamViewType")] + pub stream_view_type: String, +} + +impl Trigger for DynamoDbRecord { + fn new(payload: Value) -> Option + where + Self: Sized, + { + let records = payload.get("Records").and_then(Value::as_array); + match records { + Some(records) => match serde_json::from_value::(records[0].clone()) { + Ok(event) => Some(event), + Err(e) => { + debug!("Failed to deserialize DynamoDB Record: {e}"); + None + } + }, + None => None, + } + } + + fn is_match(payload: &Value) -> bool + where + Self: Sized, + { + if let Some(first_record) = payload + .get("Records") + .and_then(Value::as_array) + .and_then(|r| r.first()) + .take() + { + first_record.get("dynamodb").is_some() + } else { + false + } + } + + #[allow(clippy::cast_possible_truncation)] + fn enrich_span(&self, span: &mut Span) { + debug!("Enriching an Inferred Span for a DynamoDB event"); + let table_name = self.event_source_arn.split('/').nth(1).unwrap_or_default(); + let resource = format!("{} {}", self.event_name.clone(), table_name); + + let start_time = (self.dynamodb.approximate_creation_date_time as f64 * S_TO_NS) as i64; + // todo: service mapping and peer service + let service_name = "dynamodb"; + + span.name = String::from("aws.dynamodb"); + span.service = service_name.to_string(); + span.resource = resource; + span.r#type = String::from("web"); + span.start = start_time; + span.meta.extend(HashMap::from([ + ("operation_name".to_string(), String::from("aws.dynamodb")), + ("event_id".to_string(), self.event_id.clone()), + ("event_name".to_string(), self.event_name.clone()), + ("event_version".to_string(), self.event_version.clone()), + ( + "event_source_arn".to_string(), + self.event_source_arn.clone(), + ), + ( + "size_bytes".to_string(), + self.dynamodb.size_bytes.to_string(), + ), + ( + "stream_view_type".to_string(), + self.dynamodb.stream_view_type.clone(), + ), + ("table_name".to_string(), table_name.to_string()), + ])); + } + + fn get_tags(&self) -> HashMap { + HashMap::from([( + FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(), + "dynamodb".to_string(), + )]) + } + + fn get_arn(&self, _region: &str) -> String { + self.event_source_arn.clone() + } + + fn get_carrier(&self) -> HashMap { + HashMap::new() + } + + fn is_async(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::lifecycle::invocation::triggers::test_utils::read_json_file; + + #[test] + fn test_new() { + let json = read_json_file("dynamodb_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let result = DynamoDbRecord::new(payload).expect("Failed to deserialize into Record"); + + let expected = DynamoDbRecord { + dynamodb: DynamoDbEntity { + approximate_creation_date_time: 1428537600, + size_bytes: 26, + stream_view_type: String::from("NEW_AND_OLD_IMAGES"), + }, + event_id: String::from("c4ca4238a0b923820dcc509a6f75849b"), + event_name: String::from("INSERT"), + event_version: String::from("1.1"), + event_source_arn: String::from("arn:aws:dynamodb:us-east-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899"), + }; + + assert_eq!(result, expected); + } + + #[test] + fn test_is_match() { + let json = read_json_file("dynamodb_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize DynamoDbRecord"); + + assert!(DynamoDbRecord::is_match(&payload)); + } + + #[test] + fn test_is_not_match() { + let json = read_json_file("sqs_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize SqsRecord"); + assert!(!DynamoDbRecord::is_match(&payload)); + } + + #[test] + fn test_enrich_span() { + let json = read_json_file("dynamodb_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = DynamoDbRecord::new(payload).expect("Failed to deserialize DynamoDbRecord"); + let mut span = Span::default(); + event.enrich_span(&mut span); + assert_eq!(span.name, "aws.dynamodb"); + assert_eq!(span.service, "dynamodb"); + assert_eq!(span.resource, "INSERT ExampleTableWithStream"); + assert_eq!(span.r#type, "web"); + + assert_eq!( + span.meta, + HashMap::from([ + ("operation_name".to_string(), "aws.dynamodb".to_string()), + ("event_id".to_string(), "c4ca4238a0b923820dcc509a6f75849b".to_string()), + ("event_name".to_string(), "INSERT".to_string()), + ("event_version".to_string(), "1.1".to_string()), + ( + "event_source_arn".to_string(), + "arn:aws:dynamodb:us-east-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899".to_string() + ), + ("size_bytes".to_string(), "26".to_string()), + ("stream_view_type".to_string(), "NEW_AND_OLD_IMAGES".to_string()), + ("table_name".to_string(), "ExampleTableWithStream".to_string()), + ]) + ); + } + + #[test] + fn test_get_tags() { + let json = read_json_file("dynamodb_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = DynamoDbRecord::new(payload).expect("Failed to deserialize DynamoDbRecord"); + let tags = event.get_tags(); + + let expected = HashMap::from([( + "function_trigger.event_source".to_string(), + "dynamodb".to_string(), + )]); + + assert_eq!(tags, expected); + } + + #[test] + fn test_get_arn() { + let json = read_json_file("dynamodb_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = DynamoDbRecord::new(payload).expect("Failed to deserialize DynamoDbRecord"); + assert_eq!( + event.get_arn("us-east-1"), + "arn:aws:dynamodb:us-east-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + ); + } + + #[test] + fn test_get_carrier() { + let json = read_json_file("dynamodb_event.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = DynamoDbRecord::new(payload).expect("Failed to deserialize DynamoDbRecord"); + let carrier = event.get_carrier(); + + let expected = HashMap::new(); + + assert_eq!(carrier, expected); + } +} diff --git a/bottlecap/src/lifecycle/invocation/triggers/mod.rs b/bottlecap/src/lifecycle/invocation/triggers/mod.rs index 6edbe5c07..1385f3a34 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/mod.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/mod.rs @@ -7,6 +7,7 @@ use serde_json::Value; pub mod api_gateway_http_event; pub mod api_gateway_rest_event; +pub mod dynamodb_event; pub mod sns_event; pub mod sqs_event; diff --git a/bottlecap/tests/payloads/dynamodb_event.json b/bottlecap/tests/payloads/dynamodb_event.json new file mode 100644 index 000000000..df0cf7ea4 --- /dev/null +++ b/bottlecap/tests/payloads/dynamodb_event.json @@ -0,0 +1,93 @@ +{ + "Records": [ + { + "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439091", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "c81e728d9d4c2f636f067f89cc14862c", + "eventName": "MODIFY", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439092", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3", + "eventName": "REMOVE", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439093", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + } + ] +} From a6df09e1982b39be1ebb87d164f9d247532dbbc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 8 Nov 2024 17:05:44 -0500 Subject: [PATCH 3/4] use `DynamoDbEvent` in `SpanInferrer` --- bottlecap/src/lifecycle/invocation/span_inferrer.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index 6c84cbe70..6a8c919fe 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -10,6 +10,7 @@ use crate::config::AwsConfig; use crate::lifecycle::invocation::triggers::{ api_gateway_http_event::APIGatewayHttpEvent, api_gateway_rest_event::APIGatewayRestEvent, + dynamodb_event::DynamoDbRecord, sns_event::{SnsEntity, SnsRecord}, sqs_event::SqsRecord, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG, @@ -102,6 +103,12 @@ impl SpanInferrer { if let Some(t) = SnsRecord::new(payload_value.clone()) { t.enrich_span(&mut inferred_span); + trigger = Some(Box::new(t)); + } + } else if DynamoDbRecord::is_match(payload_value) { + if let Some(t) = DynamoDbRecord::new(payload_value.clone()) { + t.enrich_span(&mut inferred_span); + trigger = Some(Box::new(t)); } } else { From 47aee0caf7b7222ce1c98a0cd7ef25ec30b50ca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 8 Nov 2024 21:14:20 -0500 Subject: [PATCH 4/4] update to parse `approximate_creation_date_time` as `f64` --- .../src/lifecycle/invocation/triggers/dynamodb_event.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs b/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs index f463ba6a9..46c20fe27 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs @@ -32,7 +32,7 @@ pub struct DynamoDbRecord { #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct DynamoDbEntity { #[serde(rename = "ApproximateCreationDateTime")] - pub approximate_creation_date_time: i64, + pub approximate_creation_date_time: f64, #[serde(rename = "SizeBytes")] pub size_bytes: i64, #[serde(rename = "StreamViewType")] @@ -79,7 +79,7 @@ impl Trigger for DynamoDbRecord { let table_name = self.event_source_arn.split('/').nth(1).unwrap_or_default(); let resource = format!("{} {}", self.event_name.clone(), table_name); - let start_time = (self.dynamodb.approximate_creation_date_time as f64 * S_TO_NS) as i64; + let start_time = (self.dynamodb.approximate_creation_date_time * S_TO_NS) as i64; // todo: service mapping and peer service let service_name = "dynamodb"; @@ -142,7 +142,7 @@ mod tests { let expected = DynamoDbRecord { dynamodb: DynamoDbEntity { - approximate_creation_date_time: 1428537600, + approximate_creation_date_time: 1428537600.0, size_bytes: 26, stream_view_type: String::from("NEW_AND_OLD_IMAGES"), },