From e0ca5d105ecc431df0cbeb90d402373dcbe05e6e Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Wed, 11 Feb 2026 16:57:18 -0500 Subject: [PATCH 1/7] feat(dogstatsd): add configurable SO_RCVBUF for UDP socket Use the `socket2` crate to create the UDP socket with a configurable receive buffer size (`SO_RCVBUF`). On constrained environments like AWS Lambda, the kernel caps `SO_RCVBUF` at ~416 KiB (2x rmem_max), which can cause silent packet loss under burst traffic. This change lets operators tune the buffer via `DD_DOGSTATSD_SO_RCVBUF`, matching the Go agent's configuration option. Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 21 +++++++-- crates/dogstatsd/Cargo.toml | 3 +- crates/dogstatsd/src/dogstatsd.rs | 53 +++++++++++++++++++++- crates/dogstatsd/tests/integration_test.rs | 5 ++ 4 files changed, 74 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9985655d..b09b5127 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -516,6 +516,7 @@ dependencies = [ "rustls-pemfile", "serde", "serde_json", + "socket2 0.5.10", "thiserror 1.0.69", "tokio", "tokio-util", @@ -1025,7 +1026,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.6.2", "tokio", "tower-service", "tracing", @@ -1745,7 +1746,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.114", @@ -1831,7 +1832,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2", + "socket2 0.6.2", "thiserror 2.0.18", "tokio", "tracing", @@ -1868,7 +1869,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.6.2", "tracing", "windows-sys 0.60.2", ] @@ -2397,6 +2398,16 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.2" @@ -2579,7 +2590,7 @@ dependencies = [ "mio", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.6.2", "tokio-macros", "windows-sys 0.61.2", ] diff --git a/crates/dogstatsd/Cargo.toml b/crates/dogstatsd/Cargo.toml index 9ef8ccc0..f660eb86 100644 --- a/crates/dogstatsd/Cargo.toml +++ b/crates/dogstatsd/Cargo.toml @@ -19,7 +19,8 @@ reqwest = { version = "0.12.4", features = ["json", "http2"], default-features = serde = { version = "1.0.197", default-features = false, features = ["derive"] } serde_json = { version = "1.0.116", default-features = false, features = ["alloc"] } thiserror = { version = "1.0.58", default-features = false } -tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net"] } +tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net", "io-util"] } +socket2 = { version = "0.5", default-features = false } tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1.40", default-features = false } regex = { version = "1.10.6", default-features = false } diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index 30fbe995..515fdaca 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -13,6 +13,7 @@ use std::str::Split; use crate::aggregator_service::AggregatorHandle; use crate::errors::ParseError::UnsupportedType; use crate::metric::{id, parse, Metric}; +use socket2::{Domain, Protocol, Socket, Type}; use tracing::{debug, error, trace}; // Windows-specific imports @@ -36,6 +37,10 @@ pub struct DogStatsDConfig { /// Optional Windows named pipe name. (e.g., "\\\\.\\pipe\\my_pipe"). #[cfg(all(windows, feature = "windows-pipes"))] pub windows_pipe_name: Option, + /// Optional socket receive buffer size (SO_RCVBUF) in bytes. + /// If None, uses the OS default. Increase this to reduce packet loss under high load. + /// Corresponds to `DD_DOGSTATSD_SO_RCVBUF` in the Datadog Agent. + pub so_rcvbuf: Option, } /// Represents the source of a DogStatsD message. Varies by transport method. @@ -174,9 +179,9 @@ impl DogStatsD { let addr = format!("{}:{}", config.host, config.port); // TODO (UDS socket) #[allow(clippy::expect_used)] - let socket = tokio::net::UdpSocket::bind(addr) + let socket = create_udp_socket(&addr, config.so_rcvbuf) .await - .expect("couldn't bind to address"); + .expect("couldn't create UDP socket"); BufferReader::UdpSocket(socket) }; @@ -267,6 +272,50 @@ impl DogStatsD { } } +async fn create_udp_socket( + addr: &str, + so_rcvbuf: Option, +) -> std::io::Result { + let socket_addr: SocketAddr = addr.parse().map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Invalid address '{}': {}", addr, e), + ) + })?; + + let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; + + // Log the kernel's rmem_max cap so operators can tell whether the requested + // SO_RCVBUF was capped by the OS (diagnostic, not required for operation). + if let Ok(rmem_max) = std::fs::read_to_string("/proc/sys/net/core/rmem_max") { + debug!("DogStatsD kernel rmem_max={} bytes", rmem_max.trim()); + } + + if let Some(buf_size) = so_rcvbuf { + socket.set_recv_buffer_size(buf_size)?; + + // The kernel may cap the value; log what we actually got. + let actual = socket.recv_buffer_size().unwrap_or(0); + debug!( + "DogStatsD SO_RCVBUF: requested={} bytes, actual={} bytes", + buf_size, actual + ); + } else { + debug!( + "DogStatsD using default SO_RCVBUF: {} bytes", + socket.recv_buffer_size().unwrap_or(0) + ); + } + + // Required for tokio compatibility + socket.set_nonblocking(true)?; + + socket.bind(&socket_addr.into())?; + + let std_socket: std::net::UdpSocket = socket.into(); + tokio::net::UdpSocket::from_std(std_socket) +} + /// Named Pipe server - accepts client connections and forwards metrics. /// /// Uses a multi-instance approach (like winio in the main agent): diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index e51b0973..24be9452 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -102,6 +102,7 @@ async fn start_dogstatsd(aggregator_handle: AggregatorHandle) -> CancellationTok metric_namespace: None, #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: None, + so_rcvbuf: None, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); let dogstatsd_client = DogStatsD::new( @@ -308,6 +309,7 @@ async fn test_named_pipe_basic_communication() { port: 0, metric_namespace: None, windows_pipe_name: Some(pipe_name.to_string()), + so_rcvbuf: None, }, handle, cancel_token, @@ -362,6 +364,7 @@ async fn test_named_pipe_disconnect_reconnect() { port: 0, metric_namespace: None, windows_pipe_name: Some(pipe_name.to_string()), + so_rcvbuf: None, }, handle, cancel_token_clone, @@ -431,6 +434,7 @@ async fn test_named_pipe_cancellation() { port: 0, metric_namespace: None, windows_pipe_name: Some(pipe_name.to_string()), + so_rcvbuf: None, }, handle, cancel_token_clone, @@ -474,6 +478,7 @@ async fn test_buffer_split_message() { port: 0, metric_namespace: None, windows_pipe_name: Some(pipe_name.to_string()), + so_rcvbuf: None, }, handle, cancel_token_clone, From 8817f537727580fb01674846453d7ba38de580cf Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Thu, 12 Feb 2026 13:28:09 -0500 Subject: [PATCH 2/7] remove comment about which env var it would represent in other crates --- crates/dogstatsd/src/dogstatsd.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index 515fdaca..9f497d7b 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -39,7 +39,6 @@ pub struct DogStatsDConfig { pub windows_pipe_name: Option, /// Optional socket receive buffer size (SO_RCVBUF) in bytes. /// If None, uses the OS default. Increase this to reduce packet loss under high load. - /// Corresponds to `DD_DOGSTATSD_SO_RCVBUF` in the Datadog Agent. pub so_rcvbuf: Option, } From 92133b3eb59cb27912ad8c8425293d6d0d8393b2 Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Thu, 12 Feb 2026 13:34:02 -0500 Subject: [PATCH 3/7] update comments --- crates/dogstatsd/src/dogstatsd.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index 9f497d7b..bad421d3 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -284,10 +284,10 @@ async fn create_udp_socket( let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; - // Log the kernel's rmem_max cap so operators can tell whether the requested - // SO_RCVBUF was capped by the OS (diagnostic, not required for operation). + // Log the kernel's rmem_max cap so operators can tell + // whether the requested SO_RCVBUF was capped by the OS. if let Ok(rmem_max) = std::fs::read_to_string("/proc/sys/net/core/rmem_max") { - debug!("DogStatsD kernel rmem_max={} bytes", rmem_max.trim()); + debug!("DogStatsD Kernel rmem_max={} bytes", rmem_max.trim()); } if let Some(buf_size) = so_rcvbuf { From 377a20e2df0f494e77e1aab33d4d1f6dc9ab3f54 Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Thu, 12 Feb 2026 13:38:50 -0500 Subject: [PATCH 4/7] fmt & tests --- crates/datadog-serverless-compat/src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 66be3554..bd55ac0a 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -232,6 +232,7 @@ async fn start_dogstatsd( metric_namespace, #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name, + so_rcvbuf: None, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); From 35d39720ea11f66c01397219d0d9097e4ef1e8b9 Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Thu, 12 Feb 2026 16:22:15 -0500 Subject: [PATCH 5/7] socket2@0.6 --- Cargo.lock | 20 +++++--------------- crates/dogstatsd/Cargo.toml | 2 +- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b09b5127..6617fdfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -516,7 +516,7 @@ dependencies = [ "rustls-pemfile", "serde", "serde_json", - "socket2 0.5.10", + "socket2", "thiserror 1.0.69", "tokio", "tokio-util", @@ -1026,7 +1026,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.2", + "socket2", "tokio", "tower-service", "tracing", @@ -1832,7 +1832,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.6.2", + "socket2", "thiserror 2.0.18", "tokio", "tracing", @@ -1869,7 +1869,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.2", + "socket2", "tracing", "windows-sys 0.60.2", ] @@ -2398,16 +2398,6 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" -[[package]] -name = "socket2" -version = "0.5.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" -dependencies = [ - "libc", - "windows-sys 0.52.0", -] - [[package]] name = "socket2" version = "0.6.2" @@ -2590,7 +2580,7 @@ dependencies = [ "mio", "pin-project-lite", "signal-hook-registry", - "socket2 0.6.2", + "socket2", "tokio-macros", "windows-sys 0.61.2", ] diff --git a/crates/dogstatsd/Cargo.toml b/crates/dogstatsd/Cargo.toml index f660eb86..d9916ac9 100644 --- a/crates/dogstatsd/Cargo.toml +++ b/crates/dogstatsd/Cargo.toml @@ -20,7 +20,7 @@ serde = { version = "1.0.197", default-features = false, features = ["derive"] } serde_json = { version = "1.0.116", default-features = false, features = ["alloc"] } thiserror = { version = "1.0.58", default-features = false } tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net", "io-util"] } -socket2 = { version = "0.5", default-features = false } +socket2 = { version = "0.6", default-features = false } tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1.40", default-features = false } regex = { version = "1.10.6", default-features = false } From e28ae7f90450df4bc4d27d5c5bfbc110b6a29f0d Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Thu, 12 Feb 2026 17:04:12 -0500 Subject: [PATCH 6/7] fixes --- crates/dogstatsd/src/dogstatsd.rs | 35 +++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index bad421d3..1daea4d9 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -275,10 +275,12 @@ async fn create_udp_socket( addr: &str, so_rcvbuf: Option, ) -> std::io::Result { - let socket_addr: SocketAddr = addr.parse().map_err(|e| { + // Resolve via lookup_host to support hostnames (e.g. "localhost:8125"), + // matching the previous behavior of tokio::net::UdpSocket::bind(). + let socket_addr = tokio::net::lookup_host(addr).await?.next().ok_or_else(|| { std::io::Error::new( std::io::ErrorKind::InvalidInput, - format!("Invalid address '{}': {}", addr, e), + format!("Could not resolve address '{}'", addr), ) })?; @@ -286,6 +288,7 @@ async fn create_udp_socket( // Log the kernel's rmem_max cap so operators can tell // whether the requested SO_RCVBUF was capped by the OS. + #[cfg(target_os = "linux")] if let Ok(rmem_max) = std::fs::read_to_string("/proc/sys/net/core/rmem_max") { debug!("DogStatsD Kernel rmem_max={} bytes", rmem_max.trim()); } @@ -522,6 +525,34 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d .starts_with("custom.namespace.my.metric")); } + #[tokio::test] + async fn test_create_udp_socket_default_so_rcvbuf() { + let socket = super::create_udp_socket("127.0.0.1:0", None).await.unwrap(); + let std_socket = socket.into_std().unwrap(); + let s2 = socket2::Socket::from(std_socket); + let buf_size = s2.recv_buffer_size().unwrap(); + assert!(buf_size > 0, "default SO_RCVBUF should be non-zero"); + } + + #[tokio::test] + async fn test_create_udp_socket_custom_so_rcvbuf() { + let requested: usize = 262_144; + let socket = super::create_udp_socket("127.0.0.1:0", Some(requested)) + .await + .unwrap(); + let std_socket = socket.into_std().unwrap(); + let s2 = socket2::Socket::from(std_socket); + let actual = s2.recv_buffer_size().unwrap(); + // The kernel may double the value (Linux) or cap it, but it should + // be at least as large as the requested size. + assert!( + actual >= requested, + "SO_RCVBUF actual ({}) should be >= requested ({})", + actual, + requested + ); + } + async fn setup_and_consume_dogstatsd( statsd_string: &str, metric_namespace: Option, From 8a215bbb08842ad4f1e37d08c5f4a70d539cb5ba Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Thu, 12 Feb 2026 17:09:06 -0500 Subject: [PATCH 7/7] remove io-util already gated --- crates/dogstatsd/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/dogstatsd/Cargo.toml b/crates/dogstatsd/Cargo.toml index d9916ac9..e9388790 100644 --- a/crates/dogstatsd/Cargo.toml +++ b/crates/dogstatsd/Cargo.toml @@ -19,7 +19,7 @@ reqwest = { version = "0.12.4", features = ["json", "http2"], default-features = serde = { version = "1.0.197", default-features = false, features = ["derive"] } serde_json = { version = "1.0.116", default-features = false, features = ["alloc"] } thiserror = { version = "1.0.58", default-features = false } -tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net", "io-util"] } +tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net"] } socket2 = { version = "0.6", default-features = false } tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1.40", default-features = false }