diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs
index dcddf23b5..30c2c714b 100644
--- a/bottlecap/src/bin/bottlecap/main.rs
+++ b/bottlecap/src/bin/bottlecap/main.rs
@@ -23,13 +23,12 @@ use bottlecap::{
         agent::LogsAgent,
         flusher::{build_fqdn_logs, Flusher as LogsFlusher},
     },
-    metrics::enhanced::lambda::Lambda as enhanced_metrics,
     secrets::decrypt,
     tags::{lambda, provider::Provider as TagProvider},
     telemetry::{
         self,
         client::TelemetryApiClient,
-        events::{Status, TelemetryEvent, TelemetryRecord},
+        events::{TelemetryEvent, TelemetryRecord},
         listener::TelemetryListener,
     },
     traces::{
@@ -295,10 +294,12 @@ async fn extension_loop_active(
         buffer: Arc::new(TokioMutex::new(Vec::new())),
     });
 
+    // Lifecycle Invocation Processor
     let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
         Arc::clone(&tags_provider),
         Arc::clone(config),
         aws_config,
+        Arc::clone(&metrics_aggr),
     )));
     let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
         obfuscation_config: Arc::new(
@@ -349,8 +350,6 @@ async fn extension_loop_active(
         }
     });
 
-    let lambda_enhanced_metrics =
-        enhanced_metrics::new(Arc::clone(&metrics_aggr), Arc::clone(config));
     let dogstatsd_cancel_token = start_dogstatsd(&metrics_aggr).await;
 
     let telemetry_listener_cancel_token =
@@ -374,7 +373,6 @@ async fn extension_loop_active(
                         "Invoke event {}; deadline: {}, invoked_function_arn: {}",
                         request_id, deadline_ms, invoked_function_arn
                     );
-                    lambda_enhanced_metrics.increment_invocation_metric();
                     let mut p = invocation_processor.lock().await;
                     p.on_invoke_event(request_id);
                     drop(p);
@@ -415,8 +413,9 @@ async fn extension_loop_active(
                         metrics,
                     } => {
                         debug!("Platform init report for initialization_type: {:?} with phase: {:?} and metrics: {:?}", initialization_type, phase, metrics);
-                        lambda_enhanced_metrics
-                            .set_init_duration_metric(metrics.duration_ms);
+                        let mut p = invocation_processor.lock().await;
+                        p.on_platform_init_report(metrics.duration_ms);
+                        drop(p);
                     }
                     TelemetryRecord::PlatformRuntimeDone {
                         request_id,
@@ -424,38 +423,25 @@ async fn extension_loop_active(
                         metrics,
                         ..
                     } => {
+                        debug!(
+                            "Runtime done for request_id: {:?} with status: {:?}",
+                            request_id, status
+                        );
+
                         let mut p = invocation_processor.lock().await;
-                        let mut enhanced_metric_data = None;
                         if let Some(metrics) = metrics {
-                            enhanced_metric_data = p.on_platform_runtime_done(
+                            p.on_platform_runtime_done(
                                 &request_id,
                                 metrics.duration_ms,
+                                status,
                                 config.clone(),
                                 tags_provider.clone(),
                                 trace_processor.clone(),
                                 trace_agent_tx.clone()
                             ).await;
-                            lambda_enhanced_metrics
-                                .set_runtime_duration_metric(metrics.duration_ms);
                         }
                         drop(p);
-                        if status != Status::Success {
-                            lambda_enhanced_metrics.increment_errors_metric();
-                            if status == Status::Timeout {
-                                lambda_enhanced_metrics.increment_timeout_metric();
-                            }
-                        }
-                        debug!(
-                            "Runtime done for request_id: {:?} with status: {:?}",
-                            request_id, status
-                        );
-
-                        // set cpu utilization metrics here to avoid accounting for extra idle time
-                        if let Some(offsets) = enhanced_metric_data {
-                            lambda_enhanced_metrics.set_cpu_utilization_enhanced_metrics(offsets.cpu_offset, offsets.uptime_offset);
-                        }
-
                         // TODO(astuyve) it'll be easy to
                         // pass the invocation deadline to
                         // flush tasks here, so they can
@@ -481,16 +467,8 @@ async fn extension_loop_active(
                             "Platform report for request_id: {:?} with status: {:?}",
                             request_id, status
                         );
-                        lambda_enhanced_metrics.set_report_log_metrics(&metrics);
                         let mut p = invocation_processor.lock().await;
-                        let (post_runtime_duration_ms, enhanced_metric_data) = p.on_platform_report(&request_id, metrics.duration_ms);
-                        if let Some(duration) = post_runtime_duration_ms {
-                            lambda_enhanced_metrics.set_post_runtime_duration_metric(duration);
-                        }
-                        if let Some(offsets) = enhanced_metric_data {
-                            lambda_enhanced_metrics.set_network_enhanced_metrics(offsets.network_offset);
-                            lambda_enhanced_metrics.set_cpu_time_enhanced_metrics(offsets.cpu_offset);
-                        }
+                        p.on_platform_report(&request_id, metrics);
                         drop(p);
 
                         if shutdown {
diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs
index 471c00f69..467519e37 100644
--- a/bottlecap/src/lifecycle/invocation/processor.rs
+++ b/bottlecap/src/lifecycle/invocation/processor.rs
@@ -1,12 +1,13 @@
 use std::{
     collections::HashMap,
-    sync::Arc,
+    sync::{Arc, Mutex},
     time::{SystemTime, UNIX_EPOCH},
 };
 
 use chrono::{DateTime, Utc};
 use datadog_trace_protobuf::pb::Span;
 use datadog_trace_utils::{send_data::SendData, tracer_header_tags};
+use dogstatsd::aggregator::Aggregator as MetricsAggregator;
 use serde_json::{json, Value};
 use tokio::sync::mpsc::Sender;
 use tracing::debug;
@@ -14,9 +15,10 @@ use tracing::debug;
 use crate::{
     config::{self, AwsConfig},
     lifecycle::invocation::{context::ContextBuffer, span_inferrer::SpanInferrer},
-    metrics::enhanced::lambda::EnhancedMetricData,
+    metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics},
     proc::{self, CPUData, NetworkData},
     tags::provider,
+    telemetry::events::{ReportMetrics, Status},
     traces::{
         context::SpanContext,
         propagation::{DatadogCompositePropagator, Propagator},
@@ -34,9 +36,10 @@ pub struct Processor {
     pub extracted_span_context: Option<SpanContext>,
     // Used to extract the trace context from inferred span, headers, or payload
     propagator: DatadogCompositePropagator,
+    enhanced_metrics: EnhancedMetrics,
     aws_config: AwsConfig,
     tracer_detected: bool,
-    collect_enhanced_data: bool,
+    enhanced_metrics_enabled: bool,
 }
 
 impl Processor {
@@ -45,6 +48,7 @@ impl Processor {
         tags_provider: Arc<provider::Provider>,
         config: Arc<config::Config>,
         aws_config: &AwsConfig,
+        metrics_aggregator: Arc<Mutex<MetricsAggregator>>,
     ) -> Self {
         let service = config.service.clone().unwrap_or("aws.lambda".to_string());
         let resource = tags_provider
@@ -74,9 +78,10 @@ impl Processor {
             },
             extracted_span_context: None,
             propagator,
+            enhanced_metrics: EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config)),
             aws_config: aws_config.clone(),
             tracer_detected: false,
-            collect_enhanced_data: config.enhanced_metrics,
+            enhanced_metrics_enabled: config.enhanced_metrics,
         }
     }
 
@@ -84,7 +89,7 @@ impl Processor {
     ///
     pub fn on_invoke_event(&mut self, request_id: String) {
         self.context_buffer.create_context(request_id.clone());
-        if self.collect_enhanced_data {
+        if self.enhanced_metrics_enabled {
             let network_offset: Option<NetworkData> = proc::get_network_data().ok();
             let cpu_offset: Option<CPUData> = proc::get_cpu_data().ok();
             let uptime_offset: Option<f64> = proc::get_uptime().ok();
@@ -96,6 +101,15 @@ impl Processor {
             self.context_buffer
                 .add_enhanced_metric_data(&request_id, enhanced_metric_offsets);
         }
+
+        // Increment the invocation metric
+        self.enhanced_metrics.increment_invocation_metric();
+    }
+
+    /// Given the duration of the platform init report, set the init duration metric.
+    ///
+    pub fn on_platform_init_report(&mut self, duration_ms: f64) {
+        self.enhanced_metrics.set_init_duration_metric(duration_ms);
     }
 
     /// Given a `request_id` and the time of the platform start, add the start time to the context buffer.
@@ -113,20 +127,35 @@ impl Processor {
         self.span.start = start_time;
     }
 
+    #[allow(clippy::too_many_arguments)]
     #[allow(clippy::cast_possible_truncation)]
     pub async fn on_platform_runtime_done(
         &mut self,
         request_id: &String,
         duration_ms: f64,
+        status: Status,
         config: Arc<config::Config>,
         tags_provider: Arc<provider::Provider>,
         trace_processor: Arc<trace_processor::ServerlessTraceProcessor>,
         trace_agent_tx: Sender<SendData>,
-    ) -> Option<EnhancedMetricData> {
+    ) {
         self.context_buffer
             .add_runtime_duration(request_id, duration_ms);
-        let mut enhanced_metric_data: Option<EnhancedMetricData> = None;
+        // Set the runtime duration metric
+        self.enhanced_metrics
+            .set_runtime_duration_metric(duration_ms);
+
+        if status != Status::Success {
+            // Increment the error metric
+            self.enhanced_metrics.increment_errors_metric();
+
+            // Increment the error type metric
+            if status == Status::Timeout {
+                self.enhanced_metrics.increment_timeout_metric();
+            }
+        }
+
         if let Some(context) = self.context_buffer.get(request_id) {
             let span = &mut self.span;
             // `round` is intentionally meant to be a whole integer
@@ -143,7 +172,12 @@
             // - error.stack
             // - metrics tags (for asm)
 
-            enhanced_metric_data.clone_from(&context.enhanced_metric_data);
+            if let Some(offsets) = &context.enhanced_metric_data {
+                self.enhanced_metrics.set_cpu_utilization_enhanced_metrics(
+                    offsets.cpu_offset.clone(),
+                    offsets.uptime_offset,
+                );
+            }
         }
 
         if let Some(trigger_tags) = self.inferrer.get_trigger_tags() {
@@ -190,8 +224,6 @@
                 debug!("Failed to send invocation span to agent: {e}");
             }
         }
-
-        enhanced_metric_data
     }
 
     /// Given a `request_id` and the duration in milliseconds of the platform report,
@@ -200,22 +232,27 @@
     /// If the `request_id` is not found in the context buffer, return `None`.
    /// If the `runtime_duration_ms` hasn't been seen, return `None`.
     ///
-    pub fn on_platform_report(
-        &mut self,
-        request_id: &String,
-        duration_ms: f64,
-    ) -> (Option<f64>, Option<EnhancedMetricData>) {
-        if let Some(context) = self.context_buffer.remove(request_id) {
-            let mut post_runtime_duration_ms: Option<f64> = None;
+    pub fn on_platform_report(&mut self, request_id: &String, metrics: ReportMetrics) {
+        // Set the report log metrics
+        self.enhanced_metrics.set_report_log_metrics(&metrics);
+        if let Some(context) = self.context_buffer.remove(request_id) {
             if context.runtime_duration_ms != 0.0 {
-                post_runtime_duration_ms = Some(duration_ms - context.runtime_duration_ms);
+                let post_runtime_duration_ms = metrics.duration_ms - context.runtime_duration_ms;
+
+                // Set the post runtime duration metric
+                self.enhanced_metrics
+                    .set_post_runtime_duration_metric(post_runtime_duration_ms);
             }
-            return (post_runtime_duration_ms, context.enhanced_metric_data);
+            // Set Network and CPU time metrics
+            if let Some(offsets) = context.enhanced_metric_data {
+                self.enhanced_metrics
+                    .set_network_enhanced_metrics(offsets.network_offset);
+                self.enhanced_metrics
+                    .set_cpu_time_enhanced_metrics(offsets.cpu_offset);
+            }
         }
-
-        (None, None)
     }
 
     /// If this method is called, it means that we are operating in a Universally Instrumented