diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 62f6063a7..6b3033f26 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -15,8 +15,7 @@ use bottlecap::{ event_bus::bus::EventBus, events::Event, lifecycle::{ - flush_control::FlushControl, - invocation_context::{InvocationContext, InvocationContextBuffer}, + flush_control::FlushControl, invocation::processor::Processor as InvocationProcessor, listener::Listener as LifecycleListener, }, logger, @@ -280,6 +279,11 @@ async fn extension_loop_active( let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher { buffer: Arc::new(TokioMutex::new(Vec::new())), }); + + let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new( + Arc::clone(&tags_provider), + Arc::clone(config), + ))); let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor { obfuscation_config: Arc::new( obfuscation_config::ObfuscationConfig::new() @@ -298,23 +302,28 @@ async fn extension_loop_active( let trace_flusher_clone = trace_flusher.clone(); let stats_flusher_clone = stats_flusher.clone(); - let trace_agent = Box::new(trace_agent::TraceAgent { - config: Arc::clone(config), - trace_processor, - trace_flusher: trace_flusher_clone, - stats_processor, - stats_flusher: stats_flusher_clone, - tags_provider: Arc::clone(&tags_provider), - }); + let trace_agent = Box::new( + trace_agent::TraceAgent::new( + Arc::clone(config), + trace_processor.clone(), + trace_flusher_clone, + stats_processor, + stats_flusher_clone, + Arc::clone(&tags_provider), + ) + .await, + ); + let trace_agent_tx = trace_agent.get_sender_copy(); + tokio::spawn(async move { - let res = trace_agent.start_trace_agent().await; + let res = trace_agent.start().await; if let Err(e) = res { error!("Error starting trace agent: {e:?}"); } }); let lifecycle_listener = LifecycleListener { - tags_provider: Arc::clone(&tags_provider), + invocation_processor: Arc::clone(&invocation_processor), }; // TODO(astuyve): deprioritize this task after the first request tokio::spawn(async move { @@ -332,7 +341,6 @@ async fn extension_loop_active( setup_telemetry_client(&r.extension_id, logs_agent_channel).await?; let flush_control = FlushControl::new(config.serverless_flush_strategy); - let mut invocation_context_buffer = InvocationContextBuffer::default(); let mut shutdown = false; let mut flush_interval = flush_control.get_flush_interval(); @@ -375,92 +383,95 @@ async fn extension_loop_active( Event::Metric(event) => { debug!("Metric event: {:?}", event); } - Event::Telemetry(event) => match event.record { - TelemetryRecord::PlatformStart { request_id, .. } => { - invocation_context_buffer.insert(InvocationContext { - request_id, - runtime_duration_ms: 0.0, - }); - } - TelemetryRecord::PlatformInitReport { - initialization_type, - phase, - metrics, - } => { - debug!("Platform init report for initialization_type: {:?} with phase: {:?} and metrics: {:?}", initialization_type, phase, metrics); - lambda_enhanced_metrics - .set_init_duration_metric(metrics.duration_ms); - } - TelemetryRecord::PlatformRuntimeDone { - request_id, - status, - metrics, - .. - } => { - if let Some(metrics) = metrics { - invocation_context_buffer - .add_runtime_duration(&request_id, metrics.duration_ms); + Event::Telemetry(event) => + match event.record { + TelemetryRecord::PlatformStart { request_id, .. } => { + let mut p = invocation_processor.lock().await; + p.on_platform_start(request_id, event.time); + drop(p); + } + TelemetryRecord::PlatformInitReport { + initialization_type, + phase, + metrics, + } => { + debug!("Platform init report for initialization_type: {:?} with phase: {:?} and metrics: {:?}", initialization_type, phase, metrics); lambda_enhanced_metrics - .set_runtime_duration_metric(metrics.duration_ms); + .set_init_duration_metric(metrics.duration_ms); } + TelemetryRecord::PlatformRuntimeDone { + request_id, + status, + metrics, + .. + } => { + let mut p = invocation_processor.lock().await; + if let Some(metrics) = metrics { + p.on_platform_runtime_done( + &request_id, + metrics.duration_ms, + config.clone(), + tags_provider.clone(), + trace_processor.clone(), + trace_agent_tx.clone() + ).await; + lambda_enhanced_metrics + .set_runtime_duration_metric(metrics.duration_ms); + } + drop(p); - if status != Status::Success { - lambda_enhanced_metrics.increment_errors_metric(); - if status == Status::Timeout { - lambda_enhanced_metrics.increment_timeout_metric(); + if status != Status::Success { + lambda_enhanced_metrics.increment_errors_metric(); + if status == Status::Timeout { + lambda_enhanced_metrics.increment_timeout_metric(); + } } - } - debug!( - "Runtime done for request_id: {:?} with status: {:?}", - request_id, status - ); - // TODO(astuyve) it'll be easy to - // pass the invocation deadline to - // flush tasks here, so they can - // retry if we have more time - if flush_control.should_flush_end() { - tokio::join!( - logs_flusher.flush(), - metrics_flusher.flush(), - trace_flusher.manual_flush(), - stats_flusher.manual_flush() + debug!( + "Runtime done for request_id: {:?} with status: {:?}", + request_id, status ); + + // TODO(astuyve) it'll be easy to + // pass the invocation deadline to + // flush tasks here, so they can + // retry if we have more time + if flush_control.should_flush_end() { + tokio::join!( + logs_flusher.flush(), + metrics_flusher.flush(), + trace_flusher.manual_flush(), + stats_flusher.manual_flush() + ); + } + break; } - break; - } - TelemetryRecord::PlatformReport { - request_id, - status, - metrics, - .. - } => { - debug!( - "Platform report for request_id: {:?} with status: {:?}", - request_id, status - ); - lambda_enhanced_metrics.set_report_log_metrics(&metrics); - if let Some(invocation_context) = - invocation_context_buffer.remove(&request_id) - { - if invocation_context.runtime_duration_ms > 0.0 { - let post_runtime_duration_ms = metrics.duration_ms - - invocation_context.runtime_duration_ms; + TelemetryRecord::PlatformReport { + request_id, + status, + metrics, + .. + } => { + debug!( + "Platform report for request_id: {:?} with status: {:?}", + request_id, status + ); + lambda_enhanced_metrics.set_report_log_metrics(&metrics); + let mut p = invocation_processor.lock().await; + if let Some(post_runtime_duration_ms) = p.on_platform_report(&request_id, metrics.duration_ms) { lambda_enhanced_metrics.set_post_runtime_duration_metric( post_runtime_duration_ms, ); - } else { - debug!("Impossible to compute post runtime duration for request_id: {:?}", request_id); } - } + drop(p); - if shutdown { - break; + if shutdown { + break; + } + } + _ => { + debug!("Unforwarded Telemetry event: {:?}", event); } } - _ => { - debug!("Unforwarded Telemetry event: {:?}", event); - } - }, } } _ = flush_interval.tick() => { diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs new file mode 100644 index 000000000..a1c74bc57 --- /dev/null +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -0,0 +1,298 @@ +use std::collections::VecDeque; + +use tracing::debug; + +#[derive(Debug, Clone, PartialEq)] +pub struct Context { + pub request_id: String, + pub runtime_duration_ms: f64, + pub init_duration_ms: f64, + pub start_time: i64, +} + +impl Context { + #[must_use] + pub fn new( + request_id: String, + runtime_duration_ms: f64, + init_duration_ms: f64, + start_time: i64, + ) -> Self { + Context { + request_id, + runtime_duration_ms, + init_duration_ms, + start_time, + } + } +} + +#[allow(clippy::module_name_repetitions)] +pub struct ContextBuffer { + buffer: VecDeque, +} + +impl Default for ContextBuffer { + /// Creates a new `ContextBuffer` with a default capacity of 5. + /// + fn default() -> Self { + ContextBuffer { + buffer: VecDeque::::with_capacity(5), + } + } +} + +impl ContextBuffer { + #[allow(dead_code)] + fn with_capacity(capacity: usize) -> Self { + ContextBuffer { + buffer: VecDeque::::with_capacity(capacity), + } + } + + /// Inserts a context into the buffer. If the buffer is full, the oldest `Context` is removed. + /// + fn insert(&mut self, context: Context) { + if self.size() == self.buffer.capacity() { + self.buffer.pop_front(); + self.buffer.push_back(context); + } else { + if self.get(&context.request_id).is_some() { + self.remove(&context.request_id); + } + + self.buffer.push_back(context); + } + } + + /// Removes a context from the buffer. Returns the removed `Context` if found. + /// + pub fn remove(&mut self, request_id: &String) -> Option { + if let Some(i) = self + .buffer + .iter() + .position(|context| context.request_id == *request_id) + { + return self.buffer.remove(i); + } + debug!("Context for request_id: {:?} not found", request_id); + + None + } + + /// Returns a reference to a `Context` from the buffer if found. + /// + #[must_use] + pub fn get(&self, request_id: &String) -> Option<&Context> { + self.buffer + .iter() + .find(|context| context.request_id == *request_id) + } + + /// Adds the init duration to a `Context` in the buffer. If the `Context` is not found, a new + /// `Context` is created and added to the buffer. + /// + pub fn add_init_duration(&mut self, request_id: &String, init_duration_ms: f64) { + if let Some(context) = self + .buffer + .iter_mut() + .find(|context| context.request_id == *request_id) + { + context.init_duration_ms = init_duration_ms; + } else { + self.insert(Context::new(request_id.clone(), 0.0, init_duration_ms, 0)); + } + } + + /// Adds the start time to a `Context` in the buffer. If the `Context` is not found, a new + /// `Context` is created and added to the buffer. + /// + pub fn add_start_time(&mut self, request_id: &String, start_time: i64) { + if let Some(context) = self + .buffer + .iter_mut() + .find(|context| context.request_id == *request_id) + { + context.start_time = start_time; + } else { + self.insert(Context::new(request_id.clone(), 0.0, 0.0, start_time)); + } + } + + /// Adds the runtime duration to a `Context` in the buffer. If the `Context` is not found, a new + /// `Context` is created and added to the buffer. + /// + pub fn add_runtime_duration(&mut self, request_id: &String, runtime_duration_ms: f64) { + if let Some(context) = self + .buffer + .iter_mut() + .find(|context| context.request_id == *request_id) + { + context.runtime_duration_ms = runtime_duration_ms; + } else { + self.insert(Context::new( + request_id.clone(), + runtime_duration_ms, + 0.0, + 0, + )); + } + } + + /// Returns the size of the buffer. + /// + #[must_use] + pub fn size(&self) -> usize { + self.buffer.len() + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + + #[test] + fn test_insert() { + let mut buffer = ContextBuffer::with_capacity(2); + + let request_id = String::from("1"); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 1); + assert_eq!(buffer.get(&request_id).unwrap(), &context); + + let request_id_2 = String::from("2"); + let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 2); + assert_eq!(buffer.get(&request_id_2).unwrap(), &context); + + // This should replace the first context + let request_id_3 = String::from("3"); + let context = Context::new(request_id_3.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 2); + assert_eq!(buffer.get(&request_id_3).unwrap(), &context); + + // First context should be None + assert!(buffer.get(&request_id).is_none()); + } + + #[test] + fn test_remove() { + let mut buffer = ContextBuffer::with_capacity(2); + + let request_id = String::from("1"); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 1); + assert_eq!(buffer.get(&request_id).unwrap(), &context); + + let request_id_2 = String::from("2"); + let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 2); + assert_eq!(buffer.get(&request_id_2).unwrap(), &context); + + // Remove the first context + assert_eq!(buffer.remove(&request_id).unwrap().request_id, request_id); + // Size is reduced by 1 + assert_eq!(buffer.size(), 1); + assert!(buffer.get(&request_id).is_none()); + + // Remove a context that doesn't exist + let unexistent_request_id = String::from("unexistent"); + assert!(buffer.remove(&unexistent_request_id).is_none()); + } + + #[test] + fn test_get() { + let mut buffer = ContextBuffer::with_capacity(2); + + let request_id = String::from("1"); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 1); + assert_eq!(buffer.get(&request_id).unwrap(), &context); + + let request_id_2 = String::from("2"); + let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 2); + assert_eq!(buffer.get(&request_id_2).unwrap(), &context); + + // Get a context that doesn't exist + let unexistent_request_id = String::from("unexistent"); + assert!(buffer.get(&unexistent_request_id).is_none()); + } + + #[test] + fn test_add_init_duration() { + let mut buffer = ContextBuffer::with_capacity(2); + + let request_id = String::from("1"); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 1); + assert_eq!(buffer.get(&request_id).unwrap(), &context); + + buffer.add_init_duration(&request_id, 100.0); + assert_eq!(buffer.get(&request_id).unwrap().init_duration_ms, 100.0); + + // Add init duration to a context that doesn't exist + let unexistent_request_id = String::from("unexistent"); + buffer.add_init_duration(&unexistent_request_id, 200.0); + assert_eq!(buffer.size(), 2); + assert_eq!( + buffer.get(&unexistent_request_id).unwrap().init_duration_ms, + 200.0 + ); + } + + #[test] + fn test_add_start_time() { + let mut buffer = ContextBuffer::with_capacity(2); + + let request_id = String::from("1"); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 1); + assert_eq!(buffer.get(&request_id).unwrap(), &context); + + buffer.add_start_time(&request_id, 100); + assert_eq!(buffer.get(&request_id).unwrap().start_time, 100); + + // Add start time to a context that doesn't exist + let unexistent_request_id = String::from("unexistent"); + buffer.add_start_time(&unexistent_request_id, 200); + assert_eq!(buffer.size(), 2); + assert_eq!(buffer.get(&unexistent_request_id).unwrap().start_time, 200); + } + + #[test] + fn test_add_runtime_duration() { + let mut buffer = ContextBuffer::with_capacity(2); + + let request_id = String::from("1"); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 1); + assert_eq!(buffer.get(&request_id).unwrap(), &context); + + buffer.add_runtime_duration(&request_id, 100.0); + assert_eq!(buffer.get(&request_id).unwrap().runtime_duration_ms, 100.0); + + // Add runtime duration to a context that doesn't exist + let unexistent_request_id = String::from("unexistent"); + buffer.add_runtime_duration(&unexistent_request_id, 200.0); + assert_eq!(buffer.size(), 2); + assert_eq!( + buffer + .get(&unexistent_request_id) + .unwrap() + .runtime_duration_ms, + 200.0 + ); + } +} diff --git a/bottlecap/src/lifecycle/invocation/mod.rs b/bottlecap/src/lifecycle/invocation/mod.rs new file mode 100644 index 000000000..bf32ed105 --- /dev/null +++ b/bottlecap/src/lifecycle/invocation/mod.rs @@ -0,0 +1,2 @@ +pub mod context; +pub mod processor; diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs new file mode 100644 index 000000000..e553150d1 --- /dev/null +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -0,0 +1,164 @@ +use std::{ + collections::HashMap, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +use chrono::{DateTime, Utc}; +use datadog_trace_protobuf::pb::Span; +use datadog_trace_utils::{send_data::SendData, tracer_header_tags}; +use tokio::sync::mpsc::Sender; +use tracing::debug; + +use crate::{ + config, lifecycle::invocation::context::ContextBuffer, tags::provider, traces::trace_processor, +}; + +pub const MS_TO_NS: f64 = 1_000_000.0; + +pub struct Processor { + pub context_buffer: ContextBuffer, + pub span: Span, + tracer_detected: bool, +} + +impl Processor { + #[must_use] + pub fn new(tags_provider: Arc, config: Arc) -> Self { + let service = config.service.clone().unwrap_or("aws.lambda".to_string()); + let resource = tags_provider + .get_canonical_resource_name() + .unwrap_or("aws_lambda".to_string()); + + Processor { + context_buffer: ContextBuffer::default(), + span: Span { + service, + name: "aws.lambda".to_string(), + resource, + trace_id: 0, // set later + span_id: 0, // maybe set later? + parent_id: 0, // set later + start: 0, // set later + duration: 0, // set later + error: 0, + meta: HashMap::new(), + metrics: HashMap::new(), + r#type: "serverless".to_string(), + meta_struct: HashMap::new(), + span_links: Vec::new(), + }, + tracer_detected: false, + } + } + + /// Given a `request_id` and the time of the platform start, add the start time to the context buffer. + /// + /// Also, set the start time of the current span. + /// + pub fn on_platform_start(&mut self, request_id: String, time: DateTime) { + let start_time: i64 = SystemTime::from(time) + .duration_since(UNIX_EPOCH) + .expect("time went backwards") + .as_nanos() + .try_into() + .unwrap_or_default(); + self.context_buffer.add_start_time(&request_id, start_time); + self.span.start = start_time; + } + + #[allow(clippy::cast_possible_truncation)] + pub async fn on_platform_runtime_done( + &mut self, + request_id: &String, + duration_ms: f64, + config: Arc, + tags_provider: Arc, + trace_processor: Arc, + trace_agent_tx: Sender, + ) { + self.context_buffer + .add_runtime_duration(request_id, duration_ms); + + if let Some(context) = self.context_buffer.get(request_id) { + let span = &mut self.span; + // `round` is intentionally meant to be a whole integer + span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64; + span.meta + .insert("request_id".to_string(), request_id.clone()); + + // todo(duncanista): add missing tags + // - cold start, proactive init + // - language + // - function.request - capture lambda payload + // - function.response + // - error.msg + // - error.type + // - error.stack + // - trigger tags (from inferred spans) + // - metrics tags (for asm) + } + + if self.tracer_detected { + let span_size = std::mem::size_of_val(&self.span); + + // todo: figure out what to do here + let header_tags = tracer_header_tags::TracerHeaderTags { + lang: "", + lang_version: "", + lang_interpreter: "", + lang_vendor: "", + tracer_version: "", + container_id: "", + client_computed_top_level: false, + client_computed_stats: false, + }; + + let send_data = trace_processor.process_traces( + config.clone(), + tags_provider.clone(), + header_tags, + vec![vec![self.span.clone()]], + span_size, + ); + + if let Err(e) = trace_agent_tx.send(send_data).await { + debug!("Failed to send invocation span to agent: {e}"); + } + } + } + + /// Given a `request_id` and the duration in milliseconds of the platform report, + /// calculate the duration of the runtime if the `request_id` is found in the context buffer. + /// + /// If the `request_id` is not found in the context buffer, return `None`. + /// If the `runtime_duration_ms` hasn't been seen, return `None`. + /// + pub fn on_platform_report(&mut self, request_id: &String, duration_ms: f64) -> Option { + if let Some(context) = self.context_buffer.remove(request_id) { + if context.runtime_duration_ms == 0.0 { + return None; + } + + return Some(duration_ms - context.runtime_duration_ms); + } + + None + } + + /// If this method is called, it means that we are operating in a Universally Instrumented + /// runtime. Therefore, we need to set the `tracer_detected` flag to `true`. + /// + pub fn on_invocation_start(&mut self) { + self.tracer_detected = true; + } + + /// Given trace context information, set it to the current span. + /// + pub fn on_invocation_end(&mut self, trace_id: u64, span_id: u64, parent_id: u64) { + let span = &mut self.span; + span.trace_id = trace_id; + span.span_id = span_id; + span.parent_id = parent_id; + } +} diff --git a/bottlecap/src/lifecycle/invocation_context.rs b/bottlecap/src/lifecycle/invocation_context.rs deleted file mode 100644 index 24e8e4541..000000000 --- a/bottlecap/src/lifecycle/invocation_context.rs +++ /dev/null @@ -1,72 +0,0 @@ -use std::collections::VecDeque; - -use tracing::debug; - -#[derive(Debug, Clone)] -pub struct InvocationContext { - pub request_id: String, - pub runtime_duration_ms: f64, -} - -#[allow(clippy::module_name_repetitions)] -pub struct InvocationContextBuffer { - buffer: VecDeque, -} - -impl Default for InvocationContextBuffer { - fn default() -> Self { - InvocationContextBuffer { - buffer: VecDeque::::with_capacity(5), - } - } -} - -impl InvocationContextBuffer { - pub fn insert(&mut self, invocation_context: InvocationContext) { - if self.buffer.len() == self.buffer.capacity() { - self.buffer.pop_front(); - self.buffer.push_back(invocation_context); - } else { - if self.get(&invocation_context.request_id).is_some() { - self.remove(&invocation_context.request_id); - } - - self.buffer.push_back(invocation_context); - } - } - - pub fn remove(&mut self, request_id: &String) -> Option { - if let Some(i) = self - .buffer - .iter() - .position(|context| context.request_id == *request_id) - { - return self.buffer.remove(i); - } - debug!("Context for request_id: {:?} not found", request_id); - - None - } - - #[must_use] - pub fn get(&self, request_id: &String) -> Option<&InvocationContext> { - self.buffer - .iter() - .find(|context| context.request_id == *request_id) - } - - pub fn add_runtime_duration(&mut self, request_id: &String, runtime_duration_ms: f64) { - if let Some(context) = self - .buffer - .iter_mut() - .find(|context| context.request_id == *request_id) - { - context.runtime_duration_ms = runtime_duration_ms; - } else { - self.insert(InvocationContext { - request_id: request_id.to_string(), - runtime_duration_ms, - }); - } - } -} diff --git a/bottlecap/src/lifecycle/listener.rs b/bottlecap/src/lifecycle/listener.rs index 0afdab41c..ebb3dd833 100644 --- a/bottlecap/src/lifecycle/listener.rs +++ b/bottlecap/src/lifecycle/listener.rs @@ -8,9 +8,10 @@ use std::sync::Arc; use hyper::service::{make_service_fn, service_fn}; use hyper::{http, Body, Method, Request, Response, StatusCode}; use serde_json::json; -use tracing::{error, warn}; +use tokio::sync::Mutex; +use tracing::{debug, error, warn}; -use crate::tags::provider; +use crate::lifecycle::invocation::processor::Processor as InvocationProcessor; const HELLO_PATH: &str = "/lambda/hello"; const START_INVOCATION_PATH: &str = "/lambda/start-invocation"; @@ -18,13 +19,17 @@ const END_INVOCATION_PATH: &str = "/lambda/end-invocation"; const AGENT_PORT: usize = 8124; pub struct Listener { - pub tags_provider: Arc, + pub invocation_processor: Arc>, } impl Listener { pub async fn start(&self) -> Result<(), Box> { + let invocation_processor = self.invocation_processor.clone(); + let make_svc = make_service_fn(move |_| { - let service = service_fn(Self::handler); + let invocation_processor = invocation_processor.clone(); + + let service = service_fn(move |req| Self::handler(req, invocation_processor.clone())); async move { Ok::<_, Infallible>(service) } }); @@ -44,11 +49,26 @@ impl Listener { Ok(()) } - #[allow(clippy::unused_async)] - async fn handler(req: Request) -> http::Result> { + async fn handler( + req: Request, + invocation_processor: Arc>, + ) -> http::Result> { match (req.method(), req.uri().path()) { - (&Method::POST, START_INVOCATION_PATH) => Self::start_invocation_handler(req), - (&Method::POST, END_INVOCATION_PATH) => Self::end_invocation_handler(req), + (&Method::POST, START_INVOCATION_PATH) => { + Self::start_invocation_handler(req, invocation_processor).await + } + (&Method::POST, END_INVOCATION_PATH) => { + match Self::end_invocation_handler(req, invocation_processor).await { + Ok(response) => Ok(response), + Err(e) => { + error!("Failed to end invocation {e}"); + Ok(Response::builder() + .status(500) + .body(Body::empty()) + .expect("no body")) + } + } + } (&Method::GET, HELLO_PATH) => Self::hello_handler(), _ => { let mut not_found = Response::default(); @@ -58,13 +78,54 @@ impl Listener { } } - fn start_invocation_handler(_: Request) -> http::Result> { + async fn start_invocation_handler( + _: Request, + invocation_processor: Arc>, + ) -> http::Result> { + debug!("Received start invocation request"); + let mut processor = invocation_processor.lock().await; + processor.on_invocation_start(); + drop(processor); + Response::builder() .status(200) .body(Body::from(json!({}).to_string())) } - fn end_invocation_handler(_: Request) -> http::Result> { + async fn end_invocation_handler( + req: Request, + invocation_processor: Arc>, + ) -> http::Result> { + debug!("Received end invocation request"); + let (parts, _) = req.into_parts(); + let headers = parts.headers; + + let mut processor = invocation_processor.lock().await; + + let mut trace_id = 0; + if let Some(header) = headers.get("x-datadog-trace-id") { + if let Ok(header_value) = header.to_str() { + trace_id = header_value.parse::().unwrap_or(0); + } + } + + let mut span_id = 0; + if let Some(header) = headers.get("x-datadog-span-id") { + if let Ok(header_value) = header.to_str() { + span_id = header_value.parse::().unwrap_or(0); + } + } + + let mut parent_id = 0; + if let Some(header) = headers.get("x-datadog-parent-id") { + if let Ok(header_value) = header.to_str() { + parent_id = header_value.parse::().unwrap_or(0); + } + } + + processor.on_invocation_end(trace_id, span_id, parent_id); + drop(processor); + Response::builder() .status(200) .body(Body::from(json!({}).to_string())) diff --git a/bottlecap/src/lifecycle/mod.rs b/bottlecap/src/lifecycle/mod.rs index a35e20622..a0b3eda68 100644 --- a/bottlecap/src/lifecycle/mod.rs +++ b/bottlecap/src/lifecycle/mod.rs @@ -1,3 +1,3 @@ pub mod flush_control; -pub mod invocation_context; +pub mod invocation; pub mod listener; diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 955872d8f..d3b519682 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -6,7 +6,7 @@ use tracing::error; use crate::config; use crate::events::Event; -use crate::lifecycle::invocation_context::InvocationContext; +use crate::lifecycle::invocation::context::Context as InvocationContext; use crate::logs::aggregator::Aggregator; use crate::logs::processor::{Processor, Rule}; use crate::tags::provider; @@ -53,10 +53,7 @@ impl LambdaProcessor { service, tags, rules, - invocation_context: InvocationContext { - request_id: String::new(), - runtime_duration_ms: 0.0, - }, + invocation_context: InvocationContext::new(String::new(), 0.0, 0.0, 0), orphan_logs: Vec::new(), ready_logs: Vec::new(), event_bus, diff --git a/bottlecap/src/metrics/enhanced/constants.rs b/bottlecap/src/metrics/enhanced/constants.rs index 5011b7d64..3c2d34e0a 100644 --- a/bottlecap/src/metrics/enhanced/constants.rs +++ b/bottlecap/src/metrics/enhanced/constants.rs @@ -3,7 +3,7 @@ pub const BASE_LAMBDA_INVOCATION_PRICE: f64 = 0.000_000_2; pub const X86_LAMBDA_PRICE_PER_GB_SECOND: f64 = 0.000_016_666_7; pub const ARM_LAMBDA_PRICE_PER_GB_SECOND: f64 = 0.000_013_333_4; pub const MS_TO_SEC: f64 = 0.001; -pub const MB_TO_GB: f64 = 1024.0; +pub const MB_TO_GB: f64 = 1_024.0; // Enhanced metrics pub const MAX_MEMORY_USED_METRIC: &str = "aws.lambda.enhanced.max_memory_used"; diff --git a/bottlecap/src/tags/lambda/tags.rs b/bottlecap/src/tags/lambda/tags.rs index caf0e3f89..7e6ce54e0 100644 --- a/bottlecap/src/tags/lambda/tags.rs +++ b/bottlecap/src/tags/lambda/tags.rs @@ -239,6 +239,11 @@ impl Lambda { self.tags_map.get(FUNCTION_ARN_KEY) } + #[must_use] + pub fn get_function_name(&self) -> Option<&String> { + self.tags_map.get(FUNCTION_NAME_KEY) + } + #[must_use] pub fn get_tags_map(&self) -> &hash_map::HashMap { &self.tags_map diff --git a/bottlecap/src/tags/provider.rs b/bottlecap/src/tags/provider.rs index a3a6881df..b6e775ac2 100644 --- a/bottlecap/src/tags/provider.rs +++ b/bottlecap/src/tags/provider.rs @@ -47,6 +47,11 @@ impl Provider { self.tag_provider.get_canonical_id() } + #[must_use] + pub fn get_canonical_resource_name(&self) -> Option { + self.tag_provider.get_canonical_resource_name() + } + #[must_use] pub fn get_tags_map(&self) -> &hash_map::HashMap { self.tag_provider.get_tags_map() @@ -56,6 +61,7 @@ impl Provider { trait GetTags { fn get_tags_vec(&self) -> Vec; fn get_canonical_id(&self) -> Option; + fn get_canonical_resource_name(&self) -> Option; fn get_tags_map(&self) -> &hash_map::HashMap; } @@ -72,6 +78,12 @@ impl GetTags for TagProvider { } } + fn get_canonical_resource_name(&self) -> Option { + match self { + TagProvider::Lambda(lambda_tags) => lambda_tags.get_function_name().cloned(), + } + } + fn get_tags_map(&self) -> &hash_map::HashMap { match self { TagProvider::Lambda(lambda_tags) => lambda_tags.get_tags_map(), diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 13ad6a321..57e6fd4c6 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -34,6 +34,7 @@ pub struct TraceAgent { pub stats_processor: Arc, pub stats_flusher: Arc, pub tags_provider: Arc, + tx: Sender, } #[derive(Clone, Copy)] @@ -43,9 +44,15 @@ pub enum ApiVersion { } impl TraceAgent { - pub async fn start_trace_agent(&self) -> Result<(), Box> { - let now = Instant::now(); - + #[must_use] + pub async fn new( + config: Arc, + trace_processor: Arc, + trace_flusher: Arc, + stats_processor: Arc, + stats_flusher: Arc, + tags_provider: Arc, + ) -> TraceAgent { // setup a channel to send processed traces to our flusher. tx is passed through each // endpoint_handler to the trace processor, which uses it to send de-serialized // processed trace payloads to our trace flusher. @@ -54,9 +61,24 @@ impl TraceAgent { // start our trace flusher. receives trace payloads and handles buffering + deciding when to // flush to backend. - let trace_flusher = self.trace_flusher.clone(); + let trace_flusher = trace_flusher.clone(); trace_flusher.start_trace_flusher(trace_rx).await; + TraceAgent { + config, + trace_processor, + trace_flusher, + stats_processor, + stats_flusher, + tags_provider, + tx: trace_tx, + } + } + + pub async fn start(&self) -> Result<(), Box> { + let now = Instant::now(); + let trace_tx = self.tx.clone(); + // channels to send processed stats to our stats flusher. let (stats_tx, stats_rx): ( Sender, @@ -267,4 +289,9 @@ impl TraceAgent { .status(200) .body(Body::from(response_json.to_string())) } + + #[must_use] + pub fn get_sender_copy(&self) -> Sender { + self.tx.clone() + } }