Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions bottlecap/src/lifecycle/invocation/span_inferrer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::lifecycle::invocation::triggers::{
api_gateway_http_event::APIGatewayHttpEvent,
api_gateway_rest_event::APIGatewayRestEvent,
dynamodb_event::DynamoDbRecord,
event_bridge_event::EventBridgeEvent,
sns_event::{SnsEntity, SnsRecord},
sqs_event::SqsRecord,
Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG,
Expand Down Expand Up @@ -117,6 +118,12 @@ impl SpanInferrer {
if let Some(t) = S3Record::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span);

trigger = Some(Box::new(t));
}
} else if EventBridgeEvent::is_match(payload_value) {
if let Some(t) = EventBridgeEvent::new(payload_value.clone()) {
t.enrich_span(&mut inferred_span);

trigger = Some(Box::new(t));
}
} else {
Expand Down
242 changes: 242 additions & 0 deletions bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
use chrono::{DateTime, Utc};
use datadog_trace_protobuf::pb::Span;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use tracing::debug;

use crate::lifecycle::invocation::{
processor::{MS_TO_NS, S_TO_NS},
triggers::{Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG},
};

const DATADOG_START_TIME_KEY: &str = "x-datadog-start-time";
const DATADOG_RESOURCE_NAME_KEY: &str = "x-datadog-resource-name";

#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct EventBridgeEvent {
pub id: String,
pub version: String,
pub account: String,
pub time: DateTime<Utc>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converted this into DateTime<Utc>

pub region: String,
pub resources: Vec<String>,
pub source: String,
#[serde(rename = "detail-type")]
pub detail_type: String,
pub detail: Value,
#[serde(rename = "replay-name")]
pub replay_name: Option<String>,
}

impl Trigger for EventBridgeEvent {
fn new(payload: Value) -> Option<Self> {
match serde_json::from_value(payload) {
Ok(event) => Some(event),
Err(e) => {
debug!("Failed to deserialize EventBridge Event: {}", e);
None
}
}
}

fn is_match(payload: &Value) -> bool {
payload.get("detail-type").is_some()
&& payload
.get("source")
.and_then(Value::as_str)
.map_or(false, |s| s != "aws.events")
Comment on lines +45 to +48
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this missed check

}

#[allow(clippy::cast_possible_truncation)]
fn enrich_span(&self, span: &mut Span) {
// EventBridge events have a timestamp resolution in seconds
let start_time_seconds = self
.time
.timestamp_nanos_opt()
.unwrap_or((self.time.timestamp_millis() as f64 * S_TO_NS) as i64);

let carrier = self.get_carrier();
let resource_name = carrier
.get(DATADOG_RESOURCE_NAME_KEY)
.unwrap_or(&self.source)
.clone();
let start_time = carrier
.get(DATADOG_START_TIME_KEY)
.and_then(|s| s.parse::<f64>().ok())
.map_or(start_time_seconds, |s| (s * MS_TO_NS) as i64);
Comment on lines +60 to +67
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added resource name and start time accurate resolution when available


// todo: service mapping and peer service
let service_name = "eventbridge";

span.name = String::from("aws.eventbridge");
span.service = service_name.to_string();
span.resource = resource_name;
span.r#type = String::from("web");
span.start = start_time;
span.meta.extend(HashMap::from([
("operation_name".to_string(), "aws.eventbridge".to_string()),
("detail_type".to_string(), self.detail_type.clone()),
]));
}

fn get_tags(&self) -> HashMap<String, String> {
HashMap::from([(
FUNCTION_TRIGGER_EVENT_SOURCE_TAG.to_string(),
"eventbridge".to_string(),
)])
}

fn get_arn(&self, _region: &str) -> String {
self.source.clone()
}

fn get_carrier(&self) -> HashMap<String, String> {
if let Ok(detail) = serde_json::from_value::<HashMap<String, Value>>(self.detail.clone()) {
if let Some(carrier) = detail.get(DATADOG_CARRIER_KEY) {
return serde_json::from_value(carrier.clone()).unwrap_or_default();
}
}
HashMap::new()
}

fn is_async(&self) -> bool {
true
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::lifecycle::invocation::triggers::test_utils::read_json_file;

#[test]
fn test_new() {
let json = read_json_file("eventbridge_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let result =
EventBridgeEvent::new(payload).expect("Failed to deserialize into EventBridgeEvent");

let expected = EventBridgeEvent {
id: "bd3c8258-8d30-007c-2562-64715b2d0ea8".to_string(),
version: "0".to_string(),
account: "601427279990".to_string(),
time: DateTime::parse_from_rfc3339("2024-11-09T08:22:15Z")
.expect("Failed to parse time")
.with_timezone(&Utc),
region: "eu-west-1".to_string(),
resources: vec![],
source: "my.event".to_string(),
detail_type: "UserSignUp".to_string(),
detail: serde_json::json!({
"hello": "there",
"_datadog": {
"x-datadog-trace-id": "5827606813695714842",
"x-datadog-parent-id": "4726693487091824375",
"x-datadog-sampled": "1",
"x-datadog-sampling-priority": "1",
"x-datadog-resource-name": "testBus",
"x-datadog-start-time": "1731183820135"
}
}),
replay_name: None,
};

assert_eq!(result, expected);
}

#[test]
fn test_is_match() {
let json = read_json_file("eventbridge_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize EventBridgeEvent");

assert!(EventBridgeEvent::is_match(&payload));
}

#[test]
fn test_is_not_match() {
let json = read_json_file("api_gateway_http_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize EventBridgeEvent");
assert!(!EventBridgeEvent::is_match(&payload));
}

#[test]
fn test_enrich_span() {
let json = read_json_file("eventbridge_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event =
EventBridgeEvent::new(payload).expect("Failed to deserialize into EventBridgeEvent");

let mut span = Span::default();
event.enrich_span(&mut span);

let expected = serde_json::from_str(&read_json_file("eventbridge_span.json"))
.expect("Failed to deserialize into Span");
assert_eq!(span, expected);
}

#[test]
fn test_enrich_span_no_resource_name() {
let json = read_json_file("eventbridge_no_resource_name_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event =
EventBridgeEvent::new(payload).expect("Failed to deserialize into EventBridgeEvent");

let mut span = Span::default();
event.enrich_span(&mut span);

assert_eq!(span.resource, "my.event");
}

#[test]
fn test_enrich_span_no_timestamp() {
let json = read_json_file("eventbridge_no_timestamp_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event =
EventBridgeEvent::new(payload).expect("Failed to deserialize into EventBridgeEvent");

let mut span = Span::default();
event.enrich_span(&mut span);

assert_eq!(span.resource, "testBus");
// Seconds resolution
assert_eq!(span.start, 1731140535000000000);
}

#[test]
fn test_get_arn() {
let json = read_json_file("eventbridge_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event = EventBridgeEvent::new(payload).expect("Failed to deserialize EventBridgeEvent");
assert_eq!(event.get_arn("us-east-1"), "my.event");
}

#[test]
fn test_get_carrier() {
let json = read_json_file("eventbridge_event.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event =
EventBridgeEvent::new(payload).expect("Failed to deserialize EventBridge Event");
let carrier = event.get_carrier();

let expected = HashMap::from([
(
"x-datadog-trace-id".to_string(),
"5827606813695714842".to_string(),
),
(
"x-datadog-parent-id".to_string(),
"4726693487091824375".to_string(),
),
("x-datadog-sampling-priority".to_string(), "1".to_string()),
("x-datadog-sampled".to_string(), "1".to_string()),
("x-datadog-resource-name".to_string(), "testBus".to_string()),
(
"x-datadog-start-time".to_string(),
"1731183820135".to_string(),
),
]);

assert_eq!(carrier, expected);
}
}
7 changes: 6 additions & 1 deletion bottlecap/src/lifecycle/invocation/triggers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use serde_json::Value;
pub mod api_gateway_http_event;
pub mod api_gateway_rest_event;
pub mod dynamodb_event;
pub mod event_bridge_event;
pub mod s3_event;
pub mod sns_event;
pub mod sqs_event;
Expand Down Expand Up @@ -67,9 +68,13 @@ where
#[cfg(test)]
pub mod test_utils {
use std::fs;
use std::path::PathBuf;

#[must_use]
pub fn read_json_file(file_name: &str) -> String {
fs::read_to_string(format!("tests/payloads/{file_name}")).expect("Failed to read file")
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("tests/payloads");
path.push(file_name);
fs::read_to_string(path).expect("Failed to read file")
}
}
13 changes: 10 additions & 3 deletions bottlecap/src/proc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,19 @@ fn get_uptime_from_path(path: &str) -> Result<f64, io::Error> {
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use std::path::PathBuf;

fn path_from_root(file: &str) -> String {
let mut safe_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
safe_path.push(file);
safe_path.to_str().unwrap().to_string()
}

#[test]
#[allow(clippy::float_cmp)]
fn test_get_network_data() {
let path = "./tests/proc/net/valid_dev";
let network_data_result = get_network_data_from_path(path);
let network_data_result = get_network_data_from_path(path_from_root(path).as_str());
assert!(network_data_result.is_ok());
let network_data_result = network_data_result.unwrap();
assert_eq!(network_data_result.rx_bytes, 180.0);
Expand All @@ -216,7 +223,7 @@ mod tests {
#[allow(clippy::float_cmp)]
fn test_get_cpu_data() {
let path = "./tests/proc/stat/valid_stat";
let cpu_data_result = get_cpu_data_from_path(path);
let cpu_data_result = get_cpu_data_from_path(path_from_root(path).as_str());
assert!(cpu_data_result.is_ok());
let cpu_data = cpu_data_result.unwrap();
assert_eq!(cpu_data.total_user_time_ms, 23370.0);
Expand Down Expand Up @@ -267,7 +274,7 @@ mod tests {
#[allow(clippy::float_cmp)]
fn test_get_uptime_data() {
let path = "./tests/proc/uptime/valid_uptime";
let uptime_data_result = get_uptime_from_path(path);
let uptime_data_result = get_uptime_from_path(path_from_root(path).as_str());
assert!(uptime_data_result.is_ok());
let uptime_data = uptime_data_result.unwrap();
assert_eq!(uptime_data, 3_213_103_123_000.0);
Expand Down
21 changes: 21 additions & 0 deletions bottlecap/tests/payloads/eventbridge_event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"version": "0",
"id": "bd3c8258-8d30-007c-2562-64715b2d0ea8",
"detail-type": "UserSignUp",
"source": "my.event",
"account": "601427279990",
"time": "2024-11-09T08:22:15Z",
"region": "eu-west-1",
"resources": [],
"detail": {
"hello": "there",
"_datadog": {
"x-datadog-trace-id": "5827606813695714842",
"x-datadog-parent-id": "4726693487091824375",
"x-datadog-sampled": "1",
"x-datadog-sampling-priority": "1",
"x-datadog-resource-name": "testBus",
"x-datadog-start-time": "1731183820135"
}
}
}
19 changes: 19 additions & 0 deletions bottlecap/tests/payloads/eventbridge_no_resource_name_event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"version": "0",
"id": "bd3c8258-8d30-007c-2562-64715b2d0ea8",
"detail-type": "UserSignUp",
"source": "my.event",
"account": "601427279990",
"time": "2024-11-09T08:22:15Z",
"region": "eu-west-1",
"resources": [],
"detail": {
"hello": "there",
"_datadog": {
"x-datadog-trace-id": "5827606813695714842",
"x-datadog-parent-id": "4726693487091824375",
"x-datadog-sampling-priority": "1",
"x-datadog-start-time": "1731183820135"
}
}
}
19 changes: 19 additions & 0 deletions bottlecap/tests/payloads/eventbridge_no_timestamp_event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"version": "0",
"id": "bd3c8258-8d30-007c-2562-64715b2d0ea8",
"detail-type": "UserSignUp",
"source": "my.event",
"account": "601427279990",
"time": "2024-11-09T08:22:15Z",
"region": "eu-west-1",
"resources": [],
"detail": {
"hello": "there",
"_datadog": {
"x-datadog-trace-id": "5827606813695714842",
"x-datadog-parent-id": "4726693487091824375",
"x-datadog-sampling-priority": "1",
"x-datadog-resource-name": "testBus"
}
}
}
Loading