Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use std::{
path::Path,
process::Command,
sync::{Arc, Mutex},
time::Instant,
};
use telemetry::listener::TelemetryListenerConfig;
use tokio::sync::mpsc::Sender;
Expand Down Expand Up @@ -201,6 +202,7 @@ fn load_configs() -> (AwsConfig, Arc<Config>) {
aws_secret_access_key: env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_default(),
aws_session_token: env::var("AWS_SESSION_TOKEN").unwrap_or_default(),
function_name: env::var("AWS_LAMBDA_FUNCTION_NAME").unwrap_or_default(),
sandbox_init_time: Instant::now(),
};
let lambda_directory = env::var("LAMBDA_TASK_ROOT").unwrap_or_else(|_| "/var/task".to_string());
let config = match config::get_config(Path::new(&lambda_directory)) {
Expand Down Expand Up @@ -400,9 +402,9 @@ async fn extension_loop_active(
}
Event::Telemetry(event) =>
match event.record {
TelemetryRecord::PlatformStart { request_id, .. } => {
TelemetryRecord::PlatformInitStart { .. } => {
let mut p = invocation_processor.lock().await;
p.on_platform_start(request_id, event.time);
p.on_platform_init_start(event.time);
drop(p);
}
TelemetryRecord::PlatformInitReport {
Expand All @@ -415,6 +417,11 @@ async fn extension_loop_active(
p.on_platform_init_report(metrics.duration_ms);
drop(p);
}
TelemetryRecord::PlatformStart { request_id, .. } => {
let mut p = invocation_processor.lock().await;
p.on_platform_start(request_id, event.time);
drop(p);
}
TelemetryRecord::PlatformRuntimeDone {
request_id,
status,
Expand Down
2 changes: 2 additions & 0 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod processing_rule;
pub mod trace_propagation_style;

use std::path::Path;
use std::time::Instant;
use std::vec;

use figment::providers::{Format, Yaml};
Expand Down Expand Up @@ -221,6 +222,7 @@ pub struct AwsConfig {
pub aws_secret_access_key: String,
pub aws_session_token: String,
pub function_name: String,
pub sandbox_init_time: Instant,
}

#[cfg(test)]
Expand Down
7 changes: 7 additions & 0 deletions bottlecap/src/lifecycle/invocation/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,13 @@ impl ContextBuffer {
/// Returns the number of invocation contexts currently stored in the buffer.
pub fn size(&self) -> usize {
    self.buffer.len()
}

/// Returns `true` if the buffer holds no invocation contexts.
#[must_use]
pub fn is_empty(&self) -> bool {
    self.buffer.is_empty()
}
}

#[cfg(test)]
Expand Down
22 changes: 22 additions & 0 deletions bottlecap/src/lifecycle/invocation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use base64::{engine::general_purpose, DecodeError, Engine};
use datadog_trace_protobuf::pb::Span;
use rand::{rngs::OsRng, Rng, RngCore};

use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE};
use serde_json::Value;
use tracing::debug;

Expand Down Expand Up @@ -27,6 +30,25 @@ pub fn base64_to_string(base64_string: &str) -> Result<String, DecodeError> {
}
}

fn create_empty_span(name: String, resource: String, service: String) -> Span {
Span {
name,
resource,
service,
r#type: String::from("serverless"),
..Default::default()
}
}

/// Generates a random span id.
///
/// When running under AWS SnapStart, the OS entropy source is used
/// instead of the thread-local generator — presumably so that sandboxes
/// restored from the same snapshot do not draw identical ids
/// (NOTE(review): confirm this intent).
fn generate_span_id() -> u64 {
    let is_snap_start =
        std::env::var(INIT_TYPE).map_or(false, |init_type| init_type == SNAP_START_VALUE);

    if is_snap_start {
        OsRng.next_u64()
    } else {
        rand::thread_rng().gen()
    }
}

pub fn tag_span_from_value(span: &mut Span, key: &str, value: &Value, depth: u32, max_depth: u32) {
// Null scenario
if value.is_null() {
Expand Down
145 changes: 112 additions & 33 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::{SystemTime, UNIX_EPOCH},
time::{Instant, SystemTime, UNIX_EPOCH},
};

use chrono::{DateTime, Utc};
Expand All @@ -15,7 +15,8 @@ use tracing::debug;
use crate::{
config::{self, AwsConfig},
lifecycle::invocation::{
base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer, tag_span_from_value,
base64_to_string, context::ContextBuffer, create_empty_span, generate_span_id,
span_inferrer::SpanInferrer, tag_span_from_value,
},
metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics},
proc::{self, CPUData, NetworkData},
Expand All @@ -36,21 +37,31 @@ use crate::{

pub const MS_TO_NS: f64 = 1_000_000.0;
pub const S_TO_NS: f64 = 1_000_000_000.0;
pub const PROACTIVE_INITIALIZATION_THRESHOLD_MS: u64 = 10_000;

pub const DATADOG_INVOCATION_ERROR_MESSAGE_KEY: &str = "x-datadog-invocation-error-msg";
pub const DATADOG_INVOCATION_ERROR_TYPE_KEY: &str = "x-datadog-invocation-error-type";
pub const DATADOG_INVOCATION_ERROR_STACK_KEY: &str = "x-datadog-invocation-error-stack";
pub const DATADOG_INVOCATION_ERROR_KEY: &str = "x-datadog-invocation-error";

/// Tracks the lifecycle of a Lambda invocation and builds the
/// invocation span (plus an optional cold start span) from the
/// telemetry and invocation events it is fed.
pub struct Processor {
    // Buffer containing context of the previous 5 invocations
    pub context_buffer: ContextBuffer,
    // Helper to infer span information
    inferrer: SpanInferrer,
    // Current invocation span
    pub span: Span,
    // Cold start span
    cold_start_span: Option<Span>,
    // Extracted span context from inferred span, headers, or payload
    pub extracted_span_context: Option<SpanContext>,
    // Used to extract the trace context from inferred span, headers, or payload
    propagator: DatadogCompositePropagator,
    // Helper to send enhanced metrics
    enhanced_metrics: EnhancedMetrics,
    // AWS configuration from the Lambda environment
    aws_config: AwsConfig,
    // Flag to determine if a tracer was detected
    tracer_detected: bool,
    // Extension configuration
    config: Arc<config::Config>,
}
Expand All @@ -63,32 +74,18 @@ impl Processor {
aws_config: &AwsConfig,
metrics_aggregator: Arc<Mutex<MetricsAggregator>>,
) -> Self {
let service = config.service.clone().unwrap_or("aws.lambda".to_string());
let service = config.service.clone().unwrap_or(String::from("aws.lambda"));
let resource = tags_provider
.get_canonical_resource_name()
.unwrap_or("aws_lambda".to_string());
.unwrap_or(String::from("aws.lambda"));

let propagator = DatadogCompositePropagator::new(Arc::clone(&config));

Processor {
context_buffer: ContextBuffer::default(),
inferrer: SpanInferrer::default(),
span: Span {
service,
name: "aws.lambda".to_string(),
resource,
trace_id: 0,
span_id: 0,
parent_id: 0,
start: 0,
duration: 0,
error: 0,
meta: HashMap::new(),
metrics: HashMap::new(),
r#type: "serverless".to_string(),
meta_struct: HashMap::new(),
span_links: Vec::new(),
},
span: create_empty_span(String::from("aws.lambda"), resource, service),
cold_start_span: None,
extracted_span_context: None,
propagator,
enhanced_metrics: EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config)),
Expand All @@ -101,6 +98,9 @@ impl Processor {
/// Given a `request_id`, creates the context and adds the enhanced metric offsets to the context buffer.
///
pub fn on_invoke_event(&mut self, request_id: String) {
self.reset_state();
self.set_init_tags();

self.context_buffer.create_context(request_id.clone());
if self.config.enhanced_metrics {
// Collect offsets for network and cpu metrics
Expand Down Expand Up @@ -132,10 +132,87 @@ impl Processor {
self.enhanced_metrics.increment_invocation_metric();
}

/// Resets the state of the processor to default values.
///
/// Called at the start of each invocation so trace context, error
/// state, and tags from the previous invocation do not leak into the
/// new span.
fn reset_state(&mut self) {
    // Reset Span Context on Span
    self.span.trace_id = 0;
    self.span.parent_id = 0;
    self.span.span_id = 0;
    // Error
    self.span.error = 0;
    // Meta tags
    self.span.meta.clear();
    // NOTE(review): `span.metrics` and `span.span_links` are not
    // cleared here — confirm they are never populated between
    // invocations.
    // Extracted Span Context
    self.extracted_span_context = None;
    // Cold Start Span
    self.cold_start_span = None;
}

/// On the first invocation, determine if it's a cold start or proactive init.
///
/// For every other invocation, it will always be warm start.
///
fn set_init_tags(&mut self) {
let mut proactive_initialization = false;
let mut cold_start = false;

// If it's empty, then we are in a cold start
if self.context_buffer.is_empty() {
let now = Instant::now();
let time_since_sandbox_init = now.duration_since(self.aws_config.sandbox_init_time);
if time_since_sandbox_init.as_millis() > PROACTIVE_INITIALIZATION_THRESHOLD_MS.into() {
proactive_initialization = true;
} else {
cold_start = true;
}
}

if proactive_initialization {
self.span.meta.insert(
String::from("proactive_initialization"),
proactive_initialization.to_string(),
);
}
self.span
.meta
.insert(String::from("cold_start"), cold_start.to_string());

self.enhanced_metrics
.set_init_tags(proactive_initialization, cold_start);
}

/// Handles the `platform.initStart` telemetry event by creating and
/// storing a cold start span whose start is the reported event time.
///
/// The span's duration is filled in later by `on_platform_init_report`.
pub fn on_platform_init_start(&mut self, time: DateTime<Utc>) {
    // Convert the event timestamp to nanoseconds since the Unix epoch;
    // fall back to 0 if the value does not fit in an i64.
    let start_time: i64 = SystemTime::from(time)
        .duration_since(UNIX_EPOCH)
        .expect("time went backwards")
        .as_nanos()
        .try_into()
        .unwrap_or_default();

    let mut cold_start_span = create_empty_span(
        String::from("aws.lambda.cold_start"),
        self.span.resource.clone(),
        self.span.service.clone(),
    );
    cold_start_span.start = start_time;
    cold_start_span.span_id = generate_span_id();

    self.cold_start_span = Some(cold_start_span);
}

/// Given the duration of the platform init report, set the init duration
/// metric and, if a cold start span is being tracked, set its duration.
#[allow(clippy::cast_possible_truncation)]
pub fn on_platform_init_report(&mut self, duration_ms: f64) {
    self.enhanced_metrics.set_init_duration_metric(duration_ms);

    if let Some(cold_start_span) = &mut self.cold_start_span {
        // Round to a whole number of nanoseconds before casting — a bare
        // `as i64` cast truncates instead of rounding. This matches how
        // the invocation span duration is computed from
        // `runtime_duration_ms` elsewhere in this file.
        cold_start_span.duration = (duration_ms * MS_TO_NS).round() as i64;
    }
}

/// Given a `request_id` and the time of the platform start, add the start time to the context buffer.
Expand Down Expand Up @@ -183,10 +260,10 @@ impl Processor {
}

if let Some(context) = self.context_buffer.get(request_id) {
let span = &mut self.span;
// `round` is intentionally meant to be a whole integer
span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64;
span.meta
self.span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64;
self.span
.meta
.insert("request_id".to_string(), request_id.clone());
// todo(duncanista): add missing tags
// - cold start, proactive init
Expand All @@ -213,6 +290,11 @@ impl Processor {

self.inferrer.complete_inferred_spans(&self.span);

if let Some(cold_start_span) = &mut self.cold_start_span {
cold_start_span.trace_id = self.span.trace_id;
cold_start_span.parent_id = self.span.parent_id;
}

if self.tracer_detected {
let mut body_size = std::mem::size_of_val(&self.span);
let mut traces = vec![self.span.clone()];
Expand All @@ -227,6 +309,11 @@ impl Processor {
traces.push(ws.clone());
}

if let Some(cold_start_span) = &self.cold_start_span {
body_size += std::mem::size_of_val(cold_start_span);
traces.push(cold_start_span.clone());
}

// todo: figure out what to do here
let header_tags = tracer_header_tags::TracerHeaderTags {
lang: "",
Expand Down Expand Up @@ -263,7 +350,7 @@ impl Processor {
// Set the report log metrics
self.enhanced_metrics.set_report_log_metrics(&metrics);

if let Some(context) = self.context_buffer.remove(request_id) {
if let Some(context) = self.context_buffer.get(request_id) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using `get` instead of `remove` here because I want to ensure the buffer is empty only before the first invocation, so the init tags can be calculated correctly.

if context.runtime_duration_ms != 0.0 {
let post_runtime_duration_ms = metrics.duration_ms - context.runtime_duration_ms;

Expand All @@ -273,7 +360,7 @@ impl Processor {
}

// Set Network and CPU time metrics
if let Some(offsets) = context.enhanced_metric_data {
if let Some(offsets) = context.enhanced_metric_data.clone() {
self.enhanced_metrics
.set_network_enhanced_metrics(offsets.network_offset);
self.enhanced_metrics
Expand All @@ -288,14 +375,6 @@ impl Processor {
pub fn on_invocation_start(&mut self, headers: HashMap<String, String>, payload: Vec<u8>) {
self.tracer_detected = true;

// Reset trace context
self.span.trace_id = 0;
self.span.parent_id = 0;
self.span.span_id = 0;
self.span.error = 0;
self.span.meta.clear();
self.extracted_span_context = None;

let payload_value = match serde_json::from_slice::<Value>(&payload) {
Ok(value) => value,
Err(_) => json!({}),
Expand Down
Loading