Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ pub struct Config {
pub serverless_flush_strategy: FlushStrategy,
pub enhanced_metrics: bool,
pub https_proxy: Option<String>,
pub capture_lambda_payload: bool,
pub capture_lambda_payload_max_depth: u32,
// Trace Propagation
#[serde(deserialize_with = "deserialize_trace_propagation_style")]
pub trace_propagation_style: Vec<TracePropagationStyle>,
Expand Down Expand Up @@ -93,8 +95,9 @@ impl Default for Config {
logs_config_processing_rules: None,
// Metrics
enhanced_metrics: true,
// Failover
https_proxy: None,
capture_lambda_payload: false,
capture_lambda_payload_max_depth: 10,
// Trace Propagation
trace_propagation_style: vec![
TracePropagationStyle::Datadog,
Expand Down
243 changes: 243 additions & 0 deletions bottlecap/src/lifecycle/invocation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,256 @@
use base64::{engine::general_purpose, DecodeError, Engine};
use datadog_trace_protobuf::pb::Span;
use serde_json::Value;
use tracing::debug;

pub mod context;
pub mod processor;
pub mod span_inferrer;
pub mod triggers;

const MAX_TAG_CHARS: usize = 4096;
const REDACTABLE_KEYS: [&str; 8] = [
"password",
"passwd",
"pwd",
"secret",
"token",
"authorization",
"x-authorization",
"api_key",
];

pub fn base64_to_string(base64_string: &str) -> Result<String, DecodeError> {
match general_purpose::STANDARD.decode(base64_string) {
Ok(bytes) => Ok(String::from_utf8_lossy(&bytes).to_string()),
Err(e) => Err(e),
}
}

pub fn tag_span_from_value(span: &mut Span, key: &str, value: &Value, depth: u32, max_depth: u32) {
// Null scenario
if value.is_null() {
span.meta.insert(key.to_string(), value.to_string());
return;
}

// Check max depth
if depth >= max_depth {
match serde_json::to_string(value) {
Ok(s) => {
let truncated = s.chars().take(MAX_TAG_CHARS).collect::<String>();
span.meta.insert(key.to_string(), truncated);
return;
}
Err(e) => {
debug!("Unable to serialize value for tagging {e}");
return;
}
}
}

let new_depth = depth + 1;
match value {
// Handle string case
Value::String(s) => {
if let Ok(p) = serde_json::from_str::<Value>(s) {
tag_span_from_value(span, key, &p, new_depth, max_depth);
} else {
let truncated = s.chars().take(MAX_TAG_CHARS).collect::<String>();
span.meta
.insert(key.to_string(), redact_value(key, truncated));
}
}

// Handle number case
Value::Number(n) => {
span.meta.insert(key.to_string(), n.to_string());
}

// Handle boolean case
Value::Bool(b) => {
span.meta.insert(key.to_string(), b.to_string());
}

// Handle object case
Value::Object(map) => {
for (k, v) in map {
let new_key = format!("{key}.{k}");
tag_span_from_value(span, &new_key, v, new_depth, max_depth);
}
}

Value::Array(a) => {
if a.is_empty() {
span.meta.insert(key.to_string(), "[]".to_string());
return;
}

for (i, v) in a.iter().enumerate() {
let new_key = format!("{key}.{i}");
tag_span_from_value(span, &new_key, v, new_depth, max_depth);
}
}
Value::Null => {}
}
}

fn redact_value(key: &str, value: String) -> String {
let split_key = key.split('.').last().unwrap_or_default();
if REDACTABLE_KEYS.contains(&split_key) {
String::from("redacted")
} else {
value
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use serde_json::json;

use super::*;

#[test]
fn test_simple_tagging() {
let mut span = Span::default();
let value = json!({ "request": { "simple": "value" } });

tag_span_from_value(&mut span, "payload", &value, 0, 10);

let expected = HashMap::from([("payload.request.simple".to_string(), "value".to_string())]);

assert_eq!(span.meta, expected);
}

#[test]
fn test_complex_object() {
let mut span = Span::default();
let value = json!({
"request": {
"simple": "value",
"obj": {
"arr": ["a", "b", "c"],
"boolean": true,
"nested": {
"value": "nested_value"
}
},
"empty": null,
"number": 1,
"boolean": true,
}
});

tag_span_from_value(&mut span, "payload", &value, 0, 10);

let expected = HashMap::from([
("payload.request.simple".to_string(), "value".to_string()),
("payload.request.obj.arr.0".to_string(), "a".to_string()),
("payload.request.obj.arr.1".to_string(), "b".to_string()),
("payload.request.obj.arr.2".to_string(), "c".to_string()),
(
"payload.request.obj.boolean".to_string(),
"true".to_string(),
),
(
"payload.request.obj.nested.value".to_string(),
"nested_value".to_string(),
),
("payload.request.empty".to_string(), "null".to_string()),
("payload.request.number".to_string(), "1".to_string()),
("payload.request.boolean".to_string(), "true".to_string()),
]);

assert_eq!(span.meta, expected);
}

#[test]
fn test_array_of_objects() {
let mut span = Span::default();
let value = json!({
"request": [
{ "simple": "value" },
{ "simple": "value" },
{ "simple": "value" },
]
});

tag_span_from_value(&mut span, "payload", &value, 0, 10);

let expected = HashMap::from([
("payload.request.0.simple".to_string(), "value".to_string()),
("payload.request.1.simple".to_string(), "value".to_string()),
("payload.request.2.simple".to_string(), "value".to_string()),
]);

assert_eq!(span.meta, expected);
}

#[test]
fn test_reach_max_depth() {
let mut span = Span::default();
let value = json!({
"hello": "world",
"empty": null,
"level1": {
"obj": {
"level3": 3
},
"arr": [null, true, "great", { "l3": "v3" }],
"boolean": true,
"number": 2,
"empty": null,
"empty_obj": {},
"empty_arr": []
},
"arr": [{ "a": "b" }, { "c": "d" }]
});

tag_span_from_value(&mut span, "payload", &value, 0, 2);

let expected = HashMap::from([
("payload.hello".to_string(), "world".to_string()),
("payload.empty".to_string(), "null".to_string()),
(
"payload.level1.obj".to_string(),
"{\"level3\":3}".to_string(),
),
(
"payload.level1.arr".to_string(),
"[null,true,\"great\",{\"l3\":\"v3\"}]".to_string(),
),
("payload.level1.boolean".to_string(), "true".to_string()),
("payload.level1.number".to_string(), "2".to_string()),
("payload.level1.empty".to_string(), "null".to_string()),
("payload.level1.empty_obj".to_string(), "{}".to_string()),
("payload.level1.empty_arr".to_string(), "[]".to_string()),
("payload.arr.0".to_string(), "{\"a\":\"b\"}".to_string()),
("payload.arr.1".to_string(), "{\"c\":\"d\"}".to_string()),
]);

assert_eq!(span.meta, expected);
}

#[test]
fn test_tag_redacts_key() {
let mut span = Span::default();
let value = json!({
"request": {
"headers": {
"authorization": "secret token",
}
}
});

tag_span_from_value(&mut span, "payload", &value, 0, 10);

let expected = HashMap::from([(
"payload.request.headers.authorization".to_string(),
"redacted".to_string(),
)]);

assert_eq!(span.meta, expected);
}
}
47 changes: 35 additions & 12 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tracing::debug;
use crate::{
config::{self, AwsConfig},
lifecycle::invocation::{
base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer,
base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer, tag_span_from_value,
},
metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics},
proc::{self, CPUData, NetworkData},
Expand Down Expand Up @@ -52,7 +52,7 @@ pub struct Processor {
enhanced_metrics: EnhancedMetrics,
aws_config: AwsConfig,
tracer_detected: bool,
enhanced_metrics_enabled: bool,
config: Arc<config::Config>,
}

impl Processor {
Expand Down Expand Up @@ -94,15 +94,15 @@ impl Processor {
enhanced_metrics: EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config)),
aws_config: aws_config.clone(),
tracer_detected: false,
enhanced_metrics_enabled: config.enhanced_metrics,
config: Arc::clone(&config),
}
}

/// Given a `request_id`, creates the context and adds the enhanced metric offsets to the context buffer.
///
pub fn on_invoke_event(&mut self, request_id: String) {
self.context_buffer.create_context(request_id.clone());
if self.enhanced_metrics_enabled {
if self.config.enhanced_metrics {
// Collect offsets for network and cpu metrics
let network_offset: Option<NetworkData> = proc::get_network_data().ok();
let cpu_offset: Option<CPUData> = proc::get_cpu_data().ok();
Expand Down Expand Up @@ -293,6 +293,17 @@ impl Processor {
Err(_) => json!({}),
};

// Tag the invocation span with the request payload
if self.config.capture_lambda_payload {
tag_span_from_value(
&mut self.span,
"function.request",
&payload_value,
0,
self.config.capture_lambda_payload_max_depth,
);
}

self.inferrer.infer_span(&payload_value, &self.aws_config);
self.extracted_span_context = self.extract_span_context(&headers, &payload_value);

Expand Down Expand Up @@ -344,22 +355,34 @@ impl Processor {

/// Given trace context information, set it to the current span.
///
pub fn on_invocation_end(
&mut self,
headers: HashMap<String, String>,
status_code: Option<String>,
) {
if let Some(status_code) = status_code {
pub fn on_invocation_end(&mut self, headers: HashMap<String, String>, payload: Vec<u8>) {
let payload_value = match serde_json::from_slice::<Value>(&payload) {
Ok(value) => value,
Err(_) => json!({}),
};

// Tag the invocation span with the request payload
if self.config.capture_lambda_payload {
tag_span_from_value(
&mut self.span,
"function.response",
&payload_value,
0,
self.config.capture_lambda_payload_max_depth,
);
}

if let Some(status_code) = payload_value.get("statusCode").and_then(Value::as_str) {
self.span
.meta
.insert("http.status_code".to_string(), status_code.clone());
.insert("http.status_code".to_string(), status_code.to_string());

if status_code.len() == 3 && status_code.starts_with('5') {
self.span.error = 1;
}

// If we have an inferred span, set the status code to it
self.inferrer.set_status_code(status_code);
self.inferrer.set_status_code(status_code.to_string());
}

self.update_span_context_from_headers(&headers);
Expand Down
Loading