diff --git a/bottlecap/src/lifecycle/invocation/mod.rs b/bottlecap/src/lifecycle/invocation/mod.rs index 39d0557dc..454cfa3bc 100644 --- a/bottlecap/src/lifecycle/invocation/mod.rs +++ b/bottlecap/src/lifecycle/invocation/mod.rs @@ -1,4 +1,13 @@ +use base64::{engine::general_purpose, DecodeError, Engine}; + pub mod context; pub mod processor; pub mod span_inferrer; pub mod triggers; + +pub fn base64_to_string(base64_string: &str) -> Result { + match general_purpose::STANDARD.decode(base64_string) { + Ok(bytes) => Ok(String::from_utf8_lossy(&bytes).to_string()), + Err(e) => Err(e), + } +} diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 467519e37..b9e82db44 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -14,7 +14,9 @@ use tracing::debug; use crate::{ config::{self, AwsConfig}, - lifecycle::invocation::{context::ContextBuffer, span_inferrer::SpanInferrer}, + lifecycle::invocation::{ + base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer, + }, metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics}, proc::{self, CPUData, NetworkData}, tags::provider, @@ -29,6 +31,11 @@ use crate::{ pub const MS_TO_NS: f64 = 1_000_000.0; pub const S_TO_NS: f64 = 1_000_000_000.0; +pub const DATADOG_INVOCATION_ERROR_MESSAGE_KEY: &str = "x-datadog-invocation-error-msg"; +pub const DATADOG_INVOCATION_ERROR_TYPE_KEY: &str = "x-datadog-invocation-error-type"; +pub const DATADOG_INVOCATION_ERROR_STACK_KEY: &str = "x-datadog-invocation-error-stack"; +pub const DATADOG_INVOCATION_ERROR_KEY: &str = "x-datadog-invocation-error"; + pub struct Processor { pub context_buffer: ContextBuffer, inferrer: SpanInferrer, @@ -64,11 +71,11 @@ impl Processor { service, name: "aws.lambda".to_string(), resource, - trace_id: 0, // set later - span_id: 0, // maybe set later? - parent_id: 0, // set later - start: 0, // set later - duration: 0, // set later + trace_id: 0, + span_id: 0, + parent_id: 0, + start: 0, + duration: 0, error: 0, meta: HashMap::new(), metrics: HashMap::new(), @@ -167,9 +174,6 @@ impl Processor { // - language // - function.request - capture lambda payload // - function.response - // - error.msg - // - error.type - // - error.stack // - metrics tags (for asm) if let Some(offsets) = &context.enhanced_metric_data { @@ -328,15 +332,28 @@ impl Processor { headers: HashMap, status_code: Option, ) { - self.update_span_context(headers); - if self.inferrer.inferred_span.is_some() { - if let Some(status_code) = status_code { - self.inferrer.set_status_code(status_code); + if let Some(status_code) = status_code { + self.span + .meta + .insert("http.status_code".to_string(), status_code.clone()); + + if status_code.len() == 3 && status_code.starts_with('5') { + self.span.error = 1; } + + // If we have an inferred span, set the status code to it + self.inferrer.set_status_code(status_code); + } + + self.update_span_context_from_headers(&headers); + self.set_span_error_from_headers(headers); + + if self.span.error == 1 { + self.enhanced_metrics.increment_errors_metric(); } } - fn update_span_context(&mut self, headers: HashMap) { + fn update_span_context_from_headers(&mut self, headers: &HashMap) { // todo: fix this, code is a copy of the existing logic in Go, not accounting // when a 128 bit trace id exist let mut trace_id = 0; @@ -364,8 +381,56 @@ impl Processor { self.span.trace_id = trace_id; self.span.span_id = span_id; + // If no inferred span, set the parent id right away if self.inferrer.inferred_span.is_none() { self.span.parent_id = parent_id; } } + + /// Given end invocation headers, set error metadata, if present, to the current span. + /// + fn set_span_error_from_headers(&mut self, headers: HashMap) { + let message = headers.get(DATADOG_INVOCATION_ERROR_MESSAGE_KEY); + let r#type = headers.get(DATADOG_INVOCATION_ERROR_TYPE_KEY); + let stack = headers.get(DATADOG_INVOCATION_ERROR_STACK_KEY); + + let is_error = headers + .get(DATADOG_INVOCATION_ERROR_KEY) + .map_or(false, |v| v.to_lowercase() == "true") + || message.is_some() + || stack.is_some() + || r#type.is_some() + || self.span.error == 1; + if is_error { + self.span.error = 1; + + if let Some(m) = message { + self.span + .meta + .insert(String::from("error.msg"), m.to_string()); + } + + if let Some(t) = r#type { + self.span + .meta + .insert(String::from("error.type"), t.to_string()); + } + + if let Some(s) = stack { + let decoded_stack = match base64_to_string(s) { + Ok(decoded) => decoded, + Err(e) => { + debug!("Failed to decode error stack: {e}"); + s.to_string() + } + }; + + self.span + .meta + .insert(String::from("error.stack"), decoded_stack); + } + + // todo: handle timeout + } + } } diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index edc3253c1..cdd14d32d 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -187,6 +187,9 @@ impl SpanInferrer { ws.duration = duration; } + // Set error + ws.error = invocation_span.error; + ws.trace_id = invocation_span.trace_id; } @@ -201,6 +204,9 @@ impl SpanInferrer { s.duration = duration; } + // Set error + s.error = invocation_span.error; + s.trace_id = invocation_span.trace_id; } } diff --git a/bottlecap/src/lifecycle/invocation/triggers/mod.rs b/bottlecap/src/lifecycle/invocation/triggers/mod.rs index e0d347f08..dcc3f9b62 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/mod.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/mod.rs @@ -1,6 +1,5 @@ use std::{collections::HashMap, hash::BuildHasher}; -use base64::{engine::general_purpose, Engine}; use datadog_trace_protobuf::pb::Span; use serde::{ser::SerializeMap, Serializer}; use serde_json::Value; @@ -40,14 +39,6 @@ pub fn get_aws_partition_by_region(region: &str) -> String { } } -#[must_use] -pub fn base64_to_string(base64_string: &str) -> String { - let bytes = general_purpose::STANDARD - .decode(base64_string) - .unwrap_or_default(); - String::from_utf8_lossy(&bytes).to_string() -} - /// Serialize a `HashMap` with lowercase keys /// pub fn lowercase_key( diff --git a/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs b/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs index cbf313ff6..eaa1ab907 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs @@ -6,8 +6,9 @@ use serde_json::Value; use tracing::debug; use crate::lifecycle::invocation::{ + base64_to_string, processor::MS_TO_NS, - triggers::{base64_to_string, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + triggers::{Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -132,12 +133,13 @@ impl Trigger for SnsRecord { } fn get_carrier(&self) -> HashMap { - let carrier = HashMap::new(); if let Some(ma) = self.sns.message_attributes.get(DATADOG_CARRIER_KEY) { match ma.r#type.as_str() { "String" => return serde_json::from_str(&ma.value).unwrap_or_default(), "Binary" => { - return serde_json::from_str(&base64_to_string(&ma.value)).unwrap_or_default() + if let Ok(carrier) = base64_to_string(&ma.value) { + return serde_json::from_str(&carrier).unwrap_or_default(); + } } _ => { debug!("Unsupported type in SNS message attribute"); @@ -145,7 +147,7 @@ impl Trigger for SnsRecord { } } - carrier + HashMap::new() } fn is_async(&self) -> bool { diff --git a/bottlecap/src/lifecycle/listener.rs b/bottlecap/src/lifecycle/listener.rs index b255ec491..589815a4a 100644 --- a/bottlecap/src/lifecycle/listener.rs +++ b/bottlecap/src/lifecycle/listener.rs @@ -144,15 +144,15 @@ impl Listener { let parsed_body = serde_json::from_slice::( &hyper::body::to_bytes(body).await.unwrap_or_default(), ); - let mut parsed_status: Option = None; - if let Some(status_code) = parsed_body.unwrap_or_default().get("statusCode") { - parsed_status = Some(status_code.to_string()); + let mut parsed_status_code: Option = None; + if let Some(sc) = parsed_body.unwrap_or_default().get("statusCode") { + parsed_status_code = Some(sc.to_string()); } let mut processor = invocation_processor.lock().await; let headers = Self::headers_to_map(parts.headers); - processor.on_invocation_end(headers, parsed_status); + processor.on_invocation_end(headers, parsed_status_code); drop(processor); Response::builder()