Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions bottlecap/src/lifecycle/invocation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
use base64::{engine::general_purpose, DecodeError, Engine};

pub mod context;
pub mod processor;
pub mod span_inferrer;
pub mod triggers;

pub fn base64_to_string(base64_string: &str) -> Result<String, DecodeError> {
match general_purpose::STANDARD.decode(base64_string) {
Ok(bytes) => Ok(String::from_utf8_lossy(&bytes).to_string()),
Err(e) => Err(e),
}
}
93 changes: 79 additions & 14 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use tracing::debug;

use crate::{
config::{self, AwsConfig},
lifecycle::invocation::{context::ContextBuffer, span_inferrer::SpanInferrer},
lifecycle::invocation::{
base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer,
},
metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics},
proc::{self, CPUData, NetworkData},
tags::provider,
Expand All @@ -29,6 +31,11 @@ use crate::{
pub const MS_TO_NS: f64 = 1_000_000.0;
pub const S_TO_NS: f64 = 1_000_000_000.0;

pub const DATADOG_INVOCATION_ERROR_MESSAGE_KEY: &str = "x-datadog-invocation-error-msg";
pub const DATADOG_INVOCATION_ERROR_TYPE_KEY: &str = "x-datadog-invocation-error-type";
pub const DATADOG_INVOCATION_ERROR_STACK_KEY: &str = "x-datadog-invocation-error-stack";
pub const DATADOG_INVOCATION_ERROR_KEY: &str = "x-datadog-invocation-error";

pub struct Processor {
pub context_buffer: ContextBuffer,
inferrer: SpanInferrer,
Expand Down Expand Up @@ -64,11 +71,11 @@ impl Processor {
service,
name: "aws.lambda".to_string(),
resource,
trace_id: 0, // set later
span_id: 0, // maybe set later?
parent_id: 0, // set later
start: 0, // set later
duration: 0, // set later
trace_id: 0,
span_id: 0,
parent_id: 0,
start: 0,
duration: 0,
error: 0,
meta: HashMap::new(),
metrics: HashMap::new(),
Expand Down Expand Up @@ -167,9 +174,6 @@ impl Processor {
// - language
// - function.request - capture lambda payload
// - function.response
// - error.msg
// - error.type
// - error.stack
// - metrics tags (for asm)

if let Some(offsets) = &context.enhanced_metric_data {
Expand Down Expand Up @@ -328,15 +332,28 @@ impl Processor {
headers: HashMap<String, String>,
status_code: Option<String>,
) {
self.update_span_context(headers);
if self.inferrer.inferred_span.is_some() {
if let Some(status_code) = status_code {
self.inferrer.set_status_code(status_code);
if let Some(status_code) = status_code {
self.span
.meta
.insert("http.status_code".to_string(), status_code.clone());

if status_code.len() == 3 && status_code.starts_with('5') {
self.span.error = 1;
}

// If we have an inferred span, set the status code to it
self.inferrer.set_status_code(status_code);
}

self.update_span_context_from_headers(&headers);
self.set_span_error_from_headers(headers);

if self.span.error == 1 {
self.enhanced_metrics.increment_errors_metric();
}
}

fn update_span_context(&mut self, headers: HashMap<String, String>) {
fn update_span_context_from_headers(&mut self, headers: &HashMap<String, String>) {
// todo: fix this, code is a copy of the existing logic in Go, not accounting
// when a 128 bit trace id exist
let mut trace_id = 0;
Expand Down Expand Up @@ -364,8 +381,56 @@ impl Processor {
self.span.trace_id = trace_id;
self.span.span_id = span_id;

// If no inferred span, set the parent id right away
if self.inferrer.inferred_span.is_none() {
self.span.parent_id = parent_id;
}
}

/// Given end invocation headers, set error metadata, if present, to the current span.
///
fn set_span_error_from_headers(&mut self, headers: HashMap<String, String>) {
let message = headers.get(DATADOG_INVOCATION_ERROR_MESSAGE_KEY);
let r#type = headers.get(DATADOG_INVOCATION_ERROR_TYPE_KEY);
let stack = headers.get(DATADOG_INVOCATION_ERROR_STACK_KEY);

let is_error = headers
.get(DATADOG_INVOCATION_ERROR_KEY)
.map_or(false, |v| v.to_lowercase() == "true")
|| message.is_some()
|| stack.is_some()
|| r#type.is_some()
|| self.span.error == 1;
if is_error {
self.span.error = 1;

if let Some(m) = message {
self.span
.meta
.insert(String::from("error.msg"), m.to_string());
}

if let Some(t) = r#type {
self.span
.meta
.insert(String::from("error.type"), t.to_string());
}

if let Some(s) = stack {
let decoded_stack = match base64_to_string(s) {
Ok(decoded) => decoded,
Err(e) => {
debug!("Failed to decode error stack: {e}");
s.to_string()
}
};

self.span
.meta
.insert(String::from("error.stack"), decoded_stack);
}

// todo: handle timeout
}
}
}
6 changes: 6 additions & 0 deletions bottlecap/src/lifecycle/invocation/span_inferrer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ impl SpanInferrer {
ws.duration = duration;
}

// Set error
ws.error = invocation_span.error;

ws.trace_id = invocation_span.trace_id;
}

Expand All @@ -201,6 +204,9 @@ impl SpanInferrer {
s.duration = duration;
}

// Set error
s.error = invocation_span.error;

s.trace_id = invocation_span.trace_id;
}
}
Expand Down
9 changes: 0 additions & 9 deletions bottlecap/src/lifecycle/invocation/triggers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{collections::HashMap, hash::BuildHasher};

use base64::{engine::general_purpose, Engine};
use datadog_trace_protobuf::pb::Span;
use serde::{ser::SerializeMap, Serializer};
use serde_json::Value;
Expand Down Expand Up @@ -40,14 +39,6 @@ pub fn get_aws_partition_by_region(region: &str) -> String {
}
}

#[must_use]
pub fn base64_to_string(base64_string: &str) -> String {
let bytes = general_purpose::STANDARD
.decode(base64_string)
.unwrap_or_default();
String::from_utf8_lossy(&bytes).to_string()
}

/// Serialize a `HashMap` with lowercase keys
///
pub fn lowercase_key<S, H>(
Expand Down
10 changes: 6 additions & 4 deletions bottlecap/src/lifecycle/invocation/triggers/sns_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use serde_json::Value;
use tracing::debug;

use crate::lifecycle::invocation::{
base64_to_string,
processor::MS_TO_NS,
triggers::{base64_to_string, Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG},
triggers::{Trigger, DATADOG_CARRIER_KEY, FUNCTION_TRIGGER_EVENT_SOURCE_TAG},
};

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
Expand Down Expand Up @@ -132,20 +133,21 @@ impl Trigger for SnsRecord {
}

fn get_carrier(&self) -> HashMap<String, String> {
let carrier = HashMap::new();
if let Some(ma) = self.sns.message_attributes.get(DATADOG_CARRIER_KEY) {
match ma.r#type.as_str() {
"String" => return serde_json::from_str(&ma.value).unwrap_or_default(),
"Binary" => {
return serde_json::from_str(&base64_to_string(&ma.value)).unwrap_or_default()
if let Ok(carrier) = base64_to_string(&ma.value) {
return serde_json::from_str(&carrier).unwrap_or_default();
}
}
_ => {
debug!("Unsupported type in SNS message attribute");
}
}
}

carrier
HashMap::new()
}

fn is_async(&self) -> bool {
Expand Down
8 changes: 4 additions & 4 deletions bottlecap/src/lifecycle/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,15 @@ impl Listener {
let parsed_body = serde_json::from_slice::<serde_json::Value>(
&hyper::body::to_bytes(body).await.unwrap_or_default(),
);
let mut parsed_status: Option<String> = None;
if let Some(status_code) = parsed_body.unwrap_or_default().get("statusCode") {
parsed_status = Some(status_code.to_string());
let mut parsed_status_code: Option<String> = None;
if let Some(sc) = parsed_body.unwrap_or_default().get("statusCode") {
parsed_status_code = Some(sc.to_string());
}

let mut processor = invocation_processor.lock().await;

let headers = Self::headers_to_map(parts.headers);
processor.on_invocation_end(headers, parsed_status);
processor.on_invocation_end(headers, parsed_status_code);
drop(processor);

Response::builder()
Expand Down