From 8ac6a324befc7e60bc6060292b5455f76ccaff9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 02:06:43 -0500 Subject: [PATCH 1/5] add `tag_span_from_value` --- bottlecap/src/lifecycle/invocation/mod.rs | 92 +++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/mod.rs b/bottlecap/src/lifecycle/invocation/mod.rs index 454cfa3bc..ca627ae7d 100644 --- a/bottlecap/src/lifecycle/invocation/mod.rs +++ b/bottlecap/src/lifecycle/invocation/mod.rs @@ -1,13 +1,105 @@ use base64::{engine::general_purpose, DecodeError, Engine}; +use datadog_trace_protobuf::pb::Span; +use serde_json::Value; +use tracing::debug; pub mod context; pub mod processor; pub mod span_inferrer; pub mod triggers; +const MAX_TAG_CHARS: usize = 4096; +const REDACTABLE_KEYS: [&str; 8] = [ + "password", + "passwd", + "pwd", + "secret", + "token", + "authorization", + "x-authorization", + "api_key", +]; + pub fn base64_to_string(base64_string: &str) -> Result { match general_purpose::STANDARD.decode(base64_string) { Ok(bytes) => Ok(String::from_utf8_lossy(&bytes).to_string()), Err(e) => Err(e), } } + +pub fn tag_span_from_value(span: &mut Span, key: &str, value: &Value, depth: u32, max_depth: u32) { + // Null scenario + if value.is_null() { + span.meta.insert(key.to_string(), value.to_string()); + return; + } + + // Check max depth + if depth >= max_depth { + match serde_json::to_string(value) { + Ok(s) => { + let truncated = s.chars().take(MAX_TAG_CHARS).collect::(); + span.meta.insert(key.to_string(), truncated); + return; + } + Err(e) => { + debug!("Unable to serialize value for tagging {e}"); + return; + } + } + } + + let new_depth = depth + 1; + match value { + // Handle string case + Value::String(s) => { + if let Ok(p) = serde_json::from_str::(s) { + tag_span_from_value(span, key, &p, new_depth, max_depth); + } else { + let truncated = s.chars().take(MAX_TAG_CHARS).collect::(); + span.meta + .insert(key.to_string(), redact_value(key, truncated)); + } + } + + // Handle number case + Value::Number(n) => { + span.meta.insert(key.to_string(), n.to_string()); + } + + // Handle boolean case + Value::Bool(b) => { + span.meta.insert(key.to_string(), b.to_string()); + } + + // Handle object case + Value::Object(map) => { + for (k, v) in map { + let new_key = format!("{key}.{k}"); + tag_span_from_value(span, &new_key, v, new_depth, max_depth); + } + } + + Value::Array(a) => { + if a.is_empty() { + span.meta.insert(key.to_string(), "[]".to_string()); + return; + } + + for (i, v) in a.iter().enumerate() { + let new_key = format!("{key}.{i}"); + tag_span_from_value(span, &new_key, v, new_depth, max_depth); + } + } + Value::Null => {} + } +} + +fn redact_value(key: &str, value: String) -> String { + let split_key = key.split('.').last().unwrap_or_default(); + if REDACTABLE_KEYS.contains(&split_key) { + String::from("redacted") + } else { + value + } +} From 2bd0853347a95ea5ef02f0ab5d683dcd56c3aefe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 13:54:30 -0500 Subject: [PATCH 2/5] add `capture_lambda_payload` config --- bottlecap/src/config/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 870b42d21..2c0fbab2f 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -65,6 +65,8 @@ pub struct Config { pub serverless_flush_strategy: FlushStrategy, pub enhanced_metrics: bool, pub https_proxy: Option, + pub capture_lambda_payload: bool, + pub capture_lambda_payload_max_depth: u32, // Trace Propagation #[serde(deserialize_with = "deserialize_trace_propagation_style")] pub trace_propagation_style: Vec, @@ -93,8 +95,9 @@ impl Default for Config { logs_config_processing_rules: None, // Metrics enhanced_metrics: true, - // Failover https_proxy: None, + capture_lambda_payload: false, + capture_lambda_payload_max_depth: 10, // Trace Propagation trace_propagation_style: vec![ TracePropagationStyle::Datadog, From 8762c505be151b606b655ba649746677c5e75c3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 13:54:42 -0500 Subject: [PATCH 3/5] add unit testing for `tag_span_from_value` --- bottlecap/src/lifecycle/invocation/mod.rs | 151 ++++++++++++++++++++++ 1 file changed, 151 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/mod.rs b/bottlecap/src/lifecycle/invocation/mod.rs index ca627ae7d..aca184c2b 100644 --- a/bottlecap/src/lifecycle/invocation/mod.rs +++ b/bottlecap/src/lifecycle/invocation/mod.rs @@ -103,3 +103,154 @@ fn redact_value(key: &str, value: String) -> String { value } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use serde_json::json; + + use super::*; + + #[test] + fn test_simple_tagging() { + let mut span = Span::default(); + let value = json!({ "request": { "simple": "value" } }); + + tag_span_from_value(&mut span, "payload", &value, 0, 10); + + let expected = HashMap::from([("payload.request.simple".to_string(), "value".to_string())]); + + assert_eq!(span.meta, expected); + } + + #[test] + fn test_complex_object() { + let mut span = Span::default(); + let value = json!({ + "request": { + "simple": "value", + "obj": { + "arr": ["a", "b", "c"], + "boolean": true, + "nested": { + "value": "nested_value" + } + }, + "empty": null, + "number": 1, + "boolean": true, + } + }); + + tag_span_from_value(&mut span, "payload", &value, 0, 10); + + let expected = HashMap::from([ + ("payload.request.simple".to_string(), "value".to_string()), + ("payload.request.obj.arr.0".to_string(), "a".to_string()), + ("payload.request.obj.arr.1".to_string(), "b".to_string()), + ("payload.request.obj.arr.2".to_string(), "c".to_string()), + ( + "payload.request.obj.boolean".to_string(), + "true".to_string(), + ), + ( + "payload.request.obj.nested.value".to_string(), + "nested_value".to_string(), + ), + ("payload.request.empty".to_string(), "null".to_string()), + ("payload.request.number".to_string(), "1".to_string()), + ("payload.request.boolean".to_string(), "true".to_string()), + ]); + + assert_eq!(span.meta, expected); + } + + #[test] + fn test_array_of_objects() { + let mut span = Span::default(); + let value = json!({ + "request": [ + { "simple": "value" }, + { "simple": "value" }, + { "simple": "value" }, + ] + }); + + tag_span_from_value(&mut span, "payload", &value, 0, 10); + + let expected = HashMap::from([ + ("payload.request.0.simple".to_string(), "value".to_string()), + ("payload.request.1.simple".to_string(), "value".to_string()), + ("payload.request.2.simple".to_string(), "value".to_string()), + ]); + + assert_eq!(span.meta, expected); + } + + #[test] + fn test_reach_max_depth() { + let mut span = Span::default(); + let value = json!({ + "hello": "world", + "empty": null, + "level1": { + "obj": { + "level3": 3 + }, + "arr": [null, true, "great", { "l3": "v3" }], + "boolean": true, + "number": 2, + "empty": null, + "empty_obj": {}, + "empty_arr": [] + }, + "arr": [{ "a": "b" }, { "c": "d" }] + }); + + tag_span_from_value(&mut span, "payload", &value, 0, 2); + + let expected = HashMap::from([ + ("payload.hello".to_string(), "world".to_string()), + ("payload.empty".to_string(), "null".to_string()), + ( + "payload.level1.obj".to_string(), + "{\"level3\":3}".to_string(), + ), + ( + "payload.level1.arr".to_string(), + "[null,true,\"great\",{\"l3\":\"v3\"}]".to_string(), + ), + ("payload.level1.boolean".to_string(), "true".to_string()), + ("payload.level1.number".to_string(), "2".to_string()), + ("payload.level1.empty".to_string(), "null".to_string()), + ("payload.level1.empty_obj".to_string(), "{}".to_string()), + ("payload.level1.empty_arr".to_string(), "[]".to_string()), + ("payload.arr.0".to_string(), "{\"a\":\"b\"}".to_string()), + ("payload.arr.1".to_string(), "{\"c\":\"d\"}".to_string()), + ]); + + assert_eq!(span.meta, expected); + } + + #[test] + fn test_tag_redacts_key() { + let mut span = Span::default(); + let value = json!({ + "request": { + "headers": { + "authorization": "secret token", + } + } + }); + + tag_span_from_value(&mut span, "payload", &value, 0, 10); + + let expected = HashMap::from([( + "payload.request.headers.authorization".to_string(), + "redacted".to_string(), + )]); + + assert_eq!(span.meta, expected); + } +} From fea60827ad87fadb3cf238abe24795b102c4be23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 13:55:43 -0500 Subject: [PATCH 4/5] update listener `end_invocation_handler` parsing should not be handled here --- bottlecap/src/lifecycle/listener.rs | 32 ++++++++++++++++------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/bottlecap/src/lifecycle/listener.rs b/bottlecap/src/lifecycle/listener.rs index 589815a4a..388fba133 100644 --- a/bottlecap/src/lifecycle/listener.rs +++ b/bottlecap/src/lifecycle/listener.rs @@ -141,23 +141,27 @@ impl Listener { ) -> http::Result> { debug!("Received end invocation request"); let (parts, body) = req.into_parts(); - let parsed_body = serde_json::from_slice::( - &hyper::body::to_bytes(body).await.unwrap_or_default(), - ); - let mut parsed_status_code: Option = None; - if let Some(sc) = parsed_body.unwrap_or_default().get("statusCode") { - parsed_status_code = Some(sc.to_string()); - } + match hyper::body::to_bytes(body).await { + Ok(b) => { + let body = b.to_vec(); + let mut processor = invocation_processor.lock().await; - let mut processor = invocation_processor.lock().await; + let headers = Self::headers_to_map(parts.headers); + processor.on_invocation_end(headers, body); + drop(processor); - let headers = Self::headers_to_map(parts.headers); - processor.on_invocation_end(headers, parsed_status_code); - drop(processor); + Response::builder() + .status(200) + .body(Body::from(json!({}).to_string())) + } + Err(e) => { + error!("Could not read end invocation request body {e}"); - Response::builder() - .status(200) - .body(Body::from(json!({}).to_string())) + Response::builder() + .status(400) + .body(Body::from("Could not read end invocation request body")) + } + } } fn hello_handler() -> http::Result> { From 2d150c93c9a84b1f09191ed9fcb07e618f098811 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 14 Nov 2024 13:56:11 -0500 Subject: [PATCH 5/5] add capture lambda payload feature also parse body properly, and handle `statusCode` --- .../src/lifecycle/invocation/processor.rs | 47 ++++++++++++++----- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 64a8582b1..9c72f4536 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -15,7 +15,7 @@ use tracing::debug; use crate::{ config::{self, AwsConfig}, lifecycle::invocation::{ - base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer, + base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer, tag_span_from_value, }, metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics}, proc::{self, CPUData, NetworkData}, @@ -52,7 +52,7 @@ pub struct Processor { enhanced_metrics: EnhancedMetrics, aws_config: AwsConfig, tracer_detected: bool, - enhanced_metrics_enabled: bool, + config: Arc, } impl Processor { @@ -94,7 +94,7 @@ impl Processor { enhanced_metrics: EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config)), aws_config: aws_config.clone(), tracer_detected: false, - enhanced_metrics_enabled: config.enhanced_metrics, + config: Arc::clone(&config), } } @@ -102,7 +102,7 @@ impl Processor { /// pub fn on_invoke_event(&mut self, request_id: String) { self.context_buffer.create_context(request_id.clone()); - if self.enhanced_metrics_enabled { + if self.config.enhanced_metrics { // Collect offsets for network and cpu metrics let network_offset: Option = proc::get_network_data().ok(); let cpu_offset: Option = proc::get_cpu_data().ok(); @@ -293,6 +293,17 @@ impl Processor { Err(_) => json!({}), }; + // Tag the invocation span with the request payload + if self.config.capture_lambda_payload { + tag_span_from_value( + &mut self.span, + "function.request", + &payload_value, + 0, + self.config.capture_lambda_payload_max_depth, + ); + } + self.inferrer.infer_span(&payload_value, &self.aws_config); self.extracted_span_context = self.extract_span_context(&headers, &payload_value); @@ -344,22 +355,34 @@ impl Processor { /// Given trace context information, set it to the current span. /// - pub fn on_invocation_end( - &mut self, - headers: HashMap, - status_code: Option, - ) { - if let Some(status_code) = status_code { + pub fn on_invocation_end(&mut self, headers: HashMap, payload: Vec) { + let payload_value = match serde_json::from_slice::(&payload) { + Ok(value) => value, + Err(_) => json!({}), + }; + + // Tag the invocation span with the request payload + if self.config.capture_lambda_payload { + tag_span_from_value( + &mut self.span, + "function.response", + &payload_value, + 0, + self.config.capture_lambda_payload_max_depth, + ); + } + + if let Some(status_code) = payload_value.get("statusCode").and_then(Value::as_str) { self.span .meta - .insert("http.status_code".to_string(), status_code.clone()); + .insert("http.status_code".to_string(), status_code.to_string()); if status_code.len() == 3 && status_code.starts_with('5') { self.span.error = 1; } // If we have an inferred span, set the status code to it - self.inferrer.set_status_code(status_code); + self.inferrer.set_status_code(status_code.to_string()); } self.update_span_context_from_headers(&headers);