19 changes: 15 additions & 4 deletions bottlecap/Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions bottlecap/Cargo.toml
@@ -62,8 +62,8 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev =
datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" , features = ["mini_agent"] }
datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" }
datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" }
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "d131de8419c191ce21c91bb30b5915c4d8a2cc5a", default-features = false }
datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "d131de8419c191ce21c91bb30b5915c4d8a2cc5a", default-features = false }
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "abfec752b0638a9e4096e1465acd4bb2651edfa7", default-features = false }
datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7", default-features = false }
libddwaf = { version = "1.26.0", git = "https://github.com/DataDog/libddwaf-rust", rev = "1d57bf0ca49782723e556ba327ee7f378978aaa7", default-features = false, features = ["serde", "dynamic"] }

[dev-dependencies]
80 changes: 46 additions & 34 deletions bottlecap/src/bin/bottlecap/main.rs
@@ -66,7 +66,8 @@ use datadog_trace_obfuscation::obfuscation_config;
use datadog_trace_utils::send_data::SendData;
use decrypt::resolve_secrets;
use dogstatsd::{
aggregator::Aggregator as MetricsAggregator,
aggregator_service::AggregatorHandle as MetricsAggregatorHandle,
aggregator_service::AggregatorService as MetricsAggregatorService,
api_key::ApiKeyFactory,
constants::CONTEXTS,
datadog::{
@@ -87,7 +88,7 @@ use std::{
os::unix::process::CommandExt,
path::Path,
process::Command,
sync::{Arc, Mutex},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{sync::Mutex as TokioMutex, sync::RwLock, sync::mpsc::Sender, task::JoinHandle};
@@ -494,32 +495,31 @@ async fn extension_loop_active(
);

let metrics_aggr_init_start_time = Instant::now();
let metrics_aggr = Arc::new(Mutex::new(
MetricsAggregator::new(
SortedTags::parse(&tags_provider.get_tags_string()).unwrap_or(EMPTY_TAGS),
CONTEXTS,
)
.expect("failed to create aggregator"),
));
let (metrics_aggr_service, metrics_aggr_handle) = MetricsAggregatorService::new(
SortedTags::parse(&tags_provider.get_tags_string()).unwrap_or(EMPTY_TAGS),
CONTEXTS,
)
.expect("can't create metrics service");
debug!(
"Metrics aggregator created in {:} microseconds",
metrics_aggr_init_start_time
.elapsed()
.as_micros()
.to_string()
);
start_dogstatsd_aggregator(metrics_aggr_service);

let metrics_flushers = Arc::new(TokioMutex::new(start_metrics_flushers(
Arc::clone(&api_key_factory),
&metrics_aggr,
&metrics_aggr_handle,
config,
)));
// Lifecycle Invocation Processor
let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
Arc::clone(&tags_provider),
Arc::clone(config),
Arc::clone(&aws_config),
Arc::clone(&metrics_aggr),
metrics_aggr_handle.clone(),
)));
// AppSec processor (if enabled)
let appsec_processor = match AppSecProcessor::new(config) {
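Note on the pattern: this PR replaces the shared `Arc<Mutex<MetricsAggregator>>` with an actor-style split: an `AggregatorService` that owns the aggregator state on its own Tokio task, plus a cheaply clonable `AggregatorHandle` that the DogStatsD server, the invocation processor, and the flushers talk to over a channel. The sketch below shows the general shape of that pattern only; the `Command` enum, the `FlushResponse` fields, and the channel size are illustrative assumptions, not the actual API of the `dogstatsd` crate in serverless-components.

```rust
// Illustrative sketch only: a toy actor mirroring the AggregatorService /
// AggregatorHandle split. The real types live in DataDog/serverless-components;
// the names `Command` and `FlushResponse` here are assumptions.
use tokio::sync::{mpsc, oneshot};

#[derive(Debug, Default)]
pub struct FlushResponse {
    pub series: Vec<String>,        // stand-in for the real metric series payloads
    pub distributions: Vec<String>, // stand-in for the real sketch payloads
}

pub enum Command {
    Insert(String),
    Flush(oneshot::Sender<FlushResponse>),
}

pub struct AggregatorService {
    rx: mpsc::Receiver<Command>,
    buffered: Vec<String>,
}

#[derive(Clone)]
pub struct AggregatorHandle {
    tx: mpsc::Sender<Command>,
}

impl AggregatorService {
    pub fn new() -> (Self, AggregatorHandle) {
        let (tx, rx) = mpsc::channel(1024);
        (
            Self { rx, buffered: Vec::new() },
            AggregatorHandle { tx },
        )
    }

    // The service owns all aggregator state and runs on its own task, so no
    // `Arc<Mutex<..>>` is shared between the DogStatsD server and the flushers.
    pub async fn run(mut self) {
        while let Some(cmd) = self.rx.recv().await {
            match cmd {
                Command::Insert(metric) => self.buffered.push(metric),
                Command::Flush(reply) => {
                    let response = FlushResponse {
                        series: std::mem::take(&mut self.buffered),
                        distributions: Vec::new(),
                    };
                    let _ = reply.send(response); // receiver may have given up; ignore
                }
            }
        }
    }
}

impl AggregatorHandle {
    // Asks the service task for everything it has aggregated so far and waits
    // for the reply on a oneshot channel.
    pub async fn flush(&self) -> Result<FlushResponse, &'static str> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.tx
            .send(Command::Flush(reply_tx))
            .await
            .map_err(|_| "aggregator task stopped")?;
        reply_rx.await.map_err(|_| "aggregator dropped the reply")
    }
}
```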
@@ -568,7 +568,7 @@ async fn extension_loop_active(
}
});

let dogstatsd_cancel_token = start_dogstatsd(&metrics_aggr).await;
let dogstatsd_cancel_token = start_dogstatsd(metrics_aggr_handle.clone()).await;

let telemetry_listener_cancel_token =
setup_telemetry_client(&r.extension_id, logs_agent_channel).await?;
@@ -623,7 +623,7 @@
&*stats_flusher,
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr,
&metrics_aggr_handle.clone(),
)
.await;
}
@@ -638,7 +638,7 @@
&*stats_flusher,
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr,
&metrics_aggr_handle.clone(),
)
.await;
let next_response = next_event(client, &r.extension_id).await;
@@ -670,11 +670,15 @@
}));
let (metrics_flushers_copy, series, sketches) = {
let locked_metrics = metrics_flushers.lock().await;
let mut aggregator = metrics_aggr.lock().expect("lock poisoned");
let flush_response = metrics_aggr_handle
.clone()
.flush()
.await
.expect("can't flush metrics handle");
(
locked_metrics.clone(),
aggregator.consume_metrics(),
aggregator.consume_distributions(),
flush_response.series,
flush_response.distributions,
)
};
for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() {
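The flush path above (and the same change in `blocking_flush_all` further down) swaps the old lock-and-consume sequence for a single awaited call: `handle.flush()` sends a request to the aggregator task and receives the accumulated series and distributions back on a oneshot channel. A usage sketch, reusing the toy types from the previous sketch (so the names are still assumptions, not the real crate API):

```rust
// Usage sketch built on the toy AggregatorService/AggregatorHandle above.
#[tokio::main]
async fn main() {
    let (service, handle) = AggregatorService::new();
    tokio::spawn(service.run()); // what start_dogstatsd_aggregator does below

    // Handles are cheap to clone and can be held by every component that
    // needs to enqueue or flush metrics.
    let flush_handle = handle.clone();
    let response = flush_handle.flush().await.expect("aggregator flush failed");
    println!(
        "flushing {} series and {} distributions",
        response.series.len(),
        response.distributions.len()
    );
}
```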
@@ -711,7 +715,7 @@
&*stats_flusher,
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr,
&metrics_aggr_handle,
)
.await;
last_continuous_flush_error = false;
@@ -751,7 +755,7 @@
&*stats_flusher,
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr,
&metrics_aggr_handle,
)
.await;
}
@@ -807,7 +811,7 @@
&*stats_flusher,
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr,
&metrics_aggr_handle,
)
.await;
return Ok(());
@@ -822,18 +826,20 @@ async fn blocking_flush_all(
stats_flusher: &impl StatsFlusher,
proxy_flusher: &ProxyFlusher,
race_flush_interval: &mut tokio::time::Interval,
metrics_aggr: &Arc<Mutex<MetricsAggregator>>,
metrics_aggr_handle: &MetricsAggregatorHandle,
) {
let (series, sketches) = {
let mut aggregator = metrics_aggr.lock().expect("lock poisoned");
(
aggregator.consume_metrics(),
aggregator.consume_distributions(),
)
};
let flush_response = metrics_aggr_handle
.flush()
.await
.expect("can't flush metrics aggr handle");
let metrics_futures: Vec<_> = metrics_flushers
.iter_mut()
.map(|f| f.flush_metrics(series.clone(), sketches.clone()))
.map(|f| {
f.flush_metrics(
flush_response.series.clone(),
flush_response.distributions.clone(),
)
})
.collect();

tokio::join!(
@@ -1008,7 +1014,7 @@ fn start_logs_agent(

fn start_metrics_flushers(
api_key_factory: Arc<ApiKeyFactory>,
metrics_aggr: &Arc<Mutex<MetricsAggregator>>,
metrics_aggr_handle: &MetricsAggregatorHandle,
config: &Arc<Config>,
) -> Vec<MetricsFlusher> {
let mut flushers = Vec::new();
@@ -1031,7 +1037,7 @@

let flusher_config = MetricsFlusherConfig {
api_key_factory,
aggregator: Arc::clone(metrics_aggr),
aggregator_handle: metrics_aggr_handle.clone(),
metrics_intake_url_prefix: metrics_intake_url.expect("can't parse site or override"),
https_proxy: config.proxy_https.clone(),
timeout: Duration::from_secs(config.flush_timeout),
@@ -1059,7 +1065,7 @@
let additional_api_key_factory = Arc::new(ApiKeyFactory::new(api_key));
let additional_flusher_config = MetricsFlusherConfig {
api_key_factory: additional_api_key_factory,
aggregator: metrics_aggr.clone(),
aggregator_handle: metrics_aggr_handle.clone(),
metrics_intake_url_prefix: metrics_intake_url.clone(),
https_proxy: config.proxy_https.clone(),
timeout: Duration::from_secs(config.flush_timeout),
@@ -1157,15 +1163,21 @@ fn start_trace_agent(
)
}

async fn start_dogstatsd(metrics_aggr: &Arc<Mutex<MetricsAggregator>>) -> CancellationToken {
fn start_dogstatsd_aggregator(aggr_service: MetricsAggregatorService) {
tokio::spawn(async move {
aggr_service.run().await;
});
}

async fn start_dogstatsd(metrics_aggr_handle: MetricsAggregatorHandle) -> CancellationToken {
let dogstatsd_config = DogStatsDConfig {
host: EXTENSION_HOST.to_string(),
port: DOGSTATSD_PORT,
};
let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new();
let dogstatsd_client = DogStatsD::new(
&dogstatsd_config,
Arc::clone(metrics_aggr),
metrics_aggr_handle,
dogstatsd_cancel_token.clone(),
)
.await;
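`start_dogstatsd` now takes the handle by value instead of `&Arc<Mutex<MetricsAggregator>>`: because the handle is `Clone` and `Send`, the UDP listener can own its copy and move it into its task while the flushers keep theirs. A producer-side sketch, again extending the hypothetical toy types (the `insert` helper below is an assumption; the real `dogstatsd` crate exposes its own API for feeding the aggregator):

```rust
// Producer-side sketch: the DogStatsD listener forwards every parsed metric to
// the aggregator task through its own clone of the handle.
impl AggregatorHandle {
    pub async fn insert(&self, metric: String) -> Result<(), &'static str> {
        self.tx
            .send(Command::Insert(metric))
            .await
            .map_err(|_| "aggregator task stopped")
    }
}

pub fn spawn_listener(
    handle: AggregatorHandle,
    mut packets: tokio::sync::mpsc::Receiver<String>,
) {
    tokio::spawn(async move {
        while let Some(packet) = packets.recv().await {
            if handle.insert(packet).await.is_err() {
                break; // aggregator shut down; stop reading packets
            }
        }
    });
}
```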
6 changes: 4 additions & 2 deletions bottlecap/src/lifecycle/invocation/context.rs
@@ -131,10 +131,12 @@ struct UniversalInstrumentationData {
}

impl Default for ContextBuffer {
/// Creates a new `ContextBuffer` with a default capacity of 5.
/// Creates a new `ContextBuffer` with a default capacity of 500.
/// This gives us enough capacity to process events that may be delayed when async tasks
/// pile up and prevent us from reading them quickly enough.
///
fn default() -> Self {
Contributor: Could we add some comment to explain why bumping up to a much bigger value?

Contributor (Author): yeah

ContextBuffer::with_capacity(5)
ContextBuffer::with_capacity(500)
}
}
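For readers wondering what the capacity actually bounds: a plausible reading (an assumption, since the buffer implementation is not shown in this diff) is a bounded, evict-oldest structure, so with the old capacity of 5 a burst of late telemetry events could push invocation contexts out before they were correlated. A minimal sketch of that behaviour:

```rust
// Hedged sketch of the behaviour the larger default guards against; the real
// ContextBuffer in lifecycle/invocation/context.rs may differ.
use std::collections::VecDeque;

struct ContextBuffer<T> {
    capacity: usize,
    items: VecDeque<T>,
}

impl<T> ContextBuffer<T> {
    fn with_capacity(capacity: usize) -> Self {
        Self { capacity, items: VecDeque::with_capacity(capacity) }
    }

    // When the buffer is full, the oldest entry is dropped to make room.
    // With capacity 5, a burst of delayed telemetry events could evict
    // contexts before they are processed; 500 gives ample headroom.
    fn insert(&mut self, item: T) {
        if self.items.len() == self.capacity {
            self.items.pop_front();
        }
        self.items.push_back(item);
    }
}

fn main() {
    let mut buf = ContextBuffer::with_capacity(500);
    for i in 0..600 {
        buf.insert(i); // only the most recent 500 survive
    }
    assert_eq!(buf.items.len(), 500);
}
```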
