From 81b0bcd0ed2e263626300586b654f08f7c8d984e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Mon, 11 Nov 2024 13:21:47 -0500 Subject: [PATCH 1/5] move `base64_to_string` to `lifecycle::invocation` module --- bottlecap/src/lifecycle/invocation/mod.rs | 9 +++++++++ bottlecap/src/lifecycle/invocation/triggers/mod.rs | 9 --------- .../src/lifecycle/invocation/triggers/sns_event.rs | 10 ++++++---- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/mod.rs b/bottlecap/src/lifecycle/invocation/mod.rs index 39d0557dc..454cfa3bc 100644 --- a/bottlecap/src/lifecycle/invocation/mod.rs +++ b/bottlecap/src/lifecycle/invocation/mod.rs @@ -1,4 +1,13 @@ +use base64::{engine::general_purpose, DecodeError, Engine}; + pub mod context; pub mod processor; pub mod span_inferrer; pub mod triggers; + +pub fn base64_to_string(base64_string: &str) -> Result { + match general_purpose::STANDARD.decode(base64_string) { + Ok(bytes) => Ok(String::from_utf8_lossy(&bytes).to_string()), + Err(e) => Err(e), + } +} diff --git a/bottlecap/src/lifecycle/invocation/triggers/mod.rs b/bottlecap/src/lifecycle/invocation/triggers/mod.rs index 8cfb2e578..c5d6987b2 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/mod.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/mod.rs @@ -1,6 +1,5 @@ use std::{collections::HashMap, hash::BuildHasher}; -use base64::{engine::general_purpose, Engine}; use datadog_trace_protobuf::pb::Span; use serde::{ser::SerializeMap, Serializer}; use serde_json::Value; @@ -39,14 +38,6 @@ pub fn get_aws_partition_by_region(region: &str) -> String { } } -#[must_use] -pub fn base64_to_string(base64_string: &str) -> String { - let bytes = general_purpose::STANDARD - .decode(base64_string) - .unwrap_or_default(); - String::from_utf8_lossy(&bytes).to_string() -} - /// Serialize a `HashMap` with lowercase keys /// pub fn lowercase_key( diff --git 
a/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs b/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs index cbf313ff6..eaa1ab907 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/sns_event.rs @@ -6,8 +6,9 @@ use serde_json::Value; use tracing::debug; use crate::lifecycle::invocation::{ + base64_to_string, processor::MS_TO_NS, - triggers::{base64_to_string, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, + triggers::{Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG}, }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -132,12 +133,13 @@ impl Trigger for SnsRecord { } fn get_carrier(&self) -> HashMap { - let carrier = HashMap::new(); if let Some(ma) = self.sns.message_attributes.get(DATADOG_CARRIER_KEY) { match ma.r#type.as_str() { "String" => return serde_json::from_str(&ma.value).unwrap_or_default(), "Binary" => { - return serde_json::from_str(&base64_to_string(&ma.value)).unwrap_or_default() + if let Ok(carrier) = base64_to_string(&ma.value) { + return serde_json::from_str(&carrier).unwrap_or_default(); + } } _ => { debug!("Unsupported type in SNS message attribute"); @@ -145,7 +147,7 @@ impl Trigger for SnsRecord { } } - carrier + HashMap::new() } fn is_async(&self) -> bool { From 58effc3e2c33ed0658de9a7e14d271580864fe9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Mon, 11 Nov 2024 13:23:04 -0500 Subject: [PATCH 2/5] set error on span from headers checks the headers to identify errors that should be attached to the invocation span and the inferred span --- .../src/lifecycle/invocation/processor.rs | 89 ++++++++++++++++--- .../src/lifecycle/invocation/span_inferrer.rs | 8 +- bottlecap/src/lifecycle/listener.rs | 8 +- 3 files changed, 88 insertions(+), 17 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs
b/bottlecap/src/lifecycle/invocation/processor.rs index 471c00f69..a0b32d99b 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -13,7 +13,9 @@ use tracing::debug; use crate::{ config::{self, AwsConfig}, - lifecycle::invocation::{context::ContextBuffer, span_inferrer::SpanInferrer}, + lifecycle::invocation::{ + base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer, + }, metrics::enhanced::lambda::EnhancedMetricData, proc::{self, CPUData, NetworkData}, tags::provider, @@ -27,6 +29,11 @@ use crate::{ pub const MS_TO_NS: f64 = 1_000_000.0; pub const S_TO_NS: f64 = 1_000_000_000.0; +pub const DATADOG_INVOCATION_ERROR_MESSAGE_KEY: &str = "x-datadog-invocation-error-msg"; +pub const DATADOG_INVOCATION_ERROR_TYPE_KEY: &str = "x-datadog-invocation-error-type"; +pub const DATADOG_INVOCATION_ERROR_STACK_KEY: &str = "x-datadog-invocation-error-stack"; +pub const DATADOG_INVOCATION_ERROR_KEY: &str = "x-datadog-invocation-error"; + pub struct Processor { pub context_buffer: ContextBuffer, inferrer: SpanInferrer, @@ -60,11 +67,11 @@ impl Processor { service, name: "aws.lambda".to_string(), resource, - trace_id: 0, // set later - span_id: 0, // maybe set later? 
- parent_id: 0, // set later - start: 0, // set later - duration: 0, // set later + trace_id: 0, + span_id: 0, + parent_id: 0, + start: 0, + duration: 0, error: 0, meta: HashMap::new(), metrics: HashMap::new(), @@ -150,7 +157,8 @@ impl Processor { self.span.meta.extend(trigger_tags); } - self.inferrer.complete_inferred_spans(&self.span); + self.inferrer + .complete_inferred_spans(&self.span, self.span.error == 1); if self.tracer_detected { let mut body_size = std::mem::size_of_val(&self.span); @@ -291,15 +299,26 @@ impl Processor { headers: HashMap, status_code: Option, ) { - self.update_span_context(headers); - if self.inferrer.inferred_span.is_some() { - if let Some(status_code) = status_code { - self.inferrer.set_status_code(status_code); + if let Some(status_code) = status_code { + self.span + .meta + .insert("http.status_code".to_string(), status_code.clone()); + + if status_code.len() == 3 && status_code.starts_with('5') { + self.span.error = 1; } + + // If we have an inferred span, set the status code to it + self.inferrer.set_status_code(status_code); } + + self.update_span_context_from_headers(&headers); + self.set_span_error_from_headers(headers); + + // todo: increment error metrics if there's an error } - fn update_span_context(&mut self, headers: HashMap) { + fn update_span_context_from_headers(&mut self, headers: &HashMap) { // todo: fix this, code is a copy of the existing logic in Go, not accounting // when a 128 bit trace id exist let mut trace_id = 0; @@ -327,8 +346,54 @@ impl Processor { self.span.trace_id = trace_id; self.span.span_id = span_id; + // If no inferred span, set the parent id right away if self.inferrer.inferred_span.is_none() { self.span.parent_id = parent_id; } } + + /// Given end invocation headers, set error metadata, if present, to the current span. 
+ /// + fn set_span_error_from_headers(&mut self, headers: HashMap) { + let message = headers.get(DATADOG_INVOCATION_ERROR_MESSAGE_KEY); + let r#type = headers.get(DATADOG_INVOCATION_ERROR_TYPE_KEY); + let stack = headers.get(DATADOG_INVOCATION_ERROR_STACK_KEY); + + let is_error = headers + .get(DATADOG_INVOCATION_ERROR_KEY) + .map_or(false, |v| v.to_lowercase() == "true") + || message.is_some() + || stack.is_some() + || r#type.is_some() + || self.span.error == 1; + if is_error { + self.span.error = 1; + + if let Some(m) = message { + self.span + .meta + .insert(String::from("error.msg"), m.to_string()); + } + + if let Some(t) = r#type { + self.span + .meta + .insert(String::from("error.type"), t.to_string()); + } + + if let Some(s) = stack { + let decoded_stack = match base64_to_string(s) { + Ok(decoded) => decoded, + Err(e) => { + debug!("Failed to decode error stack: {e}"); + s.to_string() + } + }; + + self.span + .meta + .insert(String::from("error.stack"), decoded_stack); + } + } + } } diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index 090292599..d7d040030 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -161,7 +161,7 @@ impl SpanInferrer { // TODO: add status tag and other info from response // TODO: add peer.service - pub fn complete_inferred_spans(&mut self, invocation_span: &Span) { + pub fn complete_inferred_spans(&mut self, invocation_span: &Span, has_errored: bool) { if let Some(s) = &mut self.inferred_span { if let Some(ws) = &mut self.wrapped_inferred_span { // Set correct Parent ID for multiple inferred spans @@ -180,6 +180,9 @@ impl SpanInferrer { ws.duration = duration; } + // Set error + ws.error = i32::from(has_errored); + ws.trace_id = invocation_span.trace_id; } @@ -194,6 +197,9 @@ impl SpanInferrer { s.duration = duration; } + // Set error + s.error = i32::from(has_errored); + s.trace_id = 
invocation_span.trace_id; } } diff --git a/bottlecap/src/lifecycle/listener.rs b/bottlecap/src/lifecycle/listener.rs index b255ec491..589815a4a 100644 --- a/bottlecap/src/lifecycle/listener.rs +++ b/bottlecap/src/lifecycle/listener.rs @@ -144,15 +144,15 @@ impl Listener { let parsed_body = serde_json::from_slice::( &hyper::body::to_bytes(body).await.unwrap_or_default(), ); - let mut parsed_status: Option = None; - if let Some(status_code) = parsed_body.unwrap_or_default().get("statusCode") { - parsed_status = Some(status_code.to_string()); + let mut parsed_status_code: Option = None; + if let Some(sc) = parsed_body.unwrap_or_default().get("statusCode") { + parsed_status_code = Some(sc.to_string()); } let mut processor = invocation_processor.lock().await; let headers = Self::headers_to_map(parts.headers); - processor.on_invocation_end(headers, parsed_status); + processor.on_invocation_end(headers, parsed_status_code); drop(processor); Response::builder() From 24145eb0ea8461a55b295ebd1bd69fa7955e7986 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Mon, 11 Nov 2024 15:38:31 -0500 Subject: [PATCH 3/5] increment metrics on error --- bottlecap/src/lifecycle/invocation/processor.rs | 12 +++++++++--- bottlecap/src/lifecycle/invocation/span_inferrer.rs | 6 +++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 35b02004f..669ad5c82 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -14,7 +14,9 @@ use tracing::debug; use crate::{ config::{self, AwsConfig}, - lifecycle::invocation::{context::ContextBuffer, span_inferrer::SpanInferrer, base64_to_string}, + lifecycle::invocation::{ + base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer, + }, metrics::enhanced::lambda::{EnhancedMetricData, Lambda as 
EnhancedMetrics}, proc::{self, CPUData, NetworkData}, tags::provider, @@ -190,7 +192,7 @@ impl Processor { } self.inferrer - .complete_inferred_spans(&self.span, self.span.error == 1); + .complete_inferred_spans(&self.span); if self.tracer_detected { let mut body_size = std::mem::size_of_val(&self.span); @@ -350,7 +352,9 @@ impl Processor { self.update_span_context_from_headers(&headers); self.set_span_error_from_headers(headers); - // todo: increment error metrics if there's an error + if self.span.error == 1 { + self.enhanced_metrics.increment_errors_metric(); + } } fn update_span_context_from_headers(&mut self, headers: &HashMap) { @@ -429,6 +433,8 @@ impl Processor { .meta .insert(String::from("error.stack"), decoded_stack); } + + // todo: handle timeout } } } diff --git a/bottlecap/src/lifecycle/invocation/span_inferrer.rs b/bottlecap/src/lifecycle/invocation/span_inferrer.rs index 04bf8c77e..cdd14d32d 100644 --- a/bottlecap/src/lifecycle/invocation/span_inferrer.rs +++ b/bottlecap/src/lifecycle/invocation/span_inferrer.rs @@ -168,7 +168,7 @@ impl SpanInferrer { // TODO: add status tag and other info from response // TODO: add peer.service - pub fn complete_inferred_spans(&mut self, invocation_span: &Span, has_errored: bool) { + pub fn complete_inferred_spans(&mut self, invocation_span: &Span) { if let Some(s) = &mut self.inferred_span { if let Some(ws) = &mut self.wrapped_inferred_span { // Set correct Parent ID for multiple inferred spans @@ -188,7 +188,7 @@ impl SpanInferrer { } // Set error - ws.error = i32::from(has_errored); + ws.error = invocation_span.error; ws.trace_id = invocation_span.trace_id; } @@ -205,7 +205,7 @@ impl SpanInferrer { } // Set error - s.error = i32::from(has_errored); + s.error = invocation_span.error; s.trace_id = invocation_span.trace_id; } From 1c8859c8afdd0da626348e4da10f04eeeaedfc7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Mon, 11 Nov 2024 15:41:19 
-0500 Subject: [PATCH 4/5] fmt --- bottlecap/src/lifecycle/invocation/processor.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 669ad5c82..6892bc913 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -191,8 +191,7 @@ impl Processor { self.span.meta.extend(trigger_tags); } - self.inferrer - .complete_inferred_spans(&self.span); + self.inferrer.complete_inferred_spans(&self.span); if self.tracer_detected { let mut body_size = std::mem::size_of_val(&self.span); From 93c5e2c142e4e2763f760dfd66639b1e68290651 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Mon, 11 Nov 2024 15:56:02 -0500 Subject: [PATCH 5/5] remove a todo --- bottlecap/src/lifecycle/invocation/processor.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 6892bc913..b9e82db44 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -174,9 +174,6 @@ impl Processor { // - language // - function.request - capture lambda payload // - function.response - // - error.msg - // - error.type - // - error.stack // - metrics tags (for asm) if let Some(offsets) = &context.enhanced_metric_data {