Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use bottlecap::{
proxy_aggregator,
proxy_flusher::Flusher as ProxyFlusher,
stats_aggregator::StatsAggregator,
stats_concentrator_service::StatsConcentratorService,
stats_flusher::{self, StatsFlusher},
stats_processor, trace_agent,
trace_aggregator::{self, SendDataBuilderInfo},
Expand Down Expand Up @@ -522,6 +523,7 @@ async fn extension_loop_active(
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr_handle.clone(),
false,
)
.await;
}
Expand All @@ -537,6 +539,7 @@ async fn extension_loop_active(
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr_handle.clone(),
false,
)
.await;
let next_response =
Expand Down Expand Up @@ -606,6 +609,7 @@ async fn extension_loop_active(
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr_handle,
false, // force_flush_trace_stats
)
.await;
}
Expand Down Expand Up @@ -639,6 +643,7 @@ async fn extension_loop_active(
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr_handle,
false, // force_flush_trace_stats
)
.await;
}
Expand Down Expand Up @@ -697,13 +702,15 @@ async fn extension_loop_active(
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr_handle,
true, // force_flush_trace_stats
)
.await;
return Ok(());
}
}
}

#[allow(clippy::too_many_arguments)]
async fn blocking_flush_all(
logs_flusher: &LogsFlusher,
metrics_flushers: &mut [MetricsFlusher],
Expand All @@ -712,6 +719,7 @@ async fn blocking_flush_all(
proxy_flusher: &ProxyFlusher,
race_flush_interval: &mut tokio::time::Interval,
metrics_aggr_handle: &MetricsAggregatorHandle,
force_flush_trace_stats: bool,
) {
let flush_response = metrics_aggr_handle
.flush()
Expand All @@ -731,7 +739,7 @@ async fn blocking_flush_all(
logs_flusher.flush(None),
futures::future::join_all(metrics_futures),
trace_flusher.flush(None),
stats_flusher.flush(),
stats_flusher.flush(force_flush_trace_stats),
proxy_flusher.flush(None),
);
race_flush_interval.reset();
Expand Down Expand Up @@ -981,7 +989,12 @@ fn start_trace_agent(
tokio_util::sync::CancellationToken,
) {
// Stats
let stats_aggregator = Arc::new(TokioMutex::new(StatsAggregator::default()));
let (stats_concentrator_service, stats_concentrator_handle) =
StatsConcentratorService::new(Arc::clone(config));
tokio::spawn(stats_concentrator_service.run());
let stats_aggregator: Arc<TokioMutex<StatsAggregator>> = Arc::new(TokioMutex::new(
StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()),
));
let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
api_key_factory.clone(),
stats_aggregator.clone(),
Expand Down Expand Up @@ -1029,6 +1042,7 @@ fn start_trace_agent(
invocation_processor,
appsec_processor,
Arc::clone(tags_provider),
stats_concentrator_handle,
);
let trace_agent_channel = trace_agent.get_sender_copy();
let shutdown_token = trace_agent.shutdown_token();
Expand Down
3 changes: 3 additions & 0 deletions bottlecap/src/traces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ pub mod proxy_aggregator;
pub mod proxy_flusher;
pub mod span_pointers;
pub mod stats_aggregator;
pub mod stats_concentrator;
pub mod stats_concentrator_service;
pub mod stats_flusher;
pub mod stats_processor;
pub mod trace_agent;
pub mod trace_aggregator;
pub mod trace_flusher;
pub mod trace_processor;
pub mod trace_stats_processor;

// URL for a call to the Lambda runtime API. The value may be replaced if `AWS_LAMBDA_RUNTIME_API` is set.
const LAMBDA_RUNTIME_URL_PREFIX: &str = "http://127.0.0.1:9001";
Expand Down
62 changes: 40 additions & 22 deletions bottlecap/src/traces/stats_aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::traces::stats_concentrator_service::StatsConcentratorHandle;
use datadog_trace_protobuf::pb::ClientStatsPayload;
use std::collections::VecDeque;
use tracing::error;

#[allow(clippy::empty_line_after_doc_comments)]
/// Maximum number of entries in a stat payload.
Expand All @@ -22,37 +24,44 @@ pub struct StatsAggregator {
queue: VecDeque<ClientStatsPayload>,
max_content_size_bytes: usize,
buffer: Vec<ClientStatsPayload>,
}

impl Default for StatsAggregator {
fn default() -> Self {
StatsAggregator {
queue: VecDeque::new(),
max_content_size_bytes: MAX_CONTENT_SIZE_BYTES,
buffer: Vec::new(),
}
}
concentrator: StatsConcentratorHandle,
}

/// Takes in individual trace stats payloads and aggregates them into batches to be flushed to Datadog.
impl StatsAggregator {
#[allow(dead_code)]
#[allow(clippy::must_use_candidate)]
pub fn new(max_content_size_bytes: usize) -> Self {
fn new(max_content_size_bytes: usize, concentrator: StatsConcentratorHandle) -> Self {
StatsAggregator {
queue: VecDeque::new(),
max_content_size_bytes,
buffer: Vec::new(),
concentrator,
}
}

#[must_use]
pub fn new_with_concentrator(concentrator: StatsConcentratorHandle) -> Self {
Self::new(MAX_CONTENT_SIZE_BYTES, concentrator)
}

/// Takes in an individual trace stats payload.
pub fn add(&mut self, payload: ClientStatsPayload) {
self.queue.push_back(payload);
}

/// Returns a batch of trace stats payloads, subject to the max content size.
pub fn get_batch(&mut self) -> Vec<ClientStatsPayload> {
pub async fn get_batch(&mut self, force_flush: bool) -> Vec<ClientStatsPayload> {
// Pull stats data from concentrator
match self.concentrator.flush(force_flush).await {
Ok(stats) => {
self.queue.extend(stats);
}
Err(e) => {
error!("Error getting stats from the stats concentrator: {e:?}");
}
}

let mut batch_size = 0;

// Fill the batch
Expand Down Expand Up @@ -80,10 +89,15 @@ impl StatsAggregator {
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::config::Config;
use crate::traces::stats_concentrator_service::StatsConcentratorService;
use std::sync::Arc;

#[test]
fn test_add() {
let mut aggregator = StatsAggregator::default();
let config = Arc::new(Config::default());
let (_, concentrator) = StatsConcentratorService::new(config);
let mut aggregator = StatsAggregator::new_with_concentrator(concentrator);
let payload = ClientStatsPayload {
hostname: "hostname".to_string(),
env: "dev".to_string(),
Expand All @@ -106,9 +120,11 @@ mod tests {
assert_eq!(aggregator.queue[0], payload);
}

#[test]
fn test_get_batch() {
let mut aggregator = StatsAggregator::default();
#[tokio::test]
async fn test_get_batch() {
let config = Arc::new(Config::default());
let (_, concentrator) = StatsConcentratorService::new(config);
let mut aggregator = StatsAggregator::new_with_concentrator(concentrator);
let payload = ClientStatsPayload {
hostname: "hostname".to_string(),
env: "dev".to_string(),
Expand All @@ -127,13 +143,15 @@ mod tests {
};
aggregator.add(payload.clone());
assert_eq!(aggregator.queue.len(), 1);
let batch = aggregator.get_batch();
let batch = aggregator.get_batch(false).await;
assert_eq!(batch, vec![payload]);
}

#[test]
fn test_get_batch_full_entries() {
let mut aggregator = StatsAggregator::new(640);
#[tokio::test]
async fn test_get_batch_full_entries() {
let config = Arc::new(Config::default());
let (_, concentrator) = StatsConcentratorService::new(config);
let mut aggregator = StatsAggregator::new(640, concentrator);
// Payload below is 115 bytes
let payload = ClientStatsPayload {
hostname: "hostname".to_string(),
Expand All @@ -158,12 +176,12 @@ mod tests {
aggregator.add(payload.clone());

// The batch should only contain the first 2 payloads
let first_batch = aggregator.get_batch();
let first_batch = aggregator.get_batch(false).await;
assert_eq!(first_batch, vec![payload.clone(), payload.clone()]);
assert_eq!(aggregator.queue.len(), 1);

// The second batch should only contain the last log
let second_batch = aggregator.get_batch();
let second_batch = aggregator.get_batch(false).await;
assert_eq!(second_batch, vec![payload]);
assert_eq!(aggregator.queue.len(), 0);
}
Expand Down
38 changes: 38 additions & 0 deletions bottlecap/src/traces/stats_concentrator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use crate::config::Config;
use datadog_trace_protobuf::pb;
use std::sync::Arc;

/// Event sent to the stats concentrator.
///
/// Carries a single stats data point: when it occurred, the key it is
/// aggregated under, and the stats values themselves.
#[derive(Clone, Copy, Debug)]
pub struct StatsEvent {
    /// Event timestamp. Unit is not established here — presumably epoch
    /// time; confirm against the producers in the trace stats processor.
    pub time: u64,
    /// Dimensions this event is bucketed by during aggregation.
    pub aggregation_key: AggregationKey,
    /// Stats values to fold into the matching bucket.
    pub stats: Stats,
}

/// Dimensions trace stats are grouped by when aggregated into buckets.
///
/// Currently a placeholder with no fields; dimensions (e.g. service,
/// resource) are expected to be added as the concentrator is implemented.
/// `Default` is derived for consistency with the sibling `Stats` type.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub struct AggregationKey {}

/// Aggregated stats values for a single bucket.
///
/// Currently a placeholder with no fields. `PartialEq`/`Eq` are derived
/// for consistency with the sibling `AggregationKey` type and to ease
/// testing once real fields land.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Stats {}

/// Aggregates trace stats events into time buckets, which are later
/// drained by the stats aggregator via `flush`.
pub struct StatsConcentrator {
    // Extension configuration; unused until real aggregation is implemented
    // (underscore prefix silences the dead-field lint).
    _config: Arc<Config>,
}

// Aggregates stats into buckets, which are then pulled by the stats aggregator.
impl StatsConcentrator {
#[must_use]
pub fn new(config: Arc<Config>) -> Self {
Self { _config: config }
}

pub fn add(&mut self, _stats_event: StatsEvent) {}

// force_flush: If true, flush all stats. If false, flush stats except for the few latest
// buckets, which may still be getting data.
#[must_use]
pub fn flush(&mut self, _force_flush: bool) -> Vec<pb::ClientStatsPayload> {
vec![]
}
}
79 changes: 79 additions & 0 deletions bottlecap/src/traces/stats_concentrator_service.rs
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we should have a submodule called stats for all of this stuff, instead of having it as stats_...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might do it in a separate PR

Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use tokio::sync::{mpsc, oneshot};

use crate::config::Config;
use crate::traces::stats_concentrator::StatsConcentrator;
use crate::traces::stats_concentrator::StatsEvent;
use datadog_trace_protobuf::pb;
use std::sync::Arc;
use tracing::error;

#[derive(Debug, thiserror::Error)]
pub enum StatsError {
#[error("Failed to send command to concentrator: {0}")]
SendError(mpsc::error::SendError<ConcentratorCommand>),
#[error("Failed to receive response from concentrator: {0}")]
RecvError(oneshot::error::RecvError),
}

/// Commands processed by the concentrator service task.
pub enum ConcentratorCommand {
    /// Record one stats event in the concentrator.
    Add(StatsEvent),
    /// Flush aggregated stats. The bool is `force_flush` (when true, also
    /// drain the latest, possibly still-active buckets); the sender
    /// receives the drained payloads.
    Flush(bool, oneshot::Sender<Vec<pb::ClientStatsPayload>>),
}

/// Cheaply cloneable handle for submitting commands to the concentrator
/// service task over an unbounded channel.
#[derive(Clone)]
pub struct StatsConcentratorHandle {
    // Sending half of the service's command queue.
    tx: mpsc::UnboundedSender<ConcentratorCommand>,
}

impl StatsConcentratorHandle {
    /// Enqueues a stats event for the concentrator.
    ///
    /// Fails only when the service task has shut down and the command
    /// channel is closed.
    pub fn add(
        &self,
        stats_event: StatsEvent,
    ) -> Result<(), mpsc::error::SendError<ConcentratorCommand>> {
        self.tx.send(ConcentratorCommand::Add(stats_event))
    }

    /// Asks the service task to flush aggregated stats and awaits the
    /// drained payloads.
    ///
    /// `force_flush`: when true, also drain the latest, possibly
    /// still-active buckets.
    pub async fn flush(
        &self,
        force_flush: bool,
    ) -> Result<Vec<pb::ClientStatsPayload>, StatsError> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let command = ConcentratorCommand::Flush(force_flush, reply_tx);
        self.tx.send(command).map_err(StatsError::SendError)?;
        reply_rx.await.map_err(StatsError::RecvError)
    }
}

/// State owned by the concentrator service task: the concentrator itself
/// plus the receiving half of the command channel. Add and flush requests
/// are serialized through one queue, avoiding a shared mutex.
pub struct StatsConcentratorService {
    // The concentrator is owned exclusively by the task, so no locking is needed.
    concentrator: StatsConcentrator,
    // Receiving half of the command queue; handles hold the sending half.
    rx: mpsc::UnboundedReceiver<ConcentratorCommand>,
}

// A service that handles add() and flush() requests in the same queue,
// to avoid using mutex, which may cause lock contention.
impl StatsConcentratorService {
    /// Builds the service together with a handle for submitting commands:
    /// the service owns the receiver, the handle owns the cloneable sender.
    #[must_use]
    pub fn new(config: Arc<Config>) -> (Self, StatsConcentratorHandle) {
        let (tx, rx) = mpsc::unbounded_channel();
        (
            Self {
                concentrator: StatsConcentrator::new(config),
                rx,
            },
            StatsConcentratorHandle { tx },
        )
    }

    /// Drives the command loop until every handle (sender) is dropped and
    /// the channel closes. Intended to be spawned as a background task.
    pub async fn run(mut self) {
        while let Some(command) = self.rx.recv().await {
            match command {
                ConcentratorCommand::Add(event) => self.concentrator.add(event),
                ConcentratorCommand::Flush(force_flush, reply_tx) => {
                    let payloads = self.concentrator.flush(force_flush);
                    // The flush requester may have dropped its receiver
                    // (nothing to reply to); log instead of failing.
                    if let Err(e) = reply_tx.send(payloads) {
                        error!("Failed to return trace stats: {e:?}");
                    }
                }
            }
        }
    }
}
9 changes: 5 additions & 4 deletions bottlecap/src/traces/stats_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub trait StatsFlusher {
/// Flushes stats to the Datadog trace stats intake.
async fn send(&self, traces: Vec<pb::ClientStatsPayload>);

async fn flush(&self);
async fn flush(&self, force_flush: bool);
}

#[allow(clippy::module_name_repetitions)]
Expand Down Expand Up @@ -116,14 +116,15 @@ impl StatsFlusher for ServerlessStatsFlusher {
}
};
}
async fn flush(&self) {

async fn flush(&self, force_flush: bool) {
let mut guard = self.aggregator.lock().await;

let mut stats = guard.get_batch();
let mut stats = guard.get_batch(force_flush).await;
while !stats.is_empty() {
self.send(stats).await;

stats = guard.get_batch();
stats = guard.get_batch(force_flush).await;
}
}
}
Loading
Loading