diff --git a/Cargo.lock b/Cargo.lock index 79fa2f47..e6fde7ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -779,6 +779,7 @@ dependencies = [ "tracing", "tracing-core", "tracing-subscriber", + "zstd", ] [[package]] diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index 62f94807..3b53276e 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -15,6 +15,7 @@ tokio-util = { version = "0.7", default-features = false } tracing = { version = "0.1", default-features = false } tracing-core = { version = "0.1", default-features = false } tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "registry", "fmt", "env-filter", "tracing-log"] } +zstd = { version = "0.13.3", default-features = false } [[bin]] name = "datadog-serverless-compat" diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 043548ae..7f4681e9 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -14,6 +14,7 @@ use tokio::{ }; use tracing::{debug, error, info}; use tracing_subscriber::EnvFilter; +use zstd::zstd_safe::CompressionLevel; use datadog_trace_agent::{ aggregator::TraceAggregator, @@ -215,6 +216,7 @@ async fn start_dogstatsd( https_proxy, timeout: DOGSTATSD_TIMEOUT_DURATION, retry_strategy: RetryStrategy::LinearBackoff(3, 1), + compression_level: CompressionLevel::try_from(6).unwrap_or_default(), }); Some(metrics_flusher) } diff --git a/crates/dogstatsd/src/datadog.rs b/crates/dogstatsd/src/datadog.rs index c2f2b06a..7e1401e9 100644 --- a/crates/dogstatsd/src/datadog.rs +++ b/crates/dogstatsd/src/datadog.rs @@ -19,6 +19,7 @@ use std::sync::OnceLock; use std::time::Duration; use tracing::{debug, error}; use zstd::stream::write::Encoder; +use zstd::zstd_safe::CompressionLevel; // TODO: Move to the more ergonomic LazyLock when MSRV is 1.80 static SITE_RE: OnceLock = OnceLock::new(); @@ -138,6 +139,7 @@ pub struct DdApi { metrics_intake_url_prefix: MetricsIntakeUrlPrefix, client: Option, retry_strategy: RetryStrategy, + compression_level: CompressionLevel, } impl DdApi { @@ -148,6 +150,7 @@ impl DdApi { https_proxy: Option, timeout: Duration, retry_strategy: RetryStrategy, + compression_level: CompressionLevel, ) -> Self { let client = build_client(https_proxy, timeout) .inspect_err(|e| { @@ -159,6 +162,7 @@ impl DdApi { metrics_intake_url_prefix, client, retry_strategy, + compression_level, } } @@ -206,7 +210,7 @@ impl DdApi { let start = std::time::Instant::now(); let result = (|| -> std::io::Result> { - let mut encoder = Encoder::new(Vec::new(), 6)?; + let mut encoder = Encoder::new(Vec::new(), self.compression_level)?; encoder.write_all(&body)?; encoder.finish() })(); diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index c114405e..b523a931 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::OnceCell; use tracing::{debug, error}; +use zstd::zstd_safe::CompressionLevel; #[derive(Clone)] pub struct Flusher { @@ -20,6 +21,7 @@ pub struct Flusher { retry_strategy: RetryStrategy, aggregator_handle: AggregatorHandle, dd_api: OnceCell>, + compression_level: CompressionLevel, } pub struct FlusherConfig { @@ -29,6 +31,7 @@ pub struct FlusherConfig { pub https_proxy: Option, pub timeout: Duration, pub retry_strategy: RetryStrategy, + pub compression_level: CompressionLevel, } impl Flusher { @@ -40,6 +43,7 @@ impl Flusher { timeout: config.timeout, retry_strategy: config.retry_strategy, aggregator_handle: config.aggregator_handle, + compression_level: config.compression_level, dd_api: OnceCell::new(), } } @@ -55,6 +59,7 @@ impl Flusher { self.https_proxy.clone(), self.timeout, self.retry_strategy.clone(), + self.compression_level.clone(), )), None => { error!("Failed to create dd_api: failed to get API key"); diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 4d07f455..4be84bf8 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -18,6 +18,7 @@ use tokio::{ time::{sleep, timeout, Duration}, }; use tokio_util::sync::CancellationToken; +use zstd::zstd_safe::CompressionLevel; #[cfg(test)] #[tokio::test] @@ -60,6 +61,8 @@ async fn dogstatsd_server_ships_series() { https_proxy: None, timeout: std::time::Duration::from_secs(5), retry_strategy: RetryStrategy::Immediate(3), + compression_level: CompressionLevel::try_from(6) + .expect("failed to create compression level"), }); let server_address = "127.0.0.1:18125"; @@ -137,6 +140,7 @@ async fn test_send_with_retry_immediate_failure() { None, Duration::from_secs(1), retry_strategy.clone(), + 6, ); // Create a series using the Aggregator @@ -192,6 +196,7 @@ async fn test_send_with_retry_linear_backoff_success() { None, Duration::from_secs(1), retry_strategy.clone(), + 6, ); // Create a series using the Aggregator @@ -246,6 +251,7 @@ async fn test_send_with_retry_immediate_failure_after_one_attempt() { None, Duration::from_secs(1), retry_strategy.clone(), + 6, ); // Create a series using the Aggregator