From 2aa9540da4e13fda751d5ca459f261bc70482d81 Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Wed, 4 Feb 2026 16:12:37 -0500 Subject: [PATCH 1/6] refactor types and methods we were creating a client every time when flushing traces, now we just use one, also removes unnecessary traits as we are not creating more tracing agents for other use cases --- bottlecap/src/bin/bottlecap/main.rs | 12 +- bottlecap/src/flushing/service.rs | 31 +--- bottlecap/src/traces/hyper_client.rs | 113 ++++++++++++++ bottlecap/src/traces/mod.rs | 1 + bottlecap/src/traces/stats_flusher.rs | 70 +++++---- bottlecap/src/traces/trace_flusher.rs | 204 +++++++------------------- 6 files changed, 222 insertions(+), 209 deletions(-) create mode 100644 bottlecap/src/traces/hyper_client.rs diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 12ae368f7..e93b95d69 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -66,14 +66,14 @@ use bottlecap::{ span_dedup_service, stats_aggregator::StatsAggregator, stats_concentrator_service::{StatsConcentratorHandle, StatsConcentratorService}, - stats_flusher::{self, StatsFlusher}, + stats_flusher, stats_generator::StatsGenerator, stats_processor, trace_agent, trace_aggregator::SendDataBuilderInfo, trace_aggregator_service::{ AggregatorHandle as TraceAggregatorHandle, AggregatorService as TraceAggregatorService, }, - trace_flusher::{self, TraceFlusher}, + trace_flusher, trace_processor::{self, SendingTraceProcessor}, }, }; @@ -1081,9 +1081,9 @@ fn start_trace_agent( appsec_processor: Option>>, ) -> ( Sender, - Arc, + Arc, Arc, - Arc, + Arc, Arc, tokio_util::sync::CancellationToken, StatsConcentratorHandle, @@ -1096,7 +1096,7 @@ fn start_trace_agent( let stats_aggregator: Arc> = Arc::new(TokioMutex::new( StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()), )); - let stats_flusher = 
Arc::new(stats_flusher::ServerlessStatsFlusher::new( + let stats_flusher = Arc::new(stats_flusher::StatsFlusher::new( api_key_factory.clone(), stats_aggregator.clone(), Arc::clone(config), @@ -1108,7 +1108,7 @@ fn start_trace_agent( let (trace_aggregator_service, trace_aggregator_handle) = TraceAggregatorService::default(); tokio::spawn(trace_aggregator_service.run()); - let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new( + let trace_flusher = Arc::new(trace_flusher::TraceFlusher::new( trace_aggregator_handle.clone(), config.clone(), api_key_factory.clone(), diff --git a/bottlecap/src/flushing/service.rs b/bottlecap/src/flushing/service.rs index 152a13140..c632d6e36 100644 --- a/bottlecap/src/flushing/service.rs +++ b/bottlecap/src/flushing/service.rs @@ -23,20 +23,11 @@ use crate::traces::{ /// - Spawning non-blocking flush tasks /// - Awaiting pending flush handles with retry logic /// - Performing blocking flushes (spawn + await) -/// -/// # Type Parameters -/// -/// * `TF` - Trace flusher type implementing `TraceFlusher` -/// * `SF` - Stats flusher type implementing `StatsFlusher` -pub struct FlushingService -where - TF: TraceFlusher + Send + Sync + 'static, - SF: StatsFlusher + Send + Sync + 'static, -{ +pub struct FlushingService { // Flushers logs_flusher: LogsFlusher, - trace_flusher: Arc, - stats_flusher: Arc, + trace_flusher: Arc, + stats_flusher: Arc, proxy_flusher: Arc, metrics_flushers: Arc>>, @@ -47,17 +38,13 @@ where handles: FlushHandles, } -impl FlushingService -where - TF: TraceFlusher + Send + Sync + 'static, - SF: StatsFlusher + Send + Sync + 'static, -{ +impl FlushingService { /// Creates a new `FlushingService` with the given flushers. 
#[must_use] pub fn new( logs_flusher: LogsFlusher, - trace_flusher: Arc, - stats_flusher: Arc, + trace_flusher: Arc, + stats_flusher: Arc, proxy_flusher: Arc, metrics_flushers: Arc>>, metrics_aggr_handle: MetricsAggregatorHandle, @@ -340,11 +327,7 @@ where } } -impl std::fmt::Debug for FlushingService -where - TF: TraceFlusher + Send + Sync + 'static, - SF: StatsFlusher + Send + Sync + 'static, -{ +impl std::fmt::Debug for FlushingService { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("FlushingService") .field("handles", &self.handles) diff --git a/bottlecap/src/traces/hyper_client.rs b/bottlecap/src/traces/hyper_client.rs new file mode 100644 index 000000000..99cd60fe5 --- /dev/null +++ b/bottlecap/src/traces/hyper_client.rs @@ -0,0 +1,113 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Hyper-based HTTP client for trace and stats flushers. +//! +//! This module provides the HTTP client type required by `libdd_trace_utils` +//! for sending traces and stats to Datadog intake endpoints. + +use hyper_http_proxy; +use hyper_rustls::HttpsConnectorBuilder; +use libdd_common::{GenericHttpClient, hyper_migration}; +use rustls::RootCertStore; +use rustls_pki_types::CertificateDer; +use std::error::Error; +use std::fs::File; +use std::io::BufReader; +use std::sync::LazyLock; +use tracing::debug; + +/// Type alias for the HTTP client used by trace and stats flushers. +/// +/// This is the client type expected by `libdd_trace_utils::SendData::send()`. +pub type HyperClient = + GenericHttpClient>; + +/// Initialize the crypto provider needed for setting custom root certificates. 
+fn ensure_crypto_provider_initialized() { + static INIT_CRYPTO_PROVIDER: LazyLock<()> = LazyLock::new(|| { + #[cfg(unix)] + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .expect("Failed to install default CryptoProvider"); + }); + + let () = &*INIT_CRYPTO_PROVIDER; +} + +/// Creates a new hyper-based HTTP client with the given configuration. +/// +/// This client is compatible with `libdd_trace_utils` and supports: +/// - HTTPS proxy configuration +/// - Custom TLS root certificates +/// +/// # Arguments +/// +/// * `proxy_https` - Optional HTTPS proxy URL +/// * `tls_cert_file` - Optional path to a PEM file containing root certificates +/// +/// # Errors +/// +/// Returns an error if: +/// - The proxy URL cannot be parsed +/// - The TLS certificate file cannot be read or parsed +pub fn create_client( + proxy_https: Option<&String>, + tls_cert_file: Option<&String>, +) -> Result> { + // Create the base connector with optional custom TLS config + let connector = if let Some(ca_cert_path) = tls_cert_file { + // Ensure crypto provider is initialized before creating TLS config + ensure_crypto_provider_initialized(); + + // Load the custom certificate + let cert_file = File::open(ca_cert_path)?; + let mut reader = BufReader::new(cert_file); + let certs: Vec = + rustls_pemfile::certs(&mut reader).collect::, _>>()?; + + // Create a root certificate store and add custom certs + let mut root_store = RootCertStore::empty(); + for cert in certs { + root_store.add(cert)?; + } + + // Build the TLS config with custom root certificates + let tls_config = rustls::ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(); + + // Build the HTTPS connector with custom config + let https_connector = HttpsConnectorBuilder::new() + .with_tls_config(tls_config) + .https_or_http() + .enable_http1() + .build(); + + debug!( + "HYPER_CLIENT | Added root certificate from {}", + ca_cert_path + ); + + // Construct the Connector::Https variant 
directly + libdd_common::connector::Connector::Https(https_connector) + } else { + // Use default connector + libdd_common::connector::Connector::default() + }; + + if let Some(proxy) = proxy_https { + let proxy = + hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::Https, proxy.parse()?); + let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy(connector, proxy)?; + let client = hyper_migration::client_builder().build(proxy_connector); + debug!( + "HYPER_CLIENT | Proxy connector created with proxy: {:?}", + proxy_https + ); + Ok(client) + } else { + let proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?; + Ok(hyper_migration::client_builder().build(proxy_connector)) + } +} diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index 5a2a515dc..7d1581c2e 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 pub mod context; +pub mod hyper_client; pub mod propagation; pub mod proxy_aggregator; pub mod proxy_flusher; diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index 7f2a0d998..d74f8c562 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -1,7 +1,6 @@ // Copyright 2023-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use async_trait::async_trait; use std::str::FromStr; use std::sync::Arc; use tokio::sync::Mutex; @@ -9,55 +8,41 @@ use tokio::sync::OnceCell; use crate::config; use crate::lifecycle::invocation::processor::S_TO_MS; +use crate::traces::hyper_client::{self, HyperClient}; use crate::traces::stats_aggregator::StatsAggregator; -use crate::traces::trace_flusher::ServerlessTraceFlusher; use dogstatsd::api_key::ApiKeyFactory; use libdd_common::Endpoint; use libdd_trace_protobuf::pb; use libdd_trace_utils::{config_utils::trace_stats_url, stats_utils}; use tracing::{debug, error}; -#[async_trait] -pub trait StatsFlusher { - fn new( - api_key_factory: Arc, - aggregator: Arc>, - config: Arc, - ) -> Self - where - Self: Sized; - /// Flushes stats to the Datadog trace stats intake. - async fn send(&self, traces: Vec); - - async fn flush(&self, force_flush: bool); -} - -#[allow(clippy::module_name_repetitions)] -#[derive(Clone)] -pub struct ServerlessStatsFlusher { - // pub buffer: Arc>>, +pub struct StatsFlusher { aggregator: Arc>, config: Arc, api_key_factory: Arc, endpoint: OnceCell, + /// Cached HTTP client, lazily initialized on first use. + http_client: OnceCell, } -#[async_trait] -impl StatsFlusher for ServerlessStatsFlusher { - fn new( +impl StatsFlusher { + #[must_use] + pub fn new( api_key_factory: Arc, aggregator: Arc>, config: Arc, ) -> Self { - ServerlessStatsFlusher { + StatsFlusher { aggregator, config, api_key_factory, endpoint: OnceCell::new(), + http_client: OnceCell::new(), } } - async fn send(&self, stats: Vec) { + /// Flushes stats to the Datadog trace stats intake. 
+ pub async fn send(&self, stats: Vec) { if stats.is_empty() { return; } @@ -102,10 +87,9 @@ impl StatsFlusher for ServerlessStatsFlusher { let start = std::time::Instant::now(); - let Ok(http_client) = ServerlessTraceFlusher::get_http_client( - self.config.proxy_https.as_ref(), - self.config.tls_cert_file.as_ref(), - ) else { + // Get or create the cached HTTP client + let http_client = self.get_or_init_http_client().await; + let Some(http_client) = http_client else { error!("STATS_FLUSHER | Failed to create HTTP client"); return; }; @@ -114,7 +98,7 @@ impl StatsFlusher for ServerlessStatsFlusher { serialized_stats_payload, endpoint, api_key.as_str(), - Some(&http_client), + Some(http_client), ) .await; let elapsed = start.elapsed(); @@ -131,7 +115,7 @@ impl StatsFlusher for ServerlessStatsFlusher { }; } - async fn flush(&self, force_flush: bool) { + pub async fn flush(&self, force_flush: bool) { let mut guard = self.aggregator.lock().await; let mut stats = guard.get_batch(force_flush).await; @@ -141,4 +125,26 @@ impl StatsFlusher for ServerlessStatsFlusher { stats = guard.get_batch(force_flush).await; } } + /// Returns a reference to the cached HTTP client, initializing it if necessary. + /// + /// The client is created once and reused for all subsequent flushes, + /// providing connection pooling and TLS session reuse. 
+ async fn get_or_init_http_client(&self) -> Option<&HyperClient> { + let client = self + .http_client + .get_or_init(|| async { + match hyper_client::create_client( + self.config.proxy_https.as_ref(), + self.config.tls_cert_file.as_ref(), + ) { + Ok(client) => client, + Err(e) => { + error!("STATS_FLUSHER | Failed to create HTTP client: {e}"); + panic!("STATS_FLUSHER | Cannot proceed without HTTP client"); + } + } + }) + .await; + Some(client) + } } diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 6d88a12d2..3f3af30f8 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -1,67 +1,36 @@ // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use async_trait::async_trait; use dogstatsd::api_key::ApiKeyFactory; -use hyper_http_proxy; -use hyper_rustls::HttpsConnectorBuilder; -use libdd_common::{Endpoint, GenericHttpClient, hyper_migration}; +use libdd_common::Endpoint; use libdd_trace_utils::{ config_utils::trace_intake_url_prefixed, send_data::SendDataBuilder, trace_utils::{self, SendData}, }; -use rustls::RootCertStore; -use rustls_pki_types::CertificateDer; -use std::error::Error; -use std::fs::File; -use std::io::BufReader; use std::str::FromStr; use std::sync::Arc; -use std::sync::LazyLock; +use tokio::sync::OnceCell; use tokio::task::JoinSet; use tracing::{debug, error}; use crate::config::Config; use crate::lifecycle::invocation::processor::S_TO_MS; +use crate::traces::hyper_client::{self, HyperClient}; use crate::traces::trace_aggregator_service::AggregatorHandle; -#[async_trait] -pub trait TraceFlusher { - fn new( - aggregator_handle: AggregatorHandle, - config: Arc, - api_key_factory: Arc, - ) -> Self - where - Self: Sized; - /// Given a `Vec`, a tracer payload, send it to the Datadog intake endpoint. - /// Returns the traces back if there was an error sending them. 
- async fn send( - traces: Vec, - endpoint: Option<&Endpoint>, - proxy_https: &Option, - tls_cert_file: &Option, - ) -> Option>; - - /// Flushes traces by getting every available batch on the aggregator. - /// If `failed_traces` is provided, it will attempt to send those instead of fetching new traces. - /// Returns any traces that failed to send and should be retried. - async fn flush(&self, failed_traces: Option>) -> Option>; -} - -#[derive(Clone)] -#[allow(clippy::module_name_repetitions)] -pub struct ServerlessTraceFlusher { +pub struct TraceFlusher { pub aggregator_handle: AggregatorHandle, pub config: Arc, pub api_key_factory: Arc, pub additional_endpoints: Vec, + /// Cached HTTP client, lazily initialized on first use. + http_client: OnceCell, } -#[async_trait] -impl TraceFlusher for ServerlessTraceFlusher { - fn new( +impl TraceFlusher { + #[must_use] + pub fn new( aggregator_handle: AggregatorHandle, config: Arc, api_key_factory: Arc, @@ -83,15 +52,19 @@ impl TraceFlusher for ServerlessTraceFlusher { } } - ServerlessTraceFlusher { + TraceFlusher { aggregator_handle, config, api_key_factory, additional_endpoints, + http_client: OnceCell::new(), } } - async fn flush(&self, failed_traces: Option>) -> Option> { + /// Flushes traces by getting every available batch on the aggregator. + /// If `failed_traces` is provided, it will attempt to send those instead of fetching new traces. + /// Returns any traces that failed to send and should be retried. + pub async fn flush(&self, failed_traces: Option>) -> Option> { let Some(api_key) = self.api_key_factory.get_api_key().await else { error!( "TRACES | Failed to resolve API key, dropping aggregated data and skipping flushing." 
@@ -102,6 +75,12 @@ impl TraceFlusher for ServerlessTraceFlusher { return None; }; + // Get or create the cached HTTP client + let Some(http_client) = self.get_or_init_http_client().await else { + error!("TRACES | Failed to create HTTP client, skipping flush"); + return None; + }; + let mut failed_batch: Vec = Vec::new(); if let Some(traces) = failed_traces { @@ -111,13 +90,7 @@ impl TraceFlusher for ServerlessTraceFlusher { "TRACES | Retrying to send {} previously failed batches", traces.len() ); - let retry_result = Self::send( - traces, - None, - &self.config.proxy_https, - &self.config.tls_cert_file, - ) - .await; + let retry_result = Self::send_traces(traces, None, http_client.clone()).await; if retry_result.is_some() { // Still failed, return to retry later return retry_result; @@ -143,18 +116,15 @@ impl TraceFlusher for ServerlessTraceFlusher { .collect(); let traces_clone = traces.clone(); - let proxy_https = self.config.proxy_https.clone(); - let tls_cert_file = self.config.tls_cert_file.clone(); - batch_tasks.spawn(async move { - Self::send(traces_clone, None, &proxy_https, &tls_cert_file).await - }); + let client_clone = http_client.clone(); + batch_tasks + .spawn(async move { Self::send_traces(traces_clone, None, client_clone).await }); for endpoint in self.additional_endpoints.clone() { let traces_clone = traces.clone(); - let proxy_https = self.config.proxy_https.clone(); - let tls_cert_file = self.config.tls_cert_file.clone(); + let client_clone = http_client.clone(); batch_tasks.spawn(async move { - Self::send(traces_clone, Some(&endpoint), &proxy_https, &tls_cert_file).await + Self::send_traces(traces_clone, Some(endpoint), client_clone).await }); } } @@ -171,11 +141,36 @@ impl TraceFlusher for ServerlessTraceFlusher { None } - async fn send( + /// Returns a clone of the cached HTTP client, initializing it if necessary. 
+ /// + /// The client is created once and reused for all subsequent flushes, + /// providing connection pooling and TLS session reuse. + async fn get_or_init_http_client(&self) -> Option { + let client = self + .http_client + .get_or_init(|| async { + match hyper_client::create_client( + self.config.proxy_https.as_ref(), + self.config.tls_cert_file.as_ref(), + ) { + Ok(client) => client, + Err(e) => { + error!("TRACES | Failed to create HTTP client: {e}"); + panic!("TRACES | Cannot proceed without HTTP client"); + } + } + }) + .await; + Some(client.clone()) + } + + /// Sends traces to the Datadog intake endpoint using the provided HTTP client. + /// + /// Returns the traces back if there was an error sending them. + async fn send_traces( traces: Vec, - endpoint: Option<&Endpoint>, - proxy_https: &Option, - tls_cert_file: &Option, + endpoint: Option, + http_client: HyperClient, ) -> Option> { if traces.is_empty() { return None; @@ -185,15 +180,8 @@ impl TraceFlusher for ServerlessTraceFlusher { tokio::task::yield_now().await; debug!("TRACES | Flushing {} traces", coalesced_traces.len()); - let Ok(http_client) = - ServerlessTraceFlusher::get_http_client(proxy_https.as_ref(), tls_cert_file.as_ref()) - else { - error!("TRACES | Failed to create HTTP client"); - return None; - }; - for trace in &coalesced_traces { - let trace_with_endpoint = match endpoint { + let trace_with_endpoint = match &endpoint { Some(additional_endpoint) => trace.with_endpoint(additional_endpoint.clone()), None => trace.clone(), }; @@ -211,81 +199,3 @@ impl TraceFlusher for ServerlessTraceFlusher { None } } - -// Initialize the crypto provider needed for setting custom root certificates -fn ensure_crypto_provider_initialized() { - static INIT_CRYPTO_PROVIDER: LazyLock<()> = LazyLock::new(|| { - #[cfg(unix)] - rustls::crypto::aws_lc_rs::default_provider() - .install_default() - .expect("Failed to install default CryptoProvider"); - }); - - let () = &*INIT_CRYPTO_PROVIDER; -} - -impl 
ServerlessTraceFlusher { - pub fn get_http_client( - proxy_https: Option<&String>, - tls_cert_file: Option<&String>, - ) -> Result< - GenericHttpClient>, - Box, - > { - // Create the base connector with optional custom TLS config - let connector = if let Some(ca_cert_path) = tls_cert_file { - // Ensure crypto provider is initialized before creating TLS config - ensure_crypto_provider_initialized(); - - // Load the custom certificate - let cert_file = File::open(ca_cert_path)?; - let mut reader = BufReader::new(cert_file); - let certs: Vec = - rustls_pemfile::certs(&mut reader).collect::, _>>()?; - - // Create a root certificate store and add custom certs - let mut root_store = RootCertStore::empty(); - for cert in certs { - root_store.add(cert)?; - } - - // Build the TLS config with custom root certificates - let tls_config = rustls::ClientConfig::builder() - .with_root_certificates(root_store) - .with_no_client_auth(); - - // Build the HTTPS connector with custom config - let https_connector = HttpsConnectorBuilder::new() - .with_tls_config(tls_config) - .https_or_http() - .enable_http1() - .build(); - - debug!( - "TRACES | GET_HTTP_CLIENT | Added root certificate from {}", - ca_cert_path - ); - - // Construct the Connector::Https variant directly - libdd_common::connector::Connector::Https(https_connector) - } else { - // Use default connector - libdd_common::connector::Connector::default() - }; - - if let Some(proxy) = proxy_https { - let proxy = - hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::Https, proxy.parse()?); - let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy(connector, proxy)?; - let client = hyper_migration::client_builder().build(proxy_connector); - debug!( - "TRACES | GET_HTTP_CLIENT | Proxy connector created with proxy: {:?}", - proxy_https - ); - Ok(client) - } else { - let proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?; - Ok(hyper_migration::client_builder().build(proxy_connector)) - } - } -} From 
cd090c7ec428c2a280d43a39a26a39aa61ff022a Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Wed, 4 Feb 2026 16:29:57 -0500 Subject: [PATCH 2/6] never panic --- bottlecap/src/traces/stats_flusher.rs | 27 +++++++++------- bottlecap/src/traces/trace_flusher.rs | 46 +++++++++++++++++---------- 2 files changed, 44 insertions(+), 29 deletions(-) diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index d74f8c562..b8061613a 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -129,22 +129,25 @@ impl StatsFlusher { /// /// The client is created once and reused for all subsequent flushes, /// providing connection pooling and TLS session reuse. + /// + /// Returns `None` if client creation fails. The error is logged but not cached, + /// allowing retry on subsequent calls. async fn get_or_init_http_client(&self) -> Option<&HyperClient> { - let client = self + match self .http_client - .get_or_init(|| async { - match hyper_client::create_client( + .get_or_try_init(|| async { + hyper_client::create_client( self.config.proxy_https.as_ref(), self.config.tls_cert_file.as_ref(), - ) { - Ok(client) => client, - Err(e) => { - error!("STATS_FLUSHER | Failed to create HTTP client: {e}"); - panic!("STATS_FLUSHER | Cannot proceed without HTTP client"); - } - } + ) }) - .await; - Some(client) + .await + { + Ok(client) => Some(client), + Err(e) => { + error!("STATS_FLUSHER | Failed to create HTTP client: {e}"); + None + } + } } } diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 3f3af30f8..5f30ea691 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -145,31 +145,43 @@ impl TraceFlusher { /// /// The client is created once and reused for all subsequent flushes, /// providing connection pooling and TLS session reuse. 
+ /// + /// Returns `None` if client creation fails. The error is logged but not cached, + /// allowing retry on subsequent calls. async fn get_or_init_http_client(&self) -> Option { - let client = self + match self .http_client - .get_or_init(|| async { - match hyper_client::create_client( + .get_or_try_init(|| async { + hyper_client::create_client( self.config.proxy_https.as_ref(), self.config.tls_cert_file.as_ref(), - ) { - Ok(client) => client, - Err(e) => { - error!("TRACES | Failed to create HTTP client: {e}"); - panic!("TRACES | Cannot proceed without HTTP client"); - } - } + ) }) - .await; - Some(client.clone()) + .await + { + Ok(client) => Some(client.clone()), + Err(e) => { + error!("TRACES | Failed to create HTTP client: {e}"); + None + } + } } /// Sends traces to the Datadog intake endpoint using the provided HTTP client. /// - /// Returns the traces back if there was an error sending them. + /// # Arguments + /// + /// * `traces` - The traces to send + /// * `override_endpoint` - If `Some`, sends to this endpoint instead of the trace's + /// configured endpoint. Used for sending to additional endpoints. + /// * `http_client` - The HTTP client to use for sending + /// + /// # Returns + /// + /// Returns the traces back if there was an error sending them (for retry). 
async fn send_traces( traces: Vec, - endpoint: Option, + override_endpoint: Option, http_client: HyperClient, ) -> Option> { if traces.is_empty() { @@ -181,12 +193,12 @@ impl TraceFlusher { debug!("TRACES | Flushing {} traces", coalesced_traces.len()); for trace in &coalesced_traces { - let trace_with_endpoint = match &endpoint { - Some(additional_endpoint) => trace.with_endpoint(additional_endpoint.clone()), + let trace_to_send = match &override_endpoint { + Some(endpoint) => trace.with_endpoint(endpoint.clone()), None => trace.clone(), }; - let send_result = trace_with_endpoint.send(&http_client).await.last_result; + let send_result = trace_to_send.send(&http_client).await.last_result; if let Err(e) = send_result { error!("TRACES | Request failed: {e:?}"); From 068c83550e8e600d401b5545c5a60bf7b48caa57 Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Wed, 4 Feb 2026 16:35:58 -0500 Subject: [PATCH 3/6] add comments on additional endpoints to fix later --- bottlecap/src/traces/trace_flusher.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 5f30ea691..5fdad478c 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -23,6 +23,9 @@ pub struct TraceFlusher { pub aggregator_handle: AggregatorHandle, pub config: Arc, pub api_key_factory: Arc, + /// Additional endpoints for dual-shipping traces to multiple Datadog sites. + /// Configured via `DD_APM_ADDITIONAL_ENDPOINTS` (e.g., sending to both US and EU). + /// Each trace batch is sent to the primary endpoint AND all additional endpoints. pub additional_endpoints: Vec, /// Cached HTTP client, lazily initialized on first use. http_client: OnceCell, @@ -35,8 +38,10 @@ impl TraceFlusher { config: Arc, api_key_factory: Arc, ) -> Self { + // Parse additional endpoints for dual-shipping from config. 
+ // Format: { "https://trace.agent.datadoghq.eu": ["api-key-1", "api-key-2"], ... } + // Each URL + API key combination becomes a separate endpoint. let mut additional_endpoints: Vec = Vec::new(); - for (endpoint_url, api_keys) in config.apm_additional_endpoints.clone() { for api_key in api_keys { let trace_intake_url = trace_intake_url_prefixed(&endpoint_url); @@ -47,7 +52,6 @@ impl TraceFlusher { timeout_ms: config.flush_timeout * S_TO_MS, test_token: None, }; - additional_endpoints.push(endpoint); } } @@ -84,7 +88,12 @@ impl TraceFlusher { let mut failed_batch: Vec = Vec::new(); if let Some(traces) = failed_traces { - // If we have traces from a previous failed attempt, try to send those first + // If we have traces from a previous failed attempt, try to send those first. + // TODO: Currently retries always go to the primary endpoint (None), even if the + // original failure was for an additional endpoint. This means traces that failed + // to send to additional endpoints will be retried to the primary endpoint instead. + // To fix this, we need to track which endpoint each failed trace was destined for, + // possibly by storing (Vec, Option) pairs in failed_batch. if !traces.is_empty() { debug!( "TRACES | Retrying to send {} previously failed batches", @@ -115,11 +124,15 @@ impl TraceFlusher { .map(SendDataBuilder::build) .collect(); + // Send to PRIMARY endpoint (the default endpoint configured in the trace). + // Passing None means "use the endpoint already configured in the SendData". let traces_clone = traces.clone(); let client_clone = http_client.clone(); batch_tasks .spawn(async move { Self::send_traces(traces_clone, None, client_clone).await }); + // Send to ADDITIONAL endpoints for dual-shipping. + // Each additional endpoint gets the same traces, enabling multi-region delivery. 
for endpoint in self.additional_endpoints.clone() { let traces_clone = traces.clone(); let client_clone = http_client.clone(); @@ -128,6 +141,9 @@ impl TraceFlusher { }); } } + // Collect failed traces from all endpoints (primary + additional). + // Note: We lose track of which endpoint each failure came from here. + // All failures are mixed together and will be retried to the primary endpoint only. while let Some(result) = batch_tasks.join_next().await { if let Ok(Some(mut failed)) = result { failed_batch.append(&mut failed); From 12bcb5f8c86df8d67f0a63b5ded49753163cb9eb Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Wed, 4 Feb 2026 16:49:20 -0500 Subject: [PATCH 4/6] add todo comment --- bottlecap/src/traces/stats_flusher.rs | 3 +++ bottlecap/src/traces/trace_flusher.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index b8061613a..1ab96328a 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -22,6 +22,9 @@ pub struct StatsFlusher { api_key_factory: Arc, endpoint: OnceCell, /// Cached HTTP client, lazily initialized on first use. + /// TODO: StatsFlusher and TraceFlusher both hit trace.agent.datadoghq.{site} and could + /// share a single HTTP client for better connection pooling. Consider using a + /// SharedHyperClient wrapper passed to both flushers from main.rs. http_client: OnceCell, } diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 5fdad478c..f08b24751 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -28,6 +28,9 @@ pub struct TraceFlusher { /// Each trace batch is sent to the primary endpoint AND all additional endpoints. pub additional_endpoints: Vec, /// Cached HTTP client, lazily initialized on first use. 
+ /// TODO: TraceFlusher and StatsFlusher both hit trace.agent.datadoghq.{site} and could + /// share a single HTTP client for better connection pooling. Consider using a + /// SharedHyperClient wrapper passed to both flushers from main.rs. http_client: OnceCell, } From b8b40d2226bb84844935e1b6d4b9f356a7d985bb Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Wed, 4 Feb 2026 17:31:02 -0500 Subject: [PATCH 5/6] add stats retry --- bottlecap/src/flushing/handles.rs | 5 +- bottlecap/src/flushing/service.rs | 30 +++++++--- bottlecap/src/traces/stats_flusher.rs | 85 ++++++++++++++++++++------- 3 files changed, 89 insertions(+), 31 deletions(-) diff --git a/bottlecap/src/flushing/handles.rs b/bottlecap/src/flushing/handles.rs index 851d8e4a9..61376ac4b 100644 --- a/bottlecap/src/flushing/handles.rs +++ b/bottlecap/src/flushing/handles.rs @@ -2,6 +2,7 @@ use datadog_protos::metrics::SketchPayload; use dogstatsd::datadog::Series; +use libdd_trace_protobuf::pb; use libdd_trace_utils::send_data::SendData; use tokio::task::JoinHandle; @@ -32,8 +33,8 @@ pub struct FlushHandles { pub metric_flush_handles: Vec>, /// Handles for proxy flush operations. Returns failed request builders for retry. pub proxy_flush_handles: Vec>>, - /// Handles for stats flush operations. Stats don't support retry. - pub stats_flush_handles: Vec>, + /// Handles for stats flush operations. Returns failed stats payloads for retry. 
+ pub stats_flush_handles: Vec>>, } impl FlushHandles { diff --git a/bottlecap/src/flushing/service.rs b/bottlecap/src/flushing/service.rs index c632d6e36..046f65649 100644 --- a/bottlecap/src/flushing/service.rs +++ b/bottlecap/src/flushing/service.rs @@ -122,11 +122,13 @@ impl FlushingService { self.handles.metric_flush_handles.push(handle); } - // Spawn stats flush (fire-and-forget, no retry) + // Spawn stats flush let sf = Arc::clone(&self.stats_flusher); self.handles .stats_flush_handles - .push(tokio::spawn(async move { sf.flush(false).await })); + .push(tokio::spawn(async move { + sf.flush(false, None).await.unwrap_or_default() + })); // Spawn proxy flush let pf = self.proxy_flusher.clone(); @@ -153,11 +155,25 @@ impl FlushingService { let mut joinset = tokio::task::JoinSet::new(); let mut flush_error = false; - // Await stats handles (no retry) + // Await stats handles with retry for handle in self.handles.stats_flush_handles.drain(..) { - if let Err(e) = handle.await { - error!("FLUSHING_SERVICE | stats flush error {e:?}"); - flush_error = true; + match handle.await { + Ok(retry) => { + let sf = self.stats_flusher.clone(); + if !retry.is_empty() { + debug!( + "FLUSHING_SERVICE | redriving {:?} stats payloads", + retry.len() + ); + joinset.spawn(async move { + sf.flush(false, Some(retry)).await; + }); + } + } + Err(e) => { + error!("FLUSHING_SERVICE | stats flush error {e:?}"); + flush_error = true; + } } } @@ -312,7 +328,7 @@ impl FlushingService { self.logs_flusher.flush(None), futures::future::join_all(metrics_futures), self.trace_flusher.flush(None), - self.stats_flusher.flush(force_stats), + self.stats_flusher.flush(force_stats, None), self.proxy_flusher.flush(None), ); } diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index 1ab96328a..222dddbd7 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -22,9 +22,8 @@ pub struct StatsFlusher { api_key_factory: Arc, 
endpoint: OnceCell, /// Cached HTTP client, lazily initialized on first use. - /// TODO: StatsFlusher and TraceFlusher both hit trace.agent.datadoghq.{site} and could - /// share a single HTTP client for better connection pooling. Consider using a - /// SharedHyperClient wrapper passed to both flushers from main.rs. + /// TODO: `StatsFlusher` and `TraceFlusher` both hit trace.agent.datadoghq.{site} and could + /// share a single HTTP client for better connection pooling. http_client: OnceCell, } @@ -45,14 +44,20 @@ impl StatsFlusher { } /// Flushes stats to the Datadog trace stats intake. - pub async fn send(&self, stats: Vec<pb::ClientStatsPayload>) { + /// + /// Returns `None` on success, or `Some(failed_stats)` if the flush failed and should be retried. + pub async fn send( + &self, + stats: Vec<pb::ClientStatsPayload>, + ) -> Option<Vec<pb::ClientStatsPayload>> { if stats.is_empty() { - return; + return None; } let Some(api_key) = self.api_key_factory.get_api_key().await else { - error!("Skipping flushing stats: Failed to resolve API key"); - return; + error!("STATS | Skipping flushing stats: Failed to resolve API key"); + // No API key means we can't send - don't retry as it won't help + return None; }; let api_key_clone = api_key.to_string(); @@ -72,17 +77,18 @@ }) .await; - debug!("Flushing {} stats", stats.len()); + debug!("STATS | Flushing {} stats", stats.len()); - let stats_payload = stats_utils::construct_stats_payload(stats); + let stats_payload = stats_utils::construct_stats_payload(stats.clone()); - debug!("Stats payload to be sent: {stats_payload:?}"); + debug!("STATS | Stats payload to be sent: {stats_payload:?}"); let serialized_stats_payload = match stats_utils::serialize_stats_payload(stats_payload) { Ok(res) => res, Err(err) => { - error!("Failed to serialize stats payload, dropping stats: {err}"); - return; + // Serialization errors are permanent - data is malformed, don't retry + error!("STATS | Failed to serialize stats payload, dropping stats: {err}"); + return None; } }; @@ -93,8 +99,8 @@ impl 
StatsFlusher { // Get or create the cached HTTP client let http_client = self.get_or_init_http_client().await; let Some(http_client) = http_client else { - error!("STATS_FLUSHER | Failed to create HTTP client"); - return; + error!("STATS | Failed to create HTTP client, will retry"); + return Some(stats); }; let resp = stats_utils::send_stats_payload_with_client( @@ -106,27 +112,62 @@ impl StatsFlusher { .await; let elapsed = start.elapsed(); debug!( - "Stats request to {} took {} ms", + "STATS | Stats request to {} took {} ms", stats_url, elapsed.as_millis() ); match resp { - Ok(()) => debug!("Successfully flushed stats"), + Ok(()) => { + debug!("STATS | Successfully flushed stats"); + None + } Err(e) => { - error!("Error sending stats: {e:?}"); + // Network/server errors are temporary - return stats for retry + error!("STATS | Error sending stats: {e:?}"); + Some(stats) } - }; + } } - pub async fn flush(&self, force_flush: bool) { - let mut guard = self.aggregator.lock().await; + /// Flushes stats from the aggregator. + /// + /// Returns `None` on success, or `Some(failed_stats)` if any flush failed and should be retried. + /// If `failed_stats` is provided, it will attempt to send those first before fetching new stats. 
+ pub async fn flush( + &self, + force_flush: bool, + failed_stats: Option>, + ) -> Option> { + let mut all_failed: Vec = Vec::new(); + + // First, retry any previously failed stats + if let Some(retry_stats) = failed_stats { + if !retry_stats.is_empty() { + debug!( + "STATS | Retrying {} previously failed stats", + retry_stats.len() + ); + if let Some(still_failed) = self.send(retry_stats).await { + all_failed.extend(still_failed); + } + } + } + // Then flush new stats from the aggregator + let mut guard = self.aggregator.lock().await; let mut stats = guard.get_batch(force_flush).await; while !stats.is_empty() { - self.send(stats).await; - + if let Some(failed) = self.send(stats).await { + all_failed.extend(failed); + } stats = guard.get_batch(force_flush).await; } + + if all_failed.is_empty() { + None + } else { + Some(all_failed) + } } /// Returns a reference to the cached HTTP client, initializing it if necessary. /// From 7590bd259ba5ec1713879242f7b509d64097d6b2 Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Wed, 4 Feb 2026 17:31:17 -0500 Subject: [PATCH 6/6] update docs --- bottlecap/src/bin/bottlecap/main.rs | 1 + bottlecap/src/traces/trace_flusher.rs | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index e93b95d69..185001f20 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -1394,6 +1394,7 @@ mod flush_handles_tests { let mut handles = FlushHandles::new(); let handle = tokio::spawn(async { sleep(Duration::from_millis(5)).await; + Vec::new() // Return empty Vec for stats retry }); handles.stats_flush_handles.push(handle); diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index f08b24751..811751d8b 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -28,9 +28,8 @@ pub struct 
TraceFlusher { /// Each trace batch is sent to the primary endpoint AND all additional endpoints. pub additional_endpoints: Vec<Endpoint>, /// Cached HTTP client, lazily initialized on first use. - /// TODO: TraceFlusher and StatsFlusher both hit trace.agent.datadoghq.{site} and could - /// share a single HTTP client for better connection pooling. Consider using a - /// SharedHyperClient wrapper passed to both flushers from main.rs. + /// TODO: `TraceFlusher` and `StatsFlusher` both hit trace.agent.datadoghq.{site} and could + /// share a single HTTP client for better connection pooling. http_client: OnceCell, }