Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 14 additions & 36 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ use bottlecap::{
agent::LogsAgent,
flusher::{build_fqdn_logs, Flusher as LogsFlusher},
},
metrics::enhanced::lambda::Lambda as enhanced_metrics,
secrets::decrypt,
tags::{lambda, provider::Provider as TagProvider},
telemetry::{
self,
client::TelemetryApiClient,
events::{Status, TelemetryEvent, TelemetryRecord},
events::{TelemetryEvent, TelemetryRecord},
listener::TelemetryListener,
},
traces::{
Expand Down Expand Up @@ -295,10 +294,12 @@ async fn extension_loop_active(
buffer: Arc::new(TokioMutex::new(Vec::new())),
});

// Lifecycle Invocation Processor
let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
Arc::clone(&tags_provider),
Arc::clone(config),
aws_config,
Arc::clone(&metrics_aggr),
)));
let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
obfuscation_config: Arc::new(
Expand Down Expand Up @@ -349,8 +350,6 @@ async fn extension_loop_active(
}
});

let lambda_enhanced_metrics =
enhanced_metrics::new(Arc::clone(&metrics_aggr), Arc::clone(config));
let dogstatsd_cancel_token = start_dogstatsd(&metrics_aggr).await;

let telemetry_listener_cancel_token =
Expand All @@ -374,7 +373,6 @@ async fn extension_loop_active(
"Invoke event {}; deadline: {}, invoked_function_arn: {}",
request_id, deadline_ms, invoked_function_arn
);
lambda_enhanced_metrics.increment_invocation_metric();
let mut p = invocation_processor.lock().await;
p.on_invoke_event(request_id);
drop(p);
Expand Down Expand Up @@ -415,47 +413,35 @@ async fn extension_loop_active(
metrics,
} => {
debug!("Platform init report for initialization_type: {:?} with phase: {:?} and metrics: {:?}", initialization_type, phase, metrics);
lambda_enhanced_metrics
.set_init_duration_metric(metrics.duration_ms);
let mut p = invocation_processor.lock().await;
p.on_platform_init_report(metrics.duration_ms);
drop(p);
}
TelemetryRecord::PlatformRuntimeDone {
request_id,
status,
metrics,
..
} => {
debug!(
"Runtime done for request_id: {:?} with status: {:?}",
request_id, status
);

let mut p = invocation_processor.lock().await;
let mut enhanced_metric_data = None;
if let Some(metrics) = metrics {
enhanced_metric_data = p.on_platform_runtime_done(
p.on_platform_runtime_done(
&request_id,
metrics.duration_ms,
status,
config.clone(),
tags_provider.clone(),
trace_processor.clone(),
trace_agent_tx.clone()
).await;
lambda_enhanced_metrics
.set_runtime_duration_metric(metrics.duration_ms);
}
drop(p);

if status != Status::Success {
lambda_enhanced_metrics.increment_errors_metric();
if status == Status::Timeout {
lambda_enhanced_metrics.increment_timeout_metric();
}
}
debug!(
"Runtime done for request_id: {:?} with status: {:?}",
request_id, status
);

// set cpu utilization metrics here to avoid accounting for extra idle time
if let Some(offsets) = enhanced_metric_data {
lambda_enhanced_metrics.set_cpu_utilization_enhanced_metrics(offsets.cpu_offset, offsets.uptime_offset);
}

// TODO(astuyve) it'll be easy to
// pass the invocation deadline to
// flush tasks here, so they can
Expand All @@ -481,16 +467,8 @@ async fn extension_loop_active(
"Platform report for request_id: {:?} with status: {:?}",
request_id, status
);
lambda_enhanced_metrics.set_report_log_metrics(&metrics);
let mut p = invocation_processor.lock().await;
let (post_runtime_duration_ms, enhanced_metric_data) = p.on_platform_report(&request_id, metrics.duration_ms);
if let Some(duration) = post_runtime_duration_ms {
lambda_enhanced_metrics.set_post_runtime_duration_metric(duration);
}
if let Some(offsets) = enhanced_metric_data {
lambda_enhanced_metrics.set_network_enhanced_metrics(offsets.network_offset);
lambda_enhanced_metrics.set_cpu_time_enhanced_metrics(offsets.cpu_offset);
}
p.on_platform_report(&request_id, metrics);
drop(p);

if shutdown {
Expand Down
79 changes: 58 additions & 21 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
use std::{
collections::HashMap,
sync::Arc,
sync::{Arc, Mutex},
time::{SystemTime, UNIX_EPOCH},
};

use chrono::{DateTime, Utc};
use datadog_trace_protobuf::pb::Span;
use datadog_trace_utils::{send_data::SendData, tracer_header_tags};
use dogstatsd::aggregator::Aggregator as MetricsAggregator;
use serde_json::{json, Value};
use tokio::sync::mpsc::Sender;
use tracing::debug;

use crate::{
config::{self, AwsConfig},
lifecycle::invocation::{context::ContextBuffer, span_inferrer::SpanInferrer},
metrics::enhanced::lambda::EnhancedMetricData,
metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics},
proc::{self, CPUData, NetworkData},
tags::provider,
telemetry::events::{ReportMetrics, Status},
traces::{
context::SpanContext,
propagation::{DatadogCompositePropagator, Propagator},
Expand All @@ -34,9 +36,10 @@ pub struct Processor {
pub extracted_span_context: Option<SpanContext>,
// Used to extract the trace context from inferred span, headers, or payload
propagator: DatadogCompositePropagator,
enhanced_metrics: EnhancedMetrics,
aws_config: AwsConfig,
tracer_detected: bool,
collect_enhanced_data: bool,
enhanced_metrics_enabled: bool,
}

impl Processor {
Expand All @@ -45,6 +48,7 @@ impl Processor {
tags_provider: Arc<provider::Provider>,
config: Arc<config::Config>,
aws_config: &AwsConfig,
metrics_aggregator: Arc<Mutex<MetricsAggregator>>,
) -> Self {
let service = config.service.clone().unwrap_or("aws.lambda".to_string());
let resource = tags_provider
Expand Down Expand Up @@ -74,17 +78,18 @@ impl Processor {
},
extracted_span_context: None,
propagator,
enhanced_metrics: EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config)),
aws_config: aws_config.clone(),
tracer_detected: false,
collect_enhanced_data: config.enhanced_metrics,
enhanced_metrics_enabled: config.enhanced_metrics,
}
}

/// Given a `request_id`, creates the context and adds the enhanced metric offsets to the context buffer.
///
pub fn on_invoke_event(&mut self, request_id: String) {
self.context_buffer.create_context(request_id.clone());
if self.collect_enhanced_data {
if self.enhanced_metrics_enabled {
let network_offset: Option<NetworkData> = proc::get_network_data().ok();
let cpu_offset: Option<CPUData> = proc::get_cpu_data().ok();
let uptime_offset: Option<f64> = proc::get_uptime().ok();
Expand All @@ -96,6 +101,15 @@ impl Processor {
self.context_buffer
.add_enhanced_metric_data(&request_id, enhanced_metric_offsets);
}

// Increment the invocation metric
self.enhanced_metrics.increment_invocation_metric();
}

/// Records the duration reported by the platform init report as the
/// init-duration enhanced metric.
///
pub fn on_platform_init_report(&mut self, duration_ms: f64) {
    // Delegate straight to the enhanced-metrics emitter.
    let enhanced = &mut self.enhanced_metrics;
    enhanced.set_init_duration_metric(duration_ms);
}

/// Given a `request_id` and the time of the platform start, add the start time to the context buffer.
Expand All @@ -113,20 +127,35 @@ impl Processor {
self.span.start = start_time;
}

#[allow(clippy::too_many_arguments)]
#[allow(clippy::cast_possible_truncation)]
pub async fn on_platform_runtime_done(
&mut self,
request_id: &String,
duration_ms: f64,
status: Status,
config: Arc<config::Config>,
tags_provider: Arc<provider::Provider>,
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
trace_agent_tx: Sender<SendData>,
) -> Option<EnhancedMetricData> {
) {
self.context_buffer
.add_runtime_duration(request_id, duration_ms);

let mut enhanced_metric_data: Option<EnhancedMetricData> = None;
// Set the runtime duration metric
self.enhanced_metrics
.set_runtime_duration_metric(duration_ms);

if status != Status::Success {
// Increment the error metric
self.enhanced_metrics.increment_errors_metric();

// Increment the error type metric
if status == Status::Timeout {
self.enhanced_metrics.increment_timeout_metric();
}
}

if let Some(context) = self.context_buffer.get(request_id) {
let span = &mut self.span;
// `round` is intentionally meant to be a whole integer
Expand All @@ -143,7 +172,12 @@ impl Processor {
// - error.stack
// - metrics tags (for asm)

enhanced_metric_data.clone_from(&context.enhanced_metric_data);
if let Some(offsets) = &context.enhanced_metric_data {
self.enhanced_metrics.set_cpu_utilization_enhanced_metrics(
offsets.cpu_offset.clone(),
offsets.uptime_offset,
);
}
}

if let Some(trigger_tags) = self.inferrer.get_trigger_tags() {
Expand Down Expand Up @@ -190,8 +224,6 @@ impl Processor {
debug!("Failed to send invocation span to agent: {e}");
}
}

enhanced_metric_data
}

/// Given a `request_id` and the duration in milliseconds of the platform report,
Expand All @@ -200,22 +232,27 @@ impl Processor {
/// If the `request_id` is not found in the context buffer, return `None`.
/// If the `runtime_duration_ms` hasn't been seen, return `None`.
///
pub fn on_platform_report(
&mut self,
request_id: &String,
duration_ms: f64,
) -> (Option<f64>, Option<EnhancedMetricData>) {
if let Some(context) = self.context_buffer.remove(request_id) {
let mut post_runtime_duration_ms: Option<f64> = None;
pub fn on_platform_report(&mut self, request_id: &String, metrics: ReportMetrics) {
// Set the report log metrics
self.enhanced_metrics.set_report_log_metrics(&metrics);

if let Some(context) = self.context_buffer.remove(request_id) {
if context.runtime_duration_ms != 0.0 {
post_runtime_duration_ms = Some(duration_ms - context.runtime_duration_ms);
let post_runtime_duration_ms = metrics.duration_ms - context.runtime_duration_ms;

// Set the post runtime duration metric
self.enhanced_metrics
.set_post_runtime_duration_metric(post_runtime_duration_ms);
}

return (post_runtime_duration_ms, context.enhanced_metric_data);
// Set Network and CPU time metrics
if let Some(offsets) = context.enhanced_metric_data {
self.enhanced_metrics
.set_network_enhanced_metrics(offsets.network_offset);
self.enhanced_metrics
.set_cpu_time_enhanced_metrics(offsets.cpu_offset);
}
}

(None, None)
}

/// If this method is called, it means that we are operating in a Universally Instrumented
Expand Down