Skip to content

Commit 8146fcc

Browse files
committed
feat(bottlecap): add Cold Start Span + Tags (#450)
* add some helper functions to `invocation::lifecycle` mod * create cold start span on processor * move `generate_span_id` to father module * send `platform_init_start` data to processor * send `PlatformInitStart` to main bus * update cold start `parent_id` * fix start time of cold start span * enhanced metrics now have a `dynamic_value_tags` for tags which we have to calculate at points in time * `AwsConfig` now has a `sandbox_init_time` value * add `is_empty` to `ContextBuffer` * calculate init tags on invoke also add a method to reset processor invocation state * restart init tags on set * set tags properly for proactive init * fix unit test * remove debug line * make sure `cold_start` tag is only set in one place
1 parent 5df55d1 commit 8146fcc

9 files changed

Lines changed: 271 additions & 95 deletions

File tree

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ use std::{
6060
path::Path,
6161
process::Command,
6262
sync::{Arc, Mutex},
63+
time::Instant,
6364
};
6465
use telemetry::listener::TelemetryListenerConfig;
6566
use tokio::sync::mpsc::Sender;
@@ -201,6 +202,7 @@ fn load_configs() -> (AwsConfig, Arc<Config>) {
201202
aws_secret_access_key: env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_default(),
202203
aws_session_token: env::var("AWS_SESSION_TOKEN").unwrap_or_default(),
203204
function_name: env::var("AWS_LAMBDA_FUNCTION_NAME").unwrap_or_default(),
205+
sandbox_init_time: Instant::now(),
204206
};
205207
let lambda_directory = env::var("LAMBDA_TASK_ROOT").unwrap_or_else(|_| "/var/task".to_string());
206208
let config = match config::get_config(Path::new(&lambda_directory)) {
@@ -401,9 +403,9 @@ async fn extension_loop_active(
401403
}
402404
Event::Telemetry(event) =>
403405
match event.record {
404-
TelemetryRecord::PlatformStart { request_id, .. } => {
406+
TelemetryRecord::PlatformInitStart { .. } => {
405407
let mut p = invocation_processor.lock().await;
406-
p.on_platform_start(request_id, event.time);
408+
p.on_platform_init_start(event.time);
407409
drop(p);
408410
}
409411
TelemetryRecord::PlatformInitReport {
@@ -416,6 +418,11 @@ async fn extension_loop_active(
416418
p.on_platform_init_report(metrics.duration_ms);
417419
drop(p);
418420
}
421+
TelemetryRecord::PlatformStart { request_id, .. } => {
422+
let mut p = invocation_processor.lock().await;
423+
p.on_platform_start(request_id, event.time);
424+
drop(p);
425+
}
419426
TelemetryRecord::PlatformRuntimeDone {
420427
request_id,
421428
status,

bottlecap/src/config/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod processing_rule;
44
pub mod trace_propagation_style;
55

66
use std::path::Path;
7+
use std::time::Instant;
78
use std::vec;
89

910
use figment::providers::{Format, Yaml};
@@ -226,6 +227,7 @@ pub struct AwsConfig {
226227
pub aws_secret_access_key: String,
227228
pub aws_session_token: String,
228229
pub function_name: String,
230+
pub sandbox_init_time: Instant,
229231
}
230232

231233
#[cfg(test)]

bottlecap/src/lifecycle/invocation/context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,13 @@ impl ContextBuffer {
165165
pub fn size(&self) -> usize {
166166
self.buffer.len()
167167
}
168+
169+
/// Returns if the buffer is empty.
170+
///
171+
#[must_use]
172+
pub fn is_empty(&self) -> bool {
173+
self.buffer.is_empty()
174+
}
168175
}
169176

170177
#[cfg(test)]

bottlecap/src/lifecycle/invocation/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use base64::{engine::general_purpose, DecodeError, Engine};
22
use datadog_trace_protobuf::pb::Span;
3+
use rand::{rngs::OsRng, Rng, RngCore};
4+
5+
use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE};
36
use serde_json::Value;
47
use tracing::debug;
58

@@ -27,6 +30,25 @@ pub fn base64_to_string(base64_string: &str) -> Result<String, DecodeError> {
2730
}
2831
}
2932

33+
fn create_empty_span(name: String, resource: String, service: String) -> Span {
34+
Span {
35+
name,
36+
resource,
37+
service,
38+
r#type: String::from("serverless"),
39+
..Default::default()
40+
}
41+
}
42+
43+
fn generate_span_id() -> u64 {
44+
if std::env::var(INIT_TYPE).map_or(false, |it| it == SNAP_START_VALUE) {
45+
return OsRng.next_u64();
46+
}
47+
48+
let mut rng = rand::thread_rng();
49+
rng.gen()
50+
}
51+
3052
pub fn tag_span_from_value(span: &mut Span, key: &str, value: &Value, depth: u32, max_depth: u32) {
3153
// Null scenario
3254
if value.is_null() {

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 112 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{
22
collections::HashMap,
33
sync::{Arc, Mutex},
4-
time::{SystemTime, UNIX_EPOCH},
4+
time::{Instant, SystemTime, UNIX_EPOCH},
55
};
66

77
use chrono::{DateTime, Utc};
@@ -15,7 +15,8 @@ use tracing::debug;
1515
use crate::{
1616
config::{self, AwsConfig},
1717
lifecycle::invocation::{
18-
base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer, tag_span_from_value,
18+
base64_to_string, context::ContextBuffer, create_empty_span, generate_span_id,
19+
span_inferrer::SpanInferrer, tag_span_from_value,
1920
},
2021
metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics},
2122
proc::{self, CPUData, NetworkData},
@@ -36,21 +37,31 @@ use crate::{
3637

3738
pub const MS_TO_NS: f64 = 1_000_000.0;
3839
pub const S_TO_NS: f64 = 1_000_000_000.0;
40+
pub const PROACTIVE_INITIALIZATION_THRESHOLD_MS: u64 = 10_000;
3941

4042
pub const DATADOG_INVOCATION_ERROR_MESSAGE_KEY: &str = "x-datadog-invocation-error-msg";
4143
pub const DATADOG_INVOCATION_ERROR_TYPE_KEY: &str = "x-datadog-invocation-error-type";
4244
pub const DATADOG_INVOCATION_ERROR_STACK_KEY: &str = "x-datadog-invocation-error-stack";
4345
pub const DATADOG_INVOCATION_ERROR_KEY: &str = "x-datadog-invocation-error";
4446

4547
pub struct Processor {
48+
// Buffer containing context of the previous 5 invocations
4649
pub context_buffer: ContextBuffer,
50+
// Helper to infer span information
4751
inferrer: SpanInferrer,
52+
// Current invocation span
4853
pub span: Span,
54+
// Cold start span
55+
cold_start_span: Option<Span>,
56+
// Extracted span context from inferred span, headers, or payload
4957
pub extracted_span_context: Option<SpanContext>,
5058
// Used to extract the trace context from inferred span, headers, or payload
5159
propagator: DatadogCompositePropagator,
60+
// Helper to send enhanced metrics
5261
enhanced_metrics: EnhancedMetrics,
62+
// AWS configuration from the Lambda environment
5363
aws_config: AwsConfig,
64+
// Flag to determine if a tracer was detected
5465
tracer_detected: bool,
5566
config: Arc<config::Config>,
5667
}
@@ -63,32 +74,18 @@ impl Processor {
6374
aws_config: &AwsConfig,
6475
metrics_aggregator: Arc<Mutex<MetricsAggregator>>,
6576
) -> Self {
66-
let service = config.service.clone().unwrap_or("aws.lambda".to_string());
77+
let service = config.service.clone().unwrap_or(String::from("aws.lambda"));
6778
let resource = tags_provider
6879
.get_canonical_resource_name()
69-
.unwrap_or("aws_lambda".to_string());
80+
.unwrap_or(String::from("aws.lambda"));
7081

7182
let propagator = DatadogCompositePropagator::new(Arc::clone(&config));
7283

7384
Processor {
7485
context_buffer: ContextBuffer::default(),
7586
inferrer: SpanInferrer::default(),
76-
span: Span {
77-
service,
78-
name: "aws.lambda".to_string(),
79-
resource,
80-
trace_id: 0,
81-
span_id: 0,
82-
parent_id: 0,
83-
start: 0,
84-
duration: 0,
85-
error: 0,
86-
meta: HashMap::new(),
87-
metrics: HashMap::new(),
88-
r#type: "serverless".to_string(),
89-
meta_struct: HashMap::new(),
90-
span_links: Vec::new(),
91-
},
87+
span: create_empty_span(String::from("aws.lambda"), resource, service),
88+
cold_start_span: None,
9289
extracted_span_context: None,
9390
propagator,
9491
enhanced_metrics: EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config)),
@@ -101,6 +98,9 @@ impl Processor {
10198
/// Given a `request_id`, creates the context and adds the enhanced metric offsets to the context buffer.
10299
///
103100
pub fn on_invoke_event(&mut self, request_id: String) {
101+
self.reset_state();
102+
self.set_init_tags();
103+
104104
self.context_buffer.create_context(request_id.clone());
105105
if self.config.enhanced_metrics {
106106
// Collect offsets for network and cpu metrics
@@ -132,10 +132,87 @@ impl Processor {
132132
self.enhanced_metrics.increment_invocation_metric();
133133
}
134134

135+
/// Resets the state of the processor to default values.
136+
///
137+
fn reset_state(&mut self) {
138+
// Reset Span Context on Span
139+
self.span.trace_id = 0;
140+
self.span.parent_id = 0;
141+
self.span.span_id = 0;
142+
// Error
143+
self.span.error = 0;
144+
// Meta tags
145+
self.span.meta.clear();
146+
// Extracted Span Context
147+
self.extracted_span_context = None;
148+
// Cold Start Span
149+
self.cold_start_span = None;
150+
}
151+
152+
/// On the first invocation, determine if it's a cold start or proactive init.
153+
///
154+
/// For every other invocation, it will always be warm start.
155+
///
156+
fn set_init_tags(&mut self) {
157+
let mut proactive_initialization = false;
158+
let mut cold_start = false;
159+
160+
// If it's empty, then we are in a cold start
161+
if self.context_buffer.is_empty() {
162+
let now = Instant::now();
163+
let time_since_sandbox_init = now.duration_since(self.aws_config.sandbox_init_time);
164+
if time_since_sandbox_init.as_millis() > PROACTIVE_INITIALIZATION_THRESHOLD_MS.into() {
165+
proactive_initialization = true;
166+
} else {
167+
cold_start = true;
168+
}
169+
}
170+
171+
if proactive_initialization {
172+
self.span.meta.insert(
173+
String::from("proactive_initialization"),
174+
proactive_initialization.to_string(),
175+
);
176+
}
177+
self.span
178+
.meta
179+
.insert(String::from("cold_start"), cold_start.to_string());
180+
181+
self.enhanced_metrics
182+
.set_init_tags(proactive_initialization, cold_start);
183+
}
184+
185+
pub fn on_platform_init_start(&mut self, time: DateTime<Utc>) {
186+
// Create a cold start span
187+
let mut cold_start_span = create_empty_span(
188+
String::from("aws.lambda.cold_start"),
189+
self.span.resource.clone(),
190+
self.span.service.clone(),
191+
);
192+
193+
let start_time: i64 = SystemTime::from(time)
194+
.duration_since(UNIX_EPOCH)
195+
.expect("time went backwards")
196+
.as_nanos()
197+
.try_into()
198+
.unwrap_or_default();
199+
200+
cold_start_span.span_id = generate_span_id();
201+
cold_start_span.start = start_time;
202+
203+
self.cold_start_span = Some(cold_start_span);
204+
}
205+
135206
/// Given the duration of the platform init report, set the init duration metric.
136207
///
208+
#[allow(clippy::cast_possible_truncation)]
137209
pub fn on_platform_init_report(&mut self, duration_ms: f64) {
138210
self.enhanced_metrics.set_init_duration_metric(duration_ms);
211+
212+
if let Some(cold_start_span) = &mut self.cold_start_span {
213+
// `round` is intentionally meant to be a whole integer
214+
cold_start_span.duration = (duration_ms * MS_TO_NS) as i64;
215+
}
139216
}
140217

141218
/// Given a `request_id` and the time of the platform start, add the start time to the context buffer.
@@ -183,10 +260,10 @@ impl Processor {
183260
}
184261

185262
if let Some(context) = self.context_buffer.get(request_id) {
186-
let span = &mut self.span;
187263
// `round` is intentionally meant to be a whole integer
188-
span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64;
189-
span.meta
264+
self.span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64;
265+
self.span
266+
.meta
190267
.insert("request_id".to_string(), request_id.clone());
191268
// todo(duncanista): add missing tags
192269
// - cold start, proactive init
@@ -213,6 +290,11 @@ impl Processor {
213290

214291
self.inferrer.complete_inferred_spans(&self.span);
215292

293+
if let Some(cold_start_span) = &mut self.cold_start_span {
294+
cold_start_span.trace_id = self.span.trace_id;
295+
cold_start_span.parent_id = self.span.parent_id;
296+
}
297+
216298
if self.tracer_detected {
217299
let mut body_size = std::mem::size_of_val(&self.span);
218300
let mut traces = vec![self.span.clone()];
@@ -227,6 +309,11 @@ impl Processor {
227309
traces.push(ws.clone());
228310
}
229311

312+
if let Some(cold_start_span) = &self.cold_start_span {
313+
body_size += std::mem::size_of_val(cold_start_span);
314+
traces.push(cold_start_span.clone());
315+
}
316+
230317
// todo: figure out what to do here
231318
let header_tags = tracer_header_tags::TracerHeaderTags {
232319
lang: "",
@@ -263,7 +350,7 @@ impl Processor {
263350
// Set the report log metrics
264351
self.enhanced_metrics.set_report_log_metrics(&metrics);
265352

266-
if let Some(context) = self.context_buffer.remove(request_id) {
353+
if let Some(context) = self.context_buffer.get(request_id) {
267354
if context.runtime_duration_ms != 0.0 {
268355
let post_runtime_duration_ms = metrics.duration_ms - context.runtime_duration_ms;
269356

@@ -273,7 +360,7 @@ impl Processor {
273360
}
274361

275362
// Set Network and CPU time metrics
276-
if let Some(offsets) = context.enhanced_metric_data {
363+
if let Some(offsets) = context.enhanced_metric_data.clone() {
277364
self.enhanced_metrics
278365
.set_network_enhanced_metrics(offsets.network_offset);
279366
self.enhanced_metrics
@@ -288,14 +375,6 @@ impl Processor {
288375
pub fn on_invocation_start(&mut self, headers: HashMap<String, String>, payload: Vec<u8>) {
289376
self.tracer_detected = true;
290377

291-
// Reset trace context
292-
self.span.trace_id = 0;
293-
self.span.parent_id = 0;
294-
self.span.span_id = 0;
295-
self.span.error = 0;
296-
self.span.meta.clear();
297-
self.extracted_span_context = None;
298-
299378
let payload_value = match serde_json::from_slice::<Value>(&payload) {
300379
Ok(value) => value,
301380
Err(_) => json!({}),

0 commit comments

Comments
 (0)