diff --git a/Cargo.lock b/Cargo.lock index 9985655d..6617fdfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -516,6 +516,7 @@ dependencies = [ "rustls-pemfile", "serde", "serde_json", + "socket2", "thiserror 1.0.69", "tokio", "tokio-util", @@ -1745,7 +1746,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.114", diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 66be3554..bd55ac0a 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -232,6 +232,7 @@ async fn start_dogstatsd( metric_namespace, #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name, + so_rcvbuf: None, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); diff --git a/crates/dogstatsd/Cargo.toml b/crates/dogstatsd/Cargo.toml index 9ef8ccc0..e9388790 100644 --- a/crates/dogstatsd/Cargo.toml +++ b/crates/dogstatsd/Cargo.toml @@ -20,6 +20,7 @@ serde = { version = "1.0.197", default-features = false, features = ["derive"] } serde_json = { version = "1.0.116", default-features = false, features = ["alloc"] } thiserror = { version = "1.0.58", default-features = false } tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net"] } +socket2 = { version = "0.6", default-features = false } tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1.40", default-features = false } regex = { version = "1.10.6", default-features = false } diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index 30fbe995..1daea4d9 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -13,6 +13,7 @@ use std::str::Split; use crate::aggregator_service::AggregatorHandle; use crate::errors::ParseError::UnsupportedType; use crate::metric::{id, parse, Metric}; +use socket2::{Domain, Protocol, Socket, Type}; use tracing::{debug, error, trace}; // Windows-specific imports @@ -36,6 +37,9 @@ pub struct DogStatsDConfig { /// Optional Windows named pipe name. (e.g., "\\\\.\\pipe\\my_pipe"). #[cfg(all(windows, feature = "windows-pipes"))] pub windows_pipe_name: Option, + /// Optional socket receive buffer size (SO_RCVBUF) in bytes. + /// If None, uses the OS default. Increase this to reduce packet loss under high load. + pub so_rcvbuf: Option, } /// Represents the source of a DogStatsD message. Varies by transport method. @@ -174,9 +178,9 @@ impl DogStatsD { let addr = format!("{}:{}", config.host, config.port); // TODO (UDS socket) #[allow(clippy::expect_used)] - let socket = tokio::net::UdpSocket::bind(addr) + let socket = create_udp_socket(&addr, config.so_rcvbuf) .await - .expect("couldn't bind to address"); + .expect("couldn't create UDP socket"); BufferReader::UdpSocket(socket) }; @@ -267,6 +271,53 @@ impl DogStatsD { } } +async fn create_udp_socket( + addr: &str, + so_rcvbuf: Option, +) -> std::io::Result { + // Resolve via lookup_host to support hostnames (e.g. "localhost:8125"), + // matching the previous behavior of tokio::net::UdpSocket::bind(). + let socket_addr = tokio::net::lookup_host(addr).await?.next().ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Could not resolve address '{}'", addr), + ) + })?; + + let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; + + // Log the kernel's rmem_max cap so operators can tell + // whether the requested SO_RCVBUF was capped by the OS. + #[cfg(target_os = "linux")] + if let Ok(rmem_max) = std::fs::read_to_string("/proc/sys/net/core/rmem_max") { + debug!("DogStatsD Kernel rmem_max={} bytes", rmem_max.trim()); + } + + if let Some(buf_size) = so_rcvbuf { + socket.set_recv_buffer_size(buf_size)?; + + // The kernel may cap the value; log what we actually got. + let actual = socket.recv_buffer_size().unwrap_or(0); + debug!( + "DogStatsD SO_RCVBUF: requested={} bytes, actual={} bytes", + buf_size, actual + ); + } else { + debug!( + "DogStatsD using default SO_RCVBUF: {} bytes", + socket.recv_buffer_size().unwrap_or(0) + ); + } + + // Required for tokio compatibility + socket.set_nonblocking(true)?; + + socket.bind(&socket_addr.into())?; + + let std_socket: std::net::UdpSocket = socket.into(); + tokio::net::UdpSocket::from_std(std_socket) +} + /// Named Pipe server - accepts client connections and forwards metrics. /// /// Uses a multi-instance approach (like winio in the main agent): @@ -474,6 +525,34 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d .starts_with("custom.namespace.my.metric")); } + #[tokio::test] + async fn test_create_udp_socket_default_so_rcvbuf() { + let socket = super::create_udp_socket("127.0.0.1:0", None).await.unwrap(); + let std_socket = socket.into_std().unwrap(); + let s2 = socket2::Socket::from(std_socket); + let buf_size = s2.recv_buffer_size().unwrap(); + assert!(buf_size > 0, "default SO_RCVBUF should be non-zero"); + } + + #[tokio::test] + async fn test_create_udp_socket_custom_so_rcvbuf() { + let requested: usize = 262_144; + let socket = super::create_udp_socket("127.0.0.1:0", Some(requested)) + .await + .unwrap(); + let std_socket = socket.into_std().unwrap(); + let s2 = socket2::Socket::from(std_socket); + let actual = s2.recv_buffer_size().unwrap(); + // The kernel may double the value (Linux) or cap it, but it should + // be at least as large as the requested size. + assert!( + actual >= requested, + "SO_RCVBUF actual ({}) should be >= requested ({})", + actual, + requested + ); + } + async fn setup_and_consume_dogstatsd( statsd_string: &str, metric_namespace: Option, diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index e51b0973..24be9452 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -102,6 +102,7 @@ async fn start_dogstatsd(aggregator_handle: AggregatorHandle) -> CancellationTok metric_namespace: None, #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: None, + so_rcvbuf: None, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); let dogstatsd_client = DogStatsD::new( @@ -308,6 +309,7 @@ async fn test_named_pipe_basic_communication() { port: 0, metric_namespace: None, windows_pipe_name: Some(pipe_name.to_string()), + so_rcvbuf: None, }, handle, cancel_token, @@ -362,6 +364,7 @@ async fn test_named_pipe_disconnect_reconnect() { port: 0, metric_namespace: None, windows_pipe_name: Some(pipe_name.to_string()), + so_rcvbuf: None, }, handle, cancel_token_clone, @@ -431,6 +434,7 @@ async fn test_named_pipe_cancellation() { port: 0, metric_namespace: None, windows_pipe_name: Some(pipe_name.to_string()), + so_rcvbuf: None, }, handle, cancel_token_clone, @@ -474,6 +478,7 @@ async fn test_buffer_split_message() { port: 0, metric_namespace: None, windows_pipe_name: Some(pipe_name.to_string()), + so_rcvbuf: None, }, handle, cancel_token_clone,