From 0d11207f983985233e65925ce2921af2dab19259 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 10 Sep 2024 15:05:46 -0400 Subject: [PATCH 01/24] decouple `hyper` from `trace_processor` --- bottlecap/src/traces/trace_processor.rs | 162 +++++++----------------- 1 file changed, 45 insertions(+), 117 deletions(-) diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 695718e77..a19d61f20 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -5,37 +5,19 @@ use crate::tags::provider; use datadog_trace_obfuscation::obfuscation_config; use datadog_trace_protobuf::pb; use datadog_trace_utils::config_utils::trace_intake_url; +use datadog_trace_utils::tracer_header_tags; use datadog_trace_utils::tracer_payload::{TraceChunkProcessor, TraceEncoding}; use ddcommon::Endpoint; use std::str::FromStr; use std::sync::Arc; -use async_trait::async_trait; -use hyper::{http, Body, Request, Response, StatusCode}; -use tokio::sync::mpsc::Sender; use tracing::debug; use crate::config; -use datadog_trace_mini_agent::http_utils::{self, log_and_create_http_response}; use datadog_trace_obfuscation::obfuscate::obfuscate_span; use datadog_trace_utils::trace_utils::SendData; use datadog_trace_utils::trace_utils::{self}; -use super::trace_agent::{ApiVersion, MAX_CONTENT_LENGTH}; - -#[async_trait] -pub trait TraceProcessor { - /// Deserializes traces from a hyper request body and sends them through the provided tokio mpsc - /// Sender. - async fn process_traces( - &self, - config: Arc, - req: Request, - tx: Sender, - tags_provider: Arc, - version: ApiVersion, - ) -> http::Result>; -} #[derive(Clone)] #[allow(clippy::module_name_repetitions)] pub struct ServerlessTraceProcessor { @@ -67,55 +49,31 @@ impl TraceChunkProcessor for ChunkProcessor { } } -#[async_trait] +#[allow(clippy::module_name_repetitions)] +pub trait TraceProcessor { + fn process_traces( + &self, + config: Arc, + tags_provider: Arc, + header_tags: tracer_header_tags::TracerHeaderTags, + traces: Vec>, + body_size: usize, + ) -> SendData; +} + impl TraceProcessor for ServerlessTraceProcessor { - async fn process_traces( + fn process_traces( &self, config: Arc, - req: Request, - tx: Sender, tags_provider: Arc, - version: ApiVersion, - ) -> http::Result> { + header_tags: tracer_header_tags::TracerHeaderTags, + traces: Vec>, + body_size: usize, + ) -> SendData { debug!("Received traces to process"); - let (parts, body) = req.into_parts(); - - if let Some(response) = http_utils::verify_request_content_length( - &parts.headers, - MAX_CONTENT_LENGTH, - "Error processing traces", - ) { - return response; - } - - let tracer_header_tags = (&parts.headers).into(); - - // deserialize traces from the request body, convert to protobuf structs (see trace-protobuf - // crate) - let (body_size, traces) = match version { - ApiVersion::V04 => match trace_utils::get_traces_from_request_body(body).await { - Ok(result) => result, - Err(err) => { - return log_and_create_http_response( - &format!("Error deserializing trace from request body: {err}"), - StatusCode::INTERNAL_SERVER_ERROR, - ); - } - }, - ApiVersion::V05 => match trace_utils::get_v05_traces_from_request_body(body).await { - Ok(result) => result, - Err(err) => { - return log_and_create_http_response( - &format!("Error deserializing trace from request body: {err}"), - StatusCode::INTERNAL_SERVER_ERROR, - ); - } - }, - }; - let payload = trace_utils::collect_trace_chunks( traces, - &tracer_header_tags, + &header_tags, &mut ChunkProcessor { obfuscation_config: self.obfuscation_config.clone(), tags_provider: tags_provider.clone(), @@ -131,44 +89,26 @@ impl TraceProcessor for ServerlessTraceProcessor { test_token: None, }; - let send_data = SendData::new(body_size, payload, tracer_header_tags, &endpoint); - - // send trace payload to our trace flusher - match tx.send(send_data).await { - Ok(()) => { - return log_and_create_http_response( - "Successfully buffered traces to be flushed.", - StatusCode::ACCEPTED, - ); - } - Err(err) => { - return log_and_create_http_response( - &format!("Error sending traces to the trace flusher: {err}"), - StatusCode::INTERNAL_SERVER_ERROR, - ); - } - } + SendData::new(body_size, payload, header_tags, &endpoint) } } #[cfg(test)] mod tests { use datadog_trace_obfuscation::obfuscation_config::ObfuscationConfig; - use hyper::Request; use serde_json::json; use std::{ collections::HashMap, sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; - use tokio::sync::mpsc::{self, Receiver, Sender}; use crate::config::Config; use crate::tags::provider::Provider; use crate::traces::trace_processor::{self, TraceProcessor}; use crate::LAMBDA_RUNTIME_SLUG; use datadog_trace_protobuf::pb; - use datadog_trace_utils::{trace_utils, tracer_payload::TracerPayloadCollection}; + use datadog_trace_utils::{tracer_header_tags, tracer_payload::TracerPayloadCollection}; fn get_current_timestamp_nanos() -> i64 { i64::try_from( @@ -270,25 +210,26 @@ mod tests { #[allow(clippy::unwrap_used)] #[cfg_attr(miri, ignore)] async fn test_process_trace() { - let (tx, mut rx): ( - Sender, - Receiver, - ) = mpsc::channel(1); - let start = get_current_timestamp_nanos(); let json_span = create_test_json_span(11, 222, 333, start); - let bytes = rmp_serde::to_vec(&vec![vec![json_span]]).expect("invalid json"); - let request = Request::builder() - .header("datadog-meta-tracer-version", "4.0.0") - .header("datadog-meta-lang", "nodejs") - .header("datadog-meta-lang-version", "v19.7.0") - .header("datadog-meta-lang-interpreter", "v8") - .header("datadog-container-id", "33") - .header("content-length", "100") - .body(hyper::body::Body::from(bytes)) - .expect("fail to build request"); + let json_bytes = serde_json::to_vec(&json_span).expect("invalid json span"); + let span: pb::Span = + rmp_serde::from_slice(&json_bytes).expect("couldnt convert to proto span"); + + let traces: Vec> = vec![vec![span]]; + + let header_tags = tracer_header_tags::TracerHeaderTags { + lang: "nodejs", + lang_version: "v19.7.0", + lang_interpreter: "v8", + lang_vendor: "vendor", + tracer_version: "4.0.0", + container_id: "33", + client_computed_top_level: false, + client_computed_stats: false, + }; let trace_processor = trace_processor::ServerlessTraceProcessor { resolved_api_key: "foo".to_string(), @@ -296,20 +237,8 @@ mod tests { }; let config = create_test_config(); let tags_provider = create_tags_provider(config.clone()); - let res = trace_processor - .process_traces( - config, - request, - tx, - tags_provider.clone(), - crate::traces::trace_agent::ApiVersion::V04, - ) - .await; - assert!(res.is_ok()); - - let tracer_payload = rx.recv().await; - - assert!(tracer_payload.is_some()); + let tracer_payload = + trace_processor.process_traces(config, tags_provider.clone(), header_tags, traces, 100); let expected_tracer_payload = pb::TracerPayload { container_id: "33".to_string(), @@ -330,13 +259,12 @@ mod tests { app_version: String::new(), }; - let received_payload = if let TracerPayloadCollection::V07(payload) = - tracer_payload.expect("no payload").get_payloads() - { - Some(payload[0].clone()) - } else { - None - }; + let received_payload = + if let TracerPayloadCollection::V07(payload) = tracer_payload.get_payloads() { + Some(payload[0].clone()) + } else { + None + }; assert_eq!( expected_tracer_payload, From be1a1aea0bf5e1217afc493244a088c9befa6af6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 10 Sep 2024 15:06:03 -0400 Subject: [PATCH 02/24] add `handle_traces` --- bottlecap/src/traces/trace_agent.rs | 123 ++++++++++++++++++++++------ 1 file changed, 97 insertions(+), 26 deletions(-) diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 6a59f8a74..13ad6a321 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -14,9 +14,9 @@ use tracing::{debug, error}; use crate::config; use crate::tags::provider; use crate::traces::{stats_flusher, stats_processor, trace_flusher, trace_processor}; -use datadog_trace_mini_agent::http_utils::log_and_create_http_response; +use datadog_trace_mini_agent::http_utils::{self, log_and_create_http_response}; use datadog_trace_protobuf::pb; -use datadog_trace_utils::trace_utils::SendData; +use datadog_trace_utils::trace_utils::{self, SendData}; const TRACE_AGENT_PORT: usize = 8126; const V4_TRACE_ENDPOINT_PATH: &str = "/v0.4/traces"; @@ -133,30 +133,38 @@ impl TraceAgent { tags_provider: Arc, ) -> http::Result> { match (req.method(), req.uri().path()) { - (&Method::PUT | &Method::POST, V4_TRACE_ENDPOINT_PATH) => { - match trace_processor - .process_traces(config, req, trace_tx, tags_provider, ApiVersion::V04) - .await - { - Ok(result) => Ok(result), - Err(err) => log_and_create_http_response( - &format!("Error processing traces: {err}"), - StatusCode::INTERNAL_SERVER_ERROR, - ), - } - } - (&Method::PUT | &Method::POST, V5_TRACE_ENDPOINT_PATH) => { - match trace_processor - .process_traces(config, req, trace_tx, tags_provider, ApiVersion::V05) - .await - { - Ok(result) => Ok(result), - Err(err) => log_and_create_http_response( - &format!("Error processing traces: {err}"), - StatusCode::INTERNAL_SERVER_ERROR, - ), - } - } + (&Method::PUT | &Method::POST, V4_TRACE_ENDPOINT_PATH) => match Self::handle_traces( + config, + req, + trace_processor.clone(), + trace_tx, + tags_provider, + ApiVersion::V04, + ) + .await + { + Ok(result) => Ok(result), + Err(err) => log_and_create_http_response( + &format!("Error processing traces: {err}"), + StatusCode::INTERNAL_SERVER_ERROR, + ), + }, + (&Method::PUT | &Method::POST, V5_TRACE_ENDPOINT_PATH) => match Self::handle_traces( + config, + req, + trace_processor.clone(), + trace_tx, + tags_provider, + ApiVersion::V05, + ) + .await + { + Ok(result) => Ok(result), + Err(err) => log_and_create_http_response( + &format!("Error processing traces: {err}"), + StatusCode::INTERNAL_SERVER_ERROR, + ), + }, (&Method::PUT | &Method::POST, STATS_ENDPOINT_PATH) => { match stats_processor.process_stats(req, stats_tx).await { Ok(result) => Ok(result), @@ -181,6 +189,69 @@ impl TraceAgent { } } + async fn handle_traces( + config: Arc, + req: Request, + trace_processor: Arc, + trace_tx: Sender, + tags_provider: Arc, + version: ApiVersion, + ) -> http::Result> { + debug!("Received traces to process"); + let (parts, body) = req.into_parts(); + + if let Some(response) = http_utils::verify_request_content_length( + &parts.headers, + MAX_CONTENT_LENGTH, + "Error processing traces", + ) { + return response; + } + + let tracer_header_tags = (&parts.headers).into(); + + let (body_size, traces) = match version { + ApiVersion::V04 => match trace_utils::get_traces_from_request_body(body).await { + Ok(result) => result, + Err(err) => { + return log_and_create_http_response( + &format!("Error deserializing trace from request body: {err}"), + StatusCode::INTERNAL_SERVER_ERROR, + ); + } + }, + ApiVersion::V05 => match trace_utils::get_v05_traces_from_request_body(body).await { + Ok(result) => result, + Err(err) => { + return log_and_create_http_response( + &format!("Error deserializing trace from request body: {err}"), + StatusCode::INTERNAL_SERVER_ERROR, + ); + } + }, + }; + + let send_data = trace_processor.process_traces( + config, + tags_provider, + tracer_header_tags, + traces, + body_size, + ); + + // send trace payload to our trace flusher + match trace_tx.send(send_data).await { + Ok(()) => log_and_create_http_response( + "Successfully buffered traces to be flushed.", + StatusCode::ACCEPTED, + ), + Err(err) => log_and_create_http_response( + &format!("Error sending traces to the trace flusher: {err}"), + StatusCode::INTERNAL_SERVER_ERROR, + ), + } + } + fn info_handler() -> http::Result> { let response_json = json!( { From 85258010bf6b60d468b806673b5870e3df6d3af3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 10 Sep 2024 15:32:55 -0400 Subject: [PATCH 03/24] fix tests --- bottlecap/src/traces/trace_processor.rs | 42 ++++--------------------- 1 file changed, 6 insertions(+), 36 deletions(-) diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index a19d61f20..acb773a49 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -123,7 +123,7 @@ mod tests { fn create_test_config() -> Arc { Arc::new(Config { service: Some("test-service".to_string()), - tags: Some("test:tag,env:test".to_string()), + tags: Some("test:tag,env:test-env".to_string()), ..Config::default() }) } @@ -179,46 +179,16 @@ mod tests { span } - fn create_test_json_span( - trace_id: u64, - span_id: u64, - parent_id: u64, - start: i64, - ) -> serde_json::Value { - json!( - { - "trace_id": trace_id, - "span_id": span_id, - "service": "test-service", - "name": "test_name", - "resource": "test-resource", - "parent_id": parent_id, - "start": start, - "duration": 5, - "error": 0, - "meta": { - "service": "test-service", - "env": "test-env", - "runtime-id": "test-runtime-id-value", - }, - "metrics": {}, - "meta_struct": {}, - } - ) - } #[tokio::test] #[allow(clippy::unwrap_used)] #[cfg_attr(miri, ignore)] async fn test_process_trace() { let start = get_current_timestamp_nanos(); - let json_span = create_test_json_span(11, 222, 333, start); - - let json_bytes = serde_json::to_vec(&json_span).expect("invalid json span"); - let span: pb::Span = - rmp_serde::from_slice(&json_bytes).expect("couldnt convert to proto span"); + let tags_provider = create_tags_provider(create_test_config()); + let span = create_test_span(11, 222, 333, start, true, tags_provider); - let traces: Vec> = vec![vec![span]]; + let traces: Vec> = vec![vec![span.clone()]]; let header_tags = tracer_header_tags::TracerHeaderTags { lang: "nodejs", @@ -248,8 +218,8 @@ mod tests { runtime_id: "test-runtime-id-value".to_string(), chunks: vec![pb::TraceChunk { priority: i32::from(i8::MIN), - origin: String::new(), - spans: vec![create_test_span(11, 222, 333, start, true, tags_provider)], + origin: "lambda".to_string(), + spans: vec![span.clone()], tags: HashMap::new(), dropped_trace: false, }], From 002e9eb1a7650618b6b12c49f398789ae33cfdac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Tue, 10 Sep 2024 15:39:12 -0400 Subject: [PATCH 04/24] removed unused import --- bottlecap/src/traces/trace_processor.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index acb773a49..26c1030d8 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -96,7 +96,6 @@ impl TraceProcessor for ServerlessTraceProcessor { #[cfg(test)] mod tests { use datadog_trace_obfuscation::obfuscation_config::ObfuscationConfig; - use serde_json::json; use std::{ collections::HashMap, sync::Arc, From c7de6522d9d54e0234b695de7eb47fc9c7c37cb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:09:27 -0400 Subject: [PATCH 05/24] move `invocation_context` to `invocation::context` module also added some more fields and refactored it --- bottlecap/src/lifecycle/invocation/context.rs | 117 ++++++++++++++++++ bottlecap/src/lifecycle/invocation/mod.rs | 2 + bottlecap/src/lifecycle/invocation_context.rs | 72 ----------- 3 files changed, 119 insertions(+), 72 deletions(-) create mode 100644 bottlecap/src/lifecycle/invocation/context.rs create mode 100644 bottlecap/src/lifecycle/invocation/mod.rs delete mode 100644 bottlecap/src/lifecycle/invocation_context.rs diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs new file mode 100644 index 000000000..fbd638e95 --- /dev/null +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -0,0 +1,117 @@ +use std::collections::VecDeque; + +use tracing::debug; + +#[derive(Debug, Clone)] +pub struct Context { + pub request_id: String, + pub runtime_duration_ms: f64, + pub init_duration_ms: f64, + pub start_time: i64, +} + +impl Context { + #[must_use] + pub fn new( + request_id: String, + runtime_duration_ms: f64, + init_duration_ms: f64, + start_time: i64, + ) -> Self { + Context { + request_id, + runtime_duration_ms, + init_duration_ms, + start_time, + } + } +} + +#[allow(clippy::module_name_repetitions)] +pub struct ContextBuffer { + buffer: VecDeque, +} + +impl Default for ContextBuffer { + fn default() -> Self { + ContextBuffer { + buffer: VecDeque::::with_capacity(5), + } + } +} + +impl ContextBuffer { + fn insert(&mut self, context: Context) { + if self.buffer.len() == self.buffer.capacity() { + self.buffer.pop_front(); + self.buffer.push_back(context); + } else { + if self.get(&context.request_id).is_some() { + self.remove(&context.request_id); + } + + self.buffer.push_back(context); + } + } + + pub fn remove(&mut self, request_id: &String) -> Option { + if let Some(i) = self + .buffer + .iter() + .position(|context| context.request_id == *request_id) + { + return self.buffer.remove(i); + } + debug!("Context for request_id: {:?} not found", request_id); + + None + } + + #[must_use] + pub fn get(&self, request_id: &String) -> Option<&Context> { + self.buffer + .iter() + .find(|context| context.request_id == *request_id) + } + + pub fn add_init_duration(&mut self, request_id: &String, init_duration_ms: f64) { + if let Some(context) = self + .buffer + .iter_mut() + .find(|context| context.request_id == *request_id) + { + context.init_duration_ms = init_duration_ms; + } else { + self.insert(Context::new(request_id.clone(), 0.0, init_duration_ms, 0)); + } + } + + pub fn add_start_time(&mut self, request_id: &String, start_time: i64) { + if let Some(context) = self + .buffer + .iter_mut() + .find(|context| context.request_id == *request_id) + { + context.start_time = start_time; + } else { + self.insert(Context::new(request_id.clone(), 0.0, 0.0, start_time)); + } + } + + pub fn add_runtime_duration(&mut self, request_id: &String, runtime_duration_ms: f64) { + if let Some(context) = self + .buffer + .iter_mut() + .find(|context| context.request_id == *request_id) + { + context.runtime_duration_ms = runtime_duration_ms; + } else { + self.insert(Context::new( + request_id.clone(), + runtime_duration_ms, + 0.0, + 0, + )); + } + } +} diff --git a/bottlecap/src/lifecycle/invocation/mod.rs b/bottlecap/src/lifecycle/invocation/mod.rs new file mode 100644 index 000000000..bf32ed105 --- /dev/null +++ b/bottlecap/src/lifecycle/invocation/mod.rs @@ -0,0 +1,2 @@ +pub mod context; +pub mod processor; diff --git a/bottlecap/src/lifecycle/invocation_context.rs b/bottlecap/src/lifecycle/invocation_context.rs deleted file mode 100644 index 24e8e4541..000000000 --- a/bottlecap/src/lifecycle/invocation_context.rs +++ /dev/null @@ -1,72 +0,0 @@ -use std::collections::VecDeque; - -use tracing::debug; - -#[derive(Debug, Clone)] -pub struct InvocationContext { - pub request_id: String, - pub runtime_duration_ms: f64, -} - -#[allow(clippy::module_name_repetitions)] -pub struct InvocationContextBuffer { - buffer: VecDeque, -} - -impl Default for InvocationContextBuffer { - fn default() -> Self { - InvocationContextBuffer { - buffer: VecDeque::::with_capacity(5), - } - } -} - -impl InvocationContextBuffer { - pub fn insert(&mut self, invocation_context: InvocationContext) { - if self.buffer.len() == self.buffer.capacity() { - self.buffer.pop_front(); - self.buffer.push_back(invocation_context); - } else { - if self.get(&invocation_context.request_id).is_some() { - self.remove(&invocation_context.request_id); - } - - self.buffer.push_back(invocation_context); - } - } - - pub fn remove(&mut self, request_id: &String) -> Option { - if let Some(i) = self - .buffer - .iter() - .position(|context| context.request_id == *request_id) - { - return self.buffer.remove(i); - } - debug!("Context for request_id: {:?} not found", request_id); - - None - } - - #[must_use] - pub fn get(&self, request_id: &String) -> Option<&InvocationContext> { - self.buffer - .iter() - .find(|context| context.request_id == *request_id) - } - - pub fn add_runtime_duration(&mut self, request_id: &String, runtime_duration_ms: f64) { - if let Some(context) = self - .buffer - .iter_mut() - .find(|context| context.request_id == *request_id) - { - context.runtime_duration_ms = runtime_duration_ms; - } else { - self.insert(InvocationContext { - request_id: request_id.to_string(), - runtime_duration_ms, - }); - } - } -} From 7e1be93462b5ac3a57d753bd78e57067a1a621c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:10:39 -0400 Subject: [PATCH 06/24] add `new` and `get_sender_copy` to `trace_agent` --- bottlecap/src/traces/trace_agent.rs | 35 +++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 13ad6a321..57e6fd4c6 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -34,6 +34,7 @@ pub struct TraceAgent { pub stats_processor: Arc, pub stats_flusher: Arc, pub tags_provider: Arc, + tx: Sender, } #[derive(Clone, Copy)] @@ -43,9 +44,15 @@ pub enum ApiVersion { } impl TraceAgent { - pub async fn start_trace_agent(&self) -> Result<(), Box> { - let now = Instant::now(); - + #[must_use] + pub async fn new( + config: Arc, + trace_processor: Arc, + trace_flusher: Arc, + stats_processor: Arc, + stats_flusher: Arc, + tags_provider: Arc, + ) -> TraceAgent { // setup a channel to send processed traces to our flusher. tx is passed through each // endpoint_handler to the trace processor, which uses it to send de-serialized // processed trace payloads to our trace flusher. @@ -54,9 +61,24 @@ impl TraceAgent { // start our trace flusher. receives trace payloads and handles buffering + deciding when to // flush to backend. - let trace_flusher = self.trace_flusher.clone(); + let trace_flusher = trace_flusher.clone(); trace_flusher.start_trace_flusher(trace_rx).await; + TraceAgent { + config, + trace_processor, + trace_flusher, + stats_processor, + stats_flusher, + tags_provider, + tx: trace_tx, + } + } + + pub async fn start(&self) -> Result<(), Box> { + let now = Instant::now(); + let trace_tx = self.tx.clone(); + // channels to send processed stats to our stats flusher. let (stats_tx, stats_rx): ( Sender, @@ -267,4 +289,9 @@ impl TraceAgent { .status(200) .body(Body::from(response_json.to_string())) } + + #[must_use] + pub fn get_sender_copy(&self) -> Sender { + self.tx.clone() + } } From 11e6ba1332c5aed1cd2711579400bc2b1dffcd2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:10:59 -0400 Subject: [PATCH 07/24] add `get_canonical_resource_name` to `tags_provider` --- bottlecap/src/tags/provider.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/bottlecap/src/tags/provider.rs b/bottlecap/src/tags/provider.rs index a3a6881df..b6e775ac2 100644 --- a/bottlecap/src/tags/provider.rs +++ b/bottlecap/src/tags/provider.rs @@ -47,6 +47,11 @@ impl Provider { self.tag_provider.get_canonical_id() } + #[must_use] + pub fn get_canonical_resource_name(&self) -> Option { + self.tag_provider.get_canonical_resource_name() + } + #[must_use] pub fn get_tags_map(&self) -> &hash_map::HashMap { self.tag_provider.get_tags_map() @@ -56,6 +61,7 @@ impl Provider { trait GetTags { fn get_tags_vec(&self) -> Vec; fn get_canonical_id(&self) -> Option; + fn get_canonical_resource_name(&self) -> Option; fn get_tags_map(&self) -> &hash_map::HashMap; } @@ -72,6 +78,12 @@ impl GetTags for TagProvider { } } + fn get_canonical_resource_name(&self) -> Option { + match self { + TagProvider::Lambda(lambda_tags) => lambda_tags.get_function_name().cloned(), + } + } + fn get_tags_map(&self) -> &hash_map::HashMap { match self { TagProvider::Lambda(lambda_tags) => lambda_tags.get_tags_map(), From 68d12acf766c7319b23fbd16e8dd46d941020351 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:11:16 -0400 Subject: [PATCH 08/24] add `get_function_name` to `lambda::tags` --- bottlecap/src/tags/lambda/tags.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bottlecap/src/tags/lambda/tags.rs b/bottlecap/src/tags/lambda/tags.rs index 90a24ab3d..8038603ca 100644 --- a/bottlecap/src/tags/lambda/tags.rs +++ b/bottlecap/src/tags/lambda/tags.rs @@ -240,6 +240,11 @@ impl Lambda { self.tags_map.get(FUNCTION_ARN_KEY) } + #[must_use] + pub fn get_function_name(&self) -> Option<&String> { + self.tags_map.get(FUNCTION_NAME_KEY) + } + #[must_use] pub fn get_tags_map(&self) -> &hash_map::HashMap { &self.tags_map From 5070e1922c8bef29a86e59abe67984623577f449 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:11:44 -0400 Subject: [PATCH 09/24] add `MS_TO_NS` constant --- bottlecap/src/metrics/enhanced/constants.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/metrics/enhanced/constants.rs b/bottlecap/src/metrics/enhanced/constants.rs index 5011b7d64..39ff08d10 100644 --- a/bottlecap/src/metrics/enhanced/constants.rs +++ b/bottlecap/src/metrics/enhanced/constants.rs @@ -3,7 +3,8 @@ pub const BASE_LAMBDA_INVOCATION_PRICE: f64 = 0.000_000_2; pub const X86_LAMBDA_PRICE_PER_GB_SECOND: f64 = 0.000_016_666_7; pub const ARM_LAMBDA_PRICE_PER_GB_SECOND: f64 = 0.000_013_333_4; pub const MS_TO_SEC: f64 = 0.001; -pub const MB_TO_GB: f64 = 1024.0; +pub const MS_TO_NS: f64 = 1_000_000.0; +pub const MB_TO_GB: f64 = 1_024.0; // Enhanced metrics pub const MAX_MEMORY_USED_METRIC: &str = "aws.lambda.enhanced.max_memory_used"; From 2d4d60c2e6c7103e4a4114db93e172cbd417e929 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:12:19 -0400 Subject: [PATCH 10/24] add `invocation::processor` --- .../src/lifecycle/invocation/processor.rs | 143 ++++++++++++++++++ bottlecap/src/lifecycle/mod.rs | 2 +- 2 files changed, 144 insertions(+), 1 deletion(-) create mode 100644 bottlecap/src/lifecycle/invocation/processor.rs diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs new file mode 100644 index 000000000..5f0f3ef85 --- /dev/null +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -0,0 +1,143 @@ +use std::{ + collections::HashMap, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +use chrono::{DateTime, Utc}; +use datadog_trace_protobuf::pb::Span; +use datadog_trace_utils::{send_data::SendData, tracer_header_tags}; +use tokio::sync::mpsc::Sender; +use tracing::debug; + +use crate::{ + config, lifecycle::invocation::context::ContextBuffer, metrics::enhanced::constants::MS_TO_NS, + tags::provider, traces::trace_processor, +}; + +pub struct Processor { + pub context_buffer: ContextBuffer, + pub current_request_id: String, + pub span: Span, + lambda_library_detected: bool, +} + +impl Processor { + #[must_use] + pub fn new(tags_provider: Arc, config: Arc) -> Self { + let service = config.service.clone().unwrap_or("aws.lambda".to_string()); + let resource = tags_provider + .get_canonical_resource_name() + .unwrap_or("aws_lambda".to_string()); + + Processor { + context_buffer: ContextBuffer::default(), + current_request_id: String::new(), + span: Span { + service, + name: "aws.lambda".to_string(), + resource, + trace_id: 0, // set later + span_id: 0, // maybe set later? + parent_id: 0, // set later + start: 0, // set later + duration: 0, // set later + error: 0, + meta: HashMap::new(), + metrics: HashMap::new(), + r#type: "serverless".to_string(), + meta_struct: HashMap::new(), + span_links: Vec::new(), + }, + lambda_library_detected: true, + } + } + + pub fn on_platform_start(&mut self, request_id: String, time: DateTime) { + self.current_request_id.clone_from(&request_id); + let start_time: i64 = SystemTime::from(time) + .duration_since(UNIX_EPOCH) + .expect("time went backwards") + .as_nanos() + .try_into() + .unwrap_or_default(); + self.context_buffer.add_start_time(&request_id, start_time); + self.span.start = start_time; + } + + #[allow(clippy::cast_possible_truncation)] + pub async fn on_platform_runtime_done( + &mut self, + request_id: &String, + duration_ms: f64, + config: Arc, + tags_provider: Arc, + trace_processor: Arc, + trace_agent_tx: Sender, + ) { + self.context_buffer + .add_runtime_duration(request_id, duration_ms); + + if let Some(context) = self.context_buffer.get(&self.current_request_id) { + let span = &mut self.span; + // `round` is intentionally meant to be a whole integer + span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64; + span.meta + .insert("request_id".to_string(), request_id.clone()); + } + + if !self.lambda_library_detected { + let span_size = std::mem::size_of_val(&self.span); + let header_tags = tracer_header_tags::TracerHeaderTags { + lang: "", + lang_version: "", + lang_interpreter: "", + lang_vendor: "", + tracer_version: "", + container_id: "", + client_computed_top_level: false, + client_computed_stats: false, + }; + + let send_data = trace_processor.process_traces( + config.clone(), + tags_provider.clone(), + header_tags, + vec![vec![self.span.clone()]], + span_size, + ); + + match trace_agent_tx.send(send_data).await { + Ok(()) => println!("[usm] Sent span to agent"), + Err(e) => println!("[usm] Failed to send span to agent: {e}"), + } + } + } + + pub fn on_platform_report(&mut self, request_id: &String, duration_ms: f64) -> Option { + debug!("[usm] on_platform_report"); + if let Some(context) = self.context_buffer.remove(request_id) { + if context.runtime_duration_ms == 0.0 { + return None; + } + + return Some(duration_ms - context.runtime_duration_ms); + } + + None + } + + pub fn on_invocation_start(&mut self) { + debug!("[usm] on_invocation_start"); + self.lambda_library_detected = false; + } + + pub fn on_invocation_end(&mut self, trace_id: u64, span_id: u64, parent_id: u64) { + debug!("[usm] on_invocation_end"); + + let span = &mut self.span; + span.trace_id = trace_id; + span.span_id = span_id; + span.parent_id = parent_id; + } +} diff --git a/bottlecap/src/lifecycle/mod.rs b/bottlecap/src/lifecycle/mod.rs index a35e20622..a0b3eda68 100644 --- a/bottlecap/src/lifecycle/mod.rs +++ b/bottlecap/src/lifecycle/mod.rs @@ -1,3 +1,3 @@ pub mod flush_control; -pub mod invocation_context; +pub mod invocation; pub mod listener; From 70896935042144731be92ba4efe2fdd7dd8fd971 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:12:37 -0400 Subject: [PATCH 11/24] update use of `invocation::context` --- bottlecap/src/logs/lambda/processor.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 955872d8f..d3b519682 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -6,7 +6,7 @@ use tracing::error; use crate::config; use crate::events::Event; -use crate::lifecycle::invocation_context::InvocationContext; +use crate::lifecycle::invocation::context::Context as InvocationContext; use crate::logs::aggregator::Aggregator; use crate::logs::processor::{Processor, Rule}; use crate::tags::provider; @@ -53,10 +53,7 @@ impl LambdaProcessor { service, tags, rules, - invocation_context: InvocationContext { - request_id: String::new(), - runtime_duration_ms: 0.0, - }, + invocation_context: InvocationContext::new(String::new(), 0.0, 0.0, 0), orphan_logs: Vec::new(), ready_logs: Vec::new(), event_bus, From 929741c769d5644da4d342dead0917a82a22c767 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:12:59 -0400 Subject: [PATCH 12/24] make `lifecycle::listener` to use `invocation::processor` --- bottlecap/src/lifecycle/listener.rs | 64 +++++++++++++++++++++++++---- 1 file changed, 57 insertions(+), 7 deletions(-) diff --git a/bottlecap/src/lifecycle/listener.rs b/bottlecap/src/lifecycle/listener.rs index 0afdab41c..e57455dd8 100644 --- a/bottlecap/src/lifecycle/listener.rs +++ b/bottlecap/src/lifecycle/listener.rs @@ -8,9 +8,10 @@ use std::sync::Arc; use hyper::service::{make_service_fn, service_fn}; use hyper::{http, Body, Method, Request, Response, StatusCode}; use serde_json::json; +use tokio::sync::Mutex; use tracing::{error, warn}; -use crate::tags::provider; +use crate::lifecycle::invocation::processor::Processor as InvocationProcessor; const HELLO_PATH: &str = "/lambda/hello"; const START_INVOCATION_PATH: &str = "/lambda/start-invocation"; @@ -18,13 +19,17 @@ const END_INVOCATION_PATH: &str = "/lambda/end-invocation"; const AGENT_PORT: usize = 8124; pub struct Listener { - pub tags_provider: Arc, + pub invocation_processor: Arc>, } impl Listener { pub async fn start(&self) -> Result<(), Box> { + let invocation_processor = self.invocation_processor.clone(); + let make_svc = make_service_fn(move |_| { - let service = service_fn(Self::handler); + let invocation_processor = invocation_processor.clone(); + + let service = service_fn(move |req| Self::handler(req, invocation_processor.clone())); async move { Ok::<_, Infallible>(service) } }); @@ -44,11 +49,24 @@ impl Listener { Ok(()) } - #[allow(clippy::unused_async)] - async fn handler(req: Request) -> http::Result> { + async fn handler( + req: Request, + invocation_processor: Arc>, + ) -> http::Result> { match (req.method(), req.uri().path()) { (&Method::POST, START_INVOCATION_PATH) => Self::start_invocation_handler(req), - (&Method::POST, END_INVOCATION_PATH) => Self::end_invocation_handler(req), + (&Method::POST, END_INVOCATION_PATH) => { + match Self::end_invocation_handler(req, invocation_processor).await { + Ok(response) => Ok(response), + Err(e) => { + error!("Failed to end invocation {e}"); + Ok(Response::builder() + .status(500) + .body(Body::empty()) + .expect("no body")) + } + } + } (&Method::GET, HELLO_PATH) => Self::hello_handler(), _ => { let mut not_found = Response::default(); @@ -64,7 +82,39 @@ impl Listener { .body(Body::from(json!({}).to_string())) } - fn end_invocation_handler(_: Request) -> http::Result> { + async fn end_invocation_handler( + req: Request, + invocation_processor: Arc>, + ) -> http::Result> { + let (parts, _) = req.into_parts(); + let headers = parts.headers; + + let mut processor = invocation_processor.lock().await; + + let mut trace_id = 0; + if let Some(header) = headers.get("x-datadog-trace-id") { + if let Ok(header_value) = header.to_str() { + trace_id = header_value.parse::().unwrap_or(0); + } + } + + let mut span_id = 0; + if let Some(header) = headers.get("x-datadog-span-id") { + if let Ok(header_value) = header.to_str() { + span_id = header_value.parse::().unwrap_or(0); + } + } + + let mut parent_id = 0; + if let Some(header) = headers.get("x-datadog-parent-id") { + if let Ok(header_value) = header.to_str() { + parent_id = header_value.parse::().unwrap_or(0); + } + } + + processor.on_invocation_end(trace_id, span_id, parent_id); + drop(processor); + Response::builder() .status(200) .body(Body::from(json!({}).to_string())) From dcba48e4528ba605288a6877d6aca72a0644f18f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:16:20 -0400 Subject: [PATCH 13/24] use `invocation::processor` in `main.rs` --- bottlecap/src/bin/bottlecap/main.rs | 181 +++++++++++++++------------- 1 file changed, 96 insertions(+), 85 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 62f6063a7..9121ae582 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -15,8 +15,7 @@ use bottlecap::{ event_bus::bus::EventBus, events::Event, lifecycle::{ - flush_control::FlushControl, - invocation_context::{InvocationContext, InvocationContextBuffer}, + flush_control::FlushControl, invocation::processor::Processor as InvocationProcessor, listener::Listener as LifecycleListener, }, logger, @@ -280,6 +279,11 @@ async fn extension_loop_active( let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher { buffer: Arc::new(TokioMutex::new(Vec::new())), }); + + let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new( + Arc::clone(&tags_provider), + Arc::clone(config), + ))); let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor { obfuscation_config: Arc::new( obfuscation_config::ObfuscationConfig::new() @@ -298,23 +302,26 @@ async fn extension_loop_active( let trace_flusher_clone = trace_flusher.clone(); let stats_flusher_clone = stats_flusher.clone(); - let trace_agent = Box::new(trace_agent::TraceAgent { - config: Arc::clone(config), - trace_processor, - trace_flusher: trace_flusher_clone, + let trace_agent = trace_agent::TraceAgent::new( + Arc::clone(config), + trace_processor.clone(), + trace_flusher_clone, stats_processor, - stats_flusher: stats_flusher_clone, - tags_provider: Arc::clone(&tags_provider), - }); + stats_flusher_clone, + Arc::clone(&tags_provider), + ) + .await; + let trace_agent_tx = trace_agent.get_sender_copy(); + tokio::spawn(async move { - let res = trace_agent.start_trace_agent().await; + let res = trace_agent.start().await; if let Err(e) = res { error!("Error starting trace agent: {e:?}"); } }); let lifecycle_listener = LifecycleListener { - tags_provider: Arc::clone(&tags_provider), + invocation_processor: Arc::clone(&invocation_processor), }; // TODO(astuyve): deprioritize this task after the first request tokio::spawn(async move { @@ -332,7 +339,6 @@ async fn extension_loop_active( setup_telemetry_client(&r.extension_id, logs_agent_channel).await?; let flush_control = FlushControl::new(config.serverless_flush_strategy); - let mut invocation_context_buffer = InvocationContextBuffer::default(); let mut shutdown = false; let mut flush_interval = flush_control.get_flush_interval(); @@ -375,92 +381,97 @@ async fn extension_loop_active( Event::Metric(event) => { debug!("Metric event: {:?}", event); } - Event::Telemetry(event) => match event.record { - TelemetryRecord::PlatformStart { request_id, .. } => { - invocation_context_buffer.insert(InvocationContext { - request_id, - runtime_duration_ms: 0.0, - }); - } - TelemetryRecord::PlatformInitReport { - initialization_type, - phase, - metrics, - } => { - debug!("Platform init report for initialization_type: {:?} with phase: {:?} and metrics: {:?}", initialization_type, phase, metrics); - lambda_enhanced_metrics - .set_init_duration_metric(metrics.duration_ms); - } - TelemetryRecord::PlatformRuntimeDone { - request_id, - status, - metrics, - .. - } => { - if let Some(metrics) = metrics { - invocation_context_buffer - .add_runtime_duration(&request_id, metrics.duration_ms); + Event::Telemetry(event) => { + let event_time = event.time; + match event.record { + TelemetryRecord::PlatformStart { request_id, .. } => { + let mut p = invocation_processor.lock().await; + p.on_platform_start(request_id, event_time); + drop(p); + } + TelemetryRecord::PlatformInitReport { + initialization_type, + phase, + metrics, + } => { + debug!("Platform init report for initialization_type: {:?} with phase: {:?} and metrics: {:?}", initialization_type, phase, metrics); lambda_enhanced_metrics - .set_runtime_duration_metric(metrics.duration_ms); + .set_init_duration_metric(metrics.duration_ms); } + TelemetryRecord::PlatformRuntimeDone { + request_id, + status, + metrics, + .. + } => { + let mut p = invocation_processor.lock().await; + if let Some(metrics) = metrics { + p.on_platform_runtime_done( + &request_id, + metrics.duration_ms, + config.clone(), + tags_provider.clone(), + trace_processor.clone(), + trace_agent_tx.clone() + ).await; + lambda_enhanced_metrics + .set_runtime_duration_metric(metrics.duration_ms); + } + drop(p); - if status != Status::Success { - lambda_enhanced_metrics.increment_errors_metric(); - if status == Status::Timeout { - lambda_enhanced_metrics.increment_timeout_metric(); + if status != Status::Success { + lambda_enhanced_metrics.increment_errors_metric(); + if status == Status::Timeout { + lambda_enhanced_metrics.increment_timeout_metric(); + } } - } - debug!( - "Runtime done for request_id: {:?} with status: {:?}", - request_id, status - ); - // TODO(astuyve) it'll be easy to - // pass the invocation deadline to - // flush tasks here, so they can - // retry if we have more time - if flush_control.should_flush_end() { - tokio::join!( - logs_flusher.flush(), - metrics_flusher.flush(), - trace_flusher.manual_flush(), - stats_flusher.manual_flush() + debug!( + "Runtime done for request_id: {:?} with status: {:?}", + request_id, status ); + + // TODO(astuyve) it'll be easy to + // pass the invocation deadline to + // flush tasks here, so they can + // retry if we have more time + if flush_control.should_flush_end() { + tokio::join!( + logs_flusher.flush(), + metrics_flusher.flush(), + trace_flusher.manual_flush(), + stats_flusher.manual_flush() + ); + } + break; } - break; - } - TelemetryRecord::PlatformReport { - request_id, - status, - metrics, - .. - } => { - debug!( - "Platform report for request_id: {:?} with status: {:?}", - request_id, status - ); - lambda_enhanced_metrics.set_report_log_metrics(&metrics); - if let Some(invocation_context) = - invocation_context_buffer.remove(&request_id) - { - if invocation_context.runtime_duration_ms > 0.0 { - let post_runtime_duration_ms = metrics.duration_ms - - invocation_context.runtime_duration_ms; + TelemetryRecord::PlatformReport { + request_id, + status, + metrics, + .. + } => { + debug!( + "Platform report for request_id: {:?} with status: {:?}", + request_id, status + ); + lambda_enhanced_metrics.set_report_log_metrics(&metrics); + let mut p = invocation_processor.lock().await; + if let Some(post_runtime_duration_ms) = p.on_platform_report(&request_id, metrics.duration_ms) { lambda_enhanced_metrics.set_post_runtime_duration_metric( post_runtime_duration_ms, ); - } else { - debug!("Impossible to compute post runtime duration for request_id: {:?}", request_id); } - } + drop(p); - if shutdown { - break; + if shutdown { + break; + } + } + _ => { + debug!("Unforwarded Telemetry event: {:?}", event); } } - _ => { - debug!("Unforwarded Telemetry event: {:?}", event); - } - }, + } } } _ = flush_interval.tick() => { From 57a155cadf4d615fd4528883f56148cb32d34465 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:28:00 -0400 Subject: [PATCH 14/24] move `MS_TO_NS` to `invocation::processor` --- bottlecap/src/lifecycle/invocation/processor.rs | 5 +++-- bottlecap/src/metrics/enhanced/constants.rs | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 5f0f3ef85..4bd747d3e 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -11,10 +11,11 @@ use tokio::sync::mpsc::Sender; use tracing::debug; use crate::{ - config, lifecycle::invocation::context::ContextBuffer, metrics::enhanced::constants::MS_TO_NS, - tags::provider, traces::trace_processor, + config, lifecycle::invocation::context::ContextBuffer, tags::provider, traces::trace_processor, }; +pub const MS_TO_NS: f64 = 1_000_000.0; + pub struct Processor { pub context_buffer: ContextBuffer, pub current_request_id: String, diff --git a/bottlecap/src/metrics/enhanced/constants.rs b/bottlecap/src/metrics/enhanced/constants.rs index 39ff08d10..3c2d34e0a 100644 --- a/bottlecap/src/metrics/enhanced/constants.rs +++ b/bottlecap/src/metrics/enhanced/constants.rs @@ -3,7 +3,6 @@ pub const BASE_LAMBDA_INVOCATION_PRICE: f64 = 0.000_000_2; pub const X86_LAMBDA_PRICE_PER_GB_SECOND: f64 = 0.000_016_666_7; pub const ARM_LAMBDA_PRICE_PER_GB_SECOND: f64 = 0.000_013_333_4; pub const MS_TO_SEC: f64 = 0.001; -pub const MS_TO_NS: f64 = 1_000_000.0; pub const MB_TO_GB: f64 = 1_024.0; // Enhanced metrics From 8ffd181ee2cb945d92cefb75bbe331e5c7f9dd87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:40:32 -0400 Subject: [PATCH 15/24] remove unnecessary constant --- bottlecap/src/bin/bottlecap/main.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 9121ae582..ab6382adc 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -381,12 +381,11 @@ async fn extension_loop_active( Event::Metric(event) => { debug!("Metric event: {:?}", event); } - Event::Telemetry(event) => { - let event_time = event.time; + Event::Telemetry(event) => match event.record { TelemetryRecord::PlatformStart { request_id, .. } => { let mut p = invocation_processor.lock().await; - p.on_platform_start(request_id, event_time); + p.on_platform_start(request_id, event.time); drop(p); } TelemetryRecord::PlatformInitReport { @@ -471,7 +470,6 @@ async fn extension_loop_active( debug!("Unforwarded Telemetry event: {:?}", event); } } - } } } _ = flush_interval.tick() => { From c7931485aa9fd9e16e670eac2d0902b3c59c4dff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:45:15 -0400 Subject: [PATCH 16/24] add `Box::new` back to `trace_agent` --- bottlecap/src/bin/bottlecap/main.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index ab6382adc..6b3033f26 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -302,15 +302,17 @@ async fn extension_loop_active( let trace_flusher_clone = trace_flusher.clone(); let stats_flusher_clone = stats_flusher.clone(); - let trace_agent = trace_agent::TraceAgent::new( - Arc::clone(config), - trace_processor.clone(), - trace_flusher_clone, - stats_processor, - stats_flusher_clone, - Arc::clone(&tags_provider), - ) - .await; + let trace_agent = Box::new( + trace_agent::TraceAgent::new( + Arc::clone(config), + trace_processor.clone(), + trace_flusher_clone, + stats_processor, + stats_flusher_clone, + Arc::clone(&tags_provider), + ) + .await, + ); let trace_agent_tx = trace_agent.get_sender_copy(); tokio::spawn(async move { From 83762a0b76abbae01bba73f2a5d6bc7819cd5d90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 20 Sep 2024 13:45:02 -0400 Subject: [PATCH 17/24] add some comments --- bottlecap/src/lifecycle/invocation/processor.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 4bd747d3e..ac1234aa6 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -85,10 +85,23 @@ impl Processor { span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64; span.meta .insert("request_id".to_string(), request_id.clone()); + + // todo(duncanista): add missing tags + // - cold start, proactive init + // - language + // - function.request - capture lambda payload + // - function.response + // - error.msg + // - error.type + // - error.stack + // - trigger tags (from inferred spans) + // - metrics tags (for asm) } if !self.lambda_library_detected { let span_size = std::mem::size_of_val(&self.span); + + // todo: figure out what to do here let header_tags = tracer_header_tags::TracerHeaderTags { lang: "", lang_version: "", From 4f23d5973546083e2a64fe78a8d0a41c3312f5e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 20 Sep 2024 15:29:51 -0400 Subject: [PATCH 18/24] add unit tests for `context.rs` --- bottlecap/src/lifecycle/invocation/context.rs | 185 +++++++++++++++++- 1 file changed, 183 insertions(+), 2 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index fbd638e95..a1c74bc57 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use tracing::debug; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Context { pub request_id: String, pub runtime_duration_ms: f64, @@ -33,6 +33,8 @@ pub struct ContextBuffer { } impl Default for ContextBuffer { + /// Creates a new `ContextBuffer` with a default capacity of 5. + /// fn default() -> Self { ContextBuffer { buffer: VecDeque::::with_capacity(5), @@ -41,8 +43,17 @@ impl Default for ContextBuffer { } impl ContextBuffer { + #[allow(dead_code)] + fn with_capacity(capacity: usize) -> Self { + ContextBuffer { + buffer: VecDeque::::with_capacity(capacity), + } + } + + /// Inserts a context into the buffer. If the buffer is full, the oldest `Context` is removed. + /// fn insert(&mut self, context: Context) { - if self.buffer.len() == self.buffer.capacity() { + if self.size() == self.buffer.capacity() { self.buffer.pop_front(); self.buffer.push_back(context); } else { @@ -54,6 +65,8 @@ impl ContextBuffer { } } + /// Removes a context from the buffer. Returns the removed `Context` if found. + /// pub fn remove(&mut self, request_id: &String) -> Option { if let Some(i) = self .buffer @@ -67,6 +80,8 @@ impl ContextBuffer { None } + /// Returns a reference to a `Context` from the buffer if found. + /// #[must_use] pub fn get(&self, request_id: &String) -> Option<&Context> { self.buffer @@ -74,6 +89,9 @@ impl ContextBuffer { .find(|context| context.request_id == *request_id) } + /// Adds the init duration to a `Context` in the buffer. If the `Context` is not found, a new + /// `Context` is created and added to the buffer. + /// pub fn add_init_duration(&mut self, request_id: &String, init_duration_ms: f64) { if let Some(context) = self .buffer @@ -86,6 +104,9 @@ impl ContextBuffer { } } + /// Adds the start time to a `Context` in the buffer. If the `Context` is not found, a new + /// `Context` is created and added to the buffer. + /// pub fn add_start_time(&mut self, request_id: &String, start_time: i64) { if let Some(context) = self .buffer @@ -98,6 +119,9 @@ impl ContextBuffer { } } + /// Adds the runtime duration to a `Context` in the buffer. If the `Context` is not found, a new + /// `Context` is created and added to the buffer. + /// pub fn add_runtime_duration(&mut self, request_id: &String, runtime_duration_ms: f64) { if let Some(context) = self .buffer @@ -114,4 +138,161 @@ impl ContextBuffer { )); } } + + /// Returns the size of the buffer. + /// + #[must_use] + pub fn size(&self) -> usize { + self.buffer.len() + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + + #[test] + fn test_insert() { + let mut buffer = ContextBuffer::with_capacity(2); + + let request_id = String::from("1"); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 1); + assert_eq!(buffer.get(&request_id).unwrap(), &context); + + let request_id_2 = String::from("2"); + let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 2); + assert_eq!(buffer.get(&request_id_2).unwrap(), &context); + + // This should replace the first context + let request_id_3 = String::from("3"); + let context = Context::new(request_id_3.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 2); + assert_eq!(buffer.get(&request_id_3).unwrap(), &context); + + // First context should be None + assert!(buffer.get(&request_id).is_none()); + } + + #[test] + fn test_remove() { + let mut buffer = ContextBuffer::with_capacity(2); + + let request_id = String::from("1"); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 1); + assert_eq!(buffer.get(&request_id).unwrap(), &context); + + let request_id_2 = String::from("2"); + let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 2); + assert_eq!(buffer.get(&request_id_2).unwrap(), &context); + + // Remove the first context + assert_eq!(buffer.remove(&request_id).unwrap().request_id, request_id); + // Size is reduced by 1 + assert_eq!(buffer.size(), 1); + assert!(buffer.get(&request_id).is_none()); + + // Remove a context that doesn't exist + let unexistent_request_id = String::from("unexistent"); + assert!(buffer.remove(&unexistent_request_id).is_none()); + } + + #[test] + fn test_get() { + let mut buffer = ContextBuffer::with_capacity(2); + + let request_id = String::from("1"); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 1); + assert_eq!(buffer.get(&request_id).unwrap(), &context); + + let request_id_2 = String::from("2"); + let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 2); + assert_eq!(buffer.get(&request_id_2).unwrap(), &context); + + // Get a context that doesn't exist + let unexistent_request_id = String::from("unexistent"); + assert!(buffer.get(&unexistent_request_id).is_none()); + } + + #[test] + fn test_add_init_duration() { + let mut buffer = ContextBuffer::with_capacity(2); + + let request_id = String::from("1"); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 1); + assert_eq!(buffer.get(&request_id).unwrap(), &context); + + buffer.add_init_duration(&request_id, 100.0); + assert_eq!(buffer.get(&request_id).unwrap().init_duration_ms, 100.0); + + // Add init duration to a context that doesn't exist + let unexistent_request_id = String::from("unexistent"); + buffer.add_init_duration(&unexistent_request_id, 200.0); + assert_eq!(buffer.size(), 2); + assert_eq!( + buffer.get(&unexistent_request_id).unwrap().init_duration_ms, + 200.0 + ); + } + + #[test] + fn test_add_start_time() { + let mut buffer = ContextBuffer::with_capacity(2); + + let request_id = String::from("1"); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 1); + assert_eq!(buffer.get(&request_id).unwrap(), &context); + + buffer.add_start_time(&request_id, 100); + assert_eq!(buffer.get(&request_id).unwrap().start_time, 100); + + // Add start time to a context that doesn't exist + let unexistent_request_id = String::from("unexistent"); + buffer.add_start_time(&unexistent_request_id, 200); + assert_eq!(buffer.size(), 2); + assert_eq!(buffer.get(&unexistent_request_id).unwrap().start_time, 200); + } + + #[test] + fn test_add_runtime_duration() { + let mut buffer = ContextBuffer::with_capacity(2); + + let request_id = String::from("1"); + let context = Context::new(request_id.clone(), 0.0, 0.0, 0); + buffer.insert(context.clone()); + assert_eq!(buffer.size(), 1); + assert_eq!(buffer.get(&request_id).unwrap(), &context); + + buffer.add_runtime_duration(&request_id, 100.0); + assert_eq!(buffer.get(&request_id).unwrap().runtime_duration_ms, 100.0); + + // Add runtime duration to a context that doesn't exist + let unexistent_request_id = String::from("unexistent"); + buffer.add_runtime_duration(&unexistent_request_id, 200.0); + assert_eq!(buffer.size(), 2); + assert_eq!( + buffer + .get(&unexistent_request_id) + .unwrap() + .runtime_duration_ms, + 200.0 + ); + } } From 7918a325daa016075c6ce07efb0c78a89bc2e1da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 20 Sep 2024 15:39:36 -0400 Subject: [PATCH 19/24] use `on_invocation_start` --- bottlecap/src/lifecycle/listener.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/bottlecap/src/lifecycle/listener.rs b/bottlecap/src/lifecycle/listener.rs index e57455dd8..ed307908d 100644 --- a/bottlecap/src/lifecycle/listener.rs +++ b/bottlecap/src/lifecycle/listener.rs @@ -54,7 +54,9 @@ impl Listener { invocation_processor: Arc>, ) -> http::Result> { match (req.method(), req.uri().path()) { - (&Method::POST, START_INVOCATION_PATH) => Self::start_invocation_handler(req), + (&Method::POST, START_INVOCATION_PATH) => { + Self::start_invocation_handler(req, invocation_processor).await + } (&Method::POST, END_INVOCATION_PATH) => { match Self::end_invocation_handler(req, invocation_processor).await { Ok(response) => Ok(response), @@ -76,7 +78,14 @@ impl Listener { } } - fn start_invocation_handler(_: Request) -> http::Result> { + async fn start_invocation_handler( + _: Request, + invocation_processor: Arc>, + ) -> http::Result> { + let mut processor = invocation_processor.lock().await; + processor.on_invocation_start(); + drop(processor); + Response::builder() .status(200) .body(Body::from(json!({}).to_string())) From fca14ec58f91c8f181c9119d5458fad3981c6800 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Fri, 20 Sep 2024 15:42:43 -0400 Subject: [PATCH 20/24] rename `lambda_library_detected` to `tracer_detected` --- bottlecap/src/lifecycle/invocation/processor.rs | 11 ++++------- bottlecap/src/lifecycle/listener.rs | 4 +++- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index ac1234aa6..16ee3fc16 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -20,7 +20,7 @@ pub struct Processor { pub context_buffer: ContextBuffer, pub current_request_id: String, pub span: Span, - lambda_library_detected: bool, + tracer_detected: bool, } impl Processor { @@ -50,7 +50,7 @@ impl Processor { meta_struct: HashMap::new(), span_links: Vec::new(), }, - lambda_library_detected: true, + tracer_detected: false, } } @@ -98,7 +98,7 @@ impl Processor { // - metrics tags (for asm) } - if !self.lambda_library_detected { + if self.tracer_detected { let span_size = std::mem::size_of_val(&self.span); // todo: figure out what to do here @@ -142,13 +142,10 @@ impl Processor { } pub fn on_invocation_start(&mut self) { - debug!("[usm] on_invocation_start"); - self.lambda_library_detected = false; + self.tracer_detected = true; } pub fn on_invocation_end(&mut self, trace_id: u64, span_id: u64, parent_id: u64) { - debug!("[usm] on_invocation_end"); - let span = &mut self.span; span.trace_id = trace_id; span.span_id = span_id; diff --git a/bottlecap/src/lifecycle/listener.rs b/bottlecap/src/lifecycle/listener.rs index ed307908d..90e9c9178 100644 --- a/bottlecap/src/lifecycle/listener.rs +++ b/bottlecap/src/lifecycle/listener.rs @@ -9,7 +9,7 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{http, Body, Method, Request, Response, StatusCode}; use serde_json::json; use tokio::sync::Mutex; -use tracing::{error, warn}; +use tracing::{error, warn, debug}; use crate::lifecycle::invocation::processor::Processor as InvocationProcessor; @@ -82,6 +82,7 @@ impl Listener { _: Request, invocation_processor: Arc>, ) -> http::Result> { + debug!("Received start invocation request"); let mut processor = invocation_processor.lock().await; processor.on_invocation_start(); drop(processor); @@ -95,6 +96,7 @@ impl Listener { req: Request, invocation_processor: Arc>, ) -> http::Result> { + debug!("Received end invocation request"); let (parts, _) = req.into_parts(); let headers = parts.headers; From 9413e1c385eab801dcd3dae721eb931d5c1dd8cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Mon, 23 Sep 2024 12:30:02 -0400 Subject: [PATCH 21/24] fmt --- bottlecap/src/lifecycle/listener.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/listener.rs b/bottlecap/src/lifecycle/listener.rs index 90e9c9178..ebb3dd833 100644 --- a/bottlecap/src/lifecycle/listener.rs +++ b/bottlecap/src/lifecycle/listener.rs @@ -9,7 +9,7 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{http, Body, Method, Request, Response, StatusCode}; use serde_json::json; use tokio::sync::Mutex; -use tracing::{error, warn, debug}; +use tracing::{debug, error, warn}; use crate::lifecycle::invocation::processor::Processor as InvocationProcessor; From f03ddd0455d70afd6d90db5849ec3a08f2c3f5bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Mon, 23 Sep 2024 12:58:34 -0400 Subject: [PATCH 22/24] remove `current_request_id` I think we dont need it --- .../src/lifecycle/invocation/processor.rs | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 16ee3fc16..21ca1d788 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -18,7 +18,6 @@ pub const MS_TO_NS: f64 = 1_000_000.0; pub struct Processor { pub context_buffer: ContextBuffer, - pub current_request_id: String, pub span: Span, tracer_detected: bool, } @@ -33,7 +32,6 @@ impl Processor { Processor { context_buffer: ContextBuffer::default(), - current_request_id: String::new(), span: Span { service, name: "aws.lambda".to_string(), @@ -55,7 +53,6 @@ impl Processor { } pub fn on_platform_start(&mut self, request_id: String, time: DateTime) { - self.current_request_id.clone_from(&request_id); let start_time: i64 = SystemTime::from(time) .duration_since(UNIX_EPOCH) .expect("time went backwards") @@ -79,7 +76,7 @@ impl Processor { self.context_buffer .add_runtime_duration(request_id, duration_ms); - if let Some(context) = self.context_buffer.get(&self.current_request_id) { + if let Some(context) = self.context_buffer.get(request_id) { let span = &mut self.span; // `round` is intentionally meant to be a whole integer span.duration = (context.runtime_duration_ms * MS_TO_NS).round() as i64; @@ -121,15 +118,19 @@ impl Processor { span_size, ); - match trace_agent_tx.send(send_data).await { - Ok(()) => println!("[usm] Sent span to agent"), - Err(e) => println!("[usm] Failed to send span to agent: {e}"), + if let Err(e) = trace_agent_tx.send(send_data).await { + debug!("Failed to send invocation span to agent: {e}"); } } } + /// Given a `request_id` and the duration in milliseconds of the platform report, + /// calculate the duration of the runtime if the `request_id` is found in the context buffer. + /// + /// If the `request_id` is not found in the context buffer, return `None`. + /// If the `runtime_duration_ms` hasn't been seen, return `None`. + /// pub fn on_platform_report(&mut self, request_id: &String, duration_ms: f64) -> Option { - debug!("[usm] on_platform_report"); if let Some(context) = self.context_buffer.remove(request_id) { if context.runtime_duration_ms == 0.0 { return None; @@ -141,10 +142,15 @@ impl Processor { None } + /// If this method is called, it means that we are operating in a Universally Instrumented + /// runtime. Therefore, we need to set the `tracer_detected` flag to `true`. + /// pub fn on_invocation_start(&mut self) { self.tracer_detected = true; } + /// Given trace context information, set it to the current span. + /// pub fn on_invocation_end(&mut self, trace_id: u64, span_id: u64, parent_id: u64) { let span = &mut self.span; span.trace_id = trace_id; From cfbbbde993599cf76a0d91cabcb3e28d66de3a87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Mon, 23 Sep 2024 16:56:30 -0400 Subject: [PATCH 23/24] add comment --- bottlecap/src/lifecycle/invocation/processor.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 21ca1d788..bccb6785e 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -52,6 +52,10 @@ impl Processor { } } + /// Given a `request_id` and the time of the platform start, add the start time to the context buffer. + /// + /// Also, set the start time of the current span. + /// pub fn on_platform_start(&mut self, request_id: String, time: DateTime) { let start_time: i64 = SystemTime::from(time) .duration_since(UNIX_EPOCH) From 6a8effac84e055a9428e336526b3a84eb95d8ef4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Mon, 23 Sep 2024 16:59:53 -0400 Subject: [PATCH 24/24] fmt --- bottlecap/src/lifecycle/invocation/processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index bccb6785e..e553150d1 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -53,7 +53,7 @@ impl Processor { } /// Given a `request_id` and the time of the platform start, add the start time to the context buffer. - /// + /// /// Also, set the start time of the current span. /// pub fn on_platform_start(&mut self, request_id: String, time: DateTime) {