Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ async fn start_dogstatsd(
metric_namespace,
#[cfg(all(windows, feature = "windows-pipes"))]
windows_pipe_name,
so_rcvbuf: None,
};
Comment on lines 232 to 236
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description mentions configuring this via DD_DOGSTATSD_SO_RCVBUF, but so_rcvbuf is still hard-coded to None here. If this binary is expected to expose the knob, wire so_rcvbuf from env/config (with parsing/validation) so the option is actually usable.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not adding an option a project I don't manage – ensuring that it works as before

let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new();

Expand Down
1 change: 1 addition & 0 deletions crates/dogstatsd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ serde = { version = "1.0.197", default-features = false, features = ["derive"] }
serde_json = { version = "1.0.116", default-features = false, features = ["alloc"] }
thiserror = { version = "1.0.58", default-features = false }
tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net"] }
socket2 = { version = "0.6", default-features = false }
Comment thread
duncanista marked this conversation as resolved.
tokio-util = { version = "0.7.11", default-features = false }
tracing = { version = "0.1.40", default-features = false }
regex = { version = "1.10.6", default-features = false }
Expand Down
83 changes: 81 additions & 2 deletions crates/dogstatsd/src/dogstatsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::str::Split;
use crate::aggregator_service::AggregatorHandle;
use crate::errors::ParseError::UnsupportedType;
use crate::metric::{id, parse, Metric};
use socket2::{Domain, Protocol, Socket, Type};
use tracing::{debug, error, trace};

// Windows-specific imports
Expand All @@ -36,6 +37,9 @@ pub struct DogStatsDConfig {
/// Optional Windows named pipe name. (e.g., "\\\\.\\pipe\\my_pipe").
#[cfg(all(windows, feature = "windows-pipes"))]
pub windows_pipe_name: Option<String>,
/// Optional socket receive buffer size (SO_RCVBUF) in bytes.
/// If None, uses the OS default. Increase this to reduce packet loss under high load.
pub so_rcvbuf: Option<usize>,
}

/// Represents the source of a DogStatsD message. Varies by transport method.
Expand Down Expand Up @@ -174,9 +178,9 @@ impl DogStatsD {
let addr = format!("{}:{}", config.host, config.port);
// TODO (UDS socket)
#[allow(clippy::expect_used)]
let socket = tokio::net::UdpSocket::bind(addr)
let socket = create_udp_socket(&addr, config.so_rcvbuf)
.await
.expect("couldn't bind to address");
.expect("couldn't create UDP socket");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibility of not-very-specific errors here, since the raw io::Error from bind() and set_nonblocking()
could look identical. Could be more specific with something like

socket.set_recv_buffer_size(buf_size).map_err(|e| {                                                       
      std::io::Error::new(e.kind(), format!("failed to set SO_RCVBUF to {}: {}", buf_size, e))              
  })?;                     
                                                                                                            
  socket.set_nonblocking(true).map_err(|e| {
      std::io::Error::new(e.kind(), format!("failed to set nonblocking: {}", e))
  })?;

  socket.bind(&socket_addr.into()).map_err(|e| {
      std::io::Error::new(e.kind(), format!("failed to bind to '{}': {}", addr, e))
  })?;

Claude Comment:

failed to bind to '127.0.0.1:8125': Address already in use: couldn't create UDP socket
vs the current output which would just be:
Address already in use: couldn't create UDP socket

Non-blocking comment, low change of things breaking, but the socket2 docs do specifically mention they don't do error handling.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too verbose so far, might improve this in another PR with a cleaner approach

BufferReader::UdpSocket(socket)
};

Expand Down Expand Up @@ -267,6 +271,53 @@ impl DogStatsD {
}
}

async fn create_udp_socket(
Comment thread
duncanista marked this conversation as resolved.
addr: &str,
so_rcvbuf: Option<usize>,
) -> std::io::Result<tokio::net::UdpSocket> {
Comment thread
duncanista marked this conversation as resolved.
// Resolve via lookup_host to support hostnames (e.g. "localhost:8125"),
// matching the previous behavior of tokio::net::UdpSocket::bind().
let socket_addr = tokio::net::lookup_host(addr).await?.next().ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("Could not resolve address '{}'", addr),
)
})?;

let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
Comment thread
Lewis-E marked this conversation as resolved.
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The socket is always created as IPv4 (Domain::IPV4). If addr is an IPv6 SocketAddr, bind will fail (and tokio::net::UdpSocket::bind previously supported IPv6). Choose the socket Domain based on whether the parsed SocketAddr is v4 or v6.

Suggested change
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
let domain = if socket_addr.is_ipv4() {
Domain::IPV4
} else {
Domain::IPV6
};
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, we are always sending localhost addresses, might add in another PR


// Log the kernel's rmem_max cap so operators can tell
// whether the requested SO_RCVBUF was capped by the OS.
#[cfg(target_os = "linux")]
if let Ok(rmem_max) = std::fs::read_to_string("/proc/sys/net/core/rmem_max") {
debug!("DogStatsD Kernel rmem_max={} bytes", rmem_max.trim());
}
Comment thread
duncanista marked this conversation as resolved.

if let Some(buf_size) = so_rcvbuf {
socket.set_recv_buffer_size(buf_size)?;

// The kernel may cap the value; log what we actually got.
let actual = socket.recv_buffer_size().unwrap_or(0);
Comment thread
duncanista marked this conversation as resolved.
debug!(
"DogStatsD SO_RCVBUF: requested={} bytes, actual={} bytes",
buf_size, actual
);
} else {
debug!(
"DogStatsD using default SO_RCVBUF: {} bytes",
socket.recv_buffer_size().unwrap_or(0)
);
}

// Required for tokio compatibility
socket.set_nonblocking(true)?;

socket.bind(&socket_addr.into())?;

let std_socket: std::net::UdpSocket = socket.into();
tokio::net::UdpSocket::from_std(std_socket)
}

/// Named Pipe server - accepts client connections and forwards metrics.
///
/// Uses a multi-instance approach (like winio in the main agent):
Expand Down Expand Up @@ -474,6 +525,34 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
.starts_with("custom.namespace.my.metric"));
}

#[tokio::test]
async fn test_create_udp_socket_default_so_rcvbuf() {
let socket = super::create_udp_socket("127.0.0.1:0", None).await.unwrap();
let std_socket = socket.into_std().unwrap();
let s2 = socket2::Socket::from(std_socket);
let buf_size = s2.recv_buffer_size().unwrap();
assert!(buf_size > 0, "default SO_RCVBUF should be non-zero");
}

#[tokio::test]
async fn test_create_udp_socket_custom_so_rcvbuf() {
let requested: usize = 262_144;
let socket = super::create_udp_socket("127.0.0.1:0", Some(requested))
.await
.unwrap();
let std_socket = socket.into_std().unwrap();
let s2 = socket2::Socket::from(std_socket);
let actual = s2.recv_buffer_size().unwrap();
// The kernel may double the value (Linux) or cap it, but it should
// be at least as large as the requested size.
assert!(
actual >= requested,
"SO_RCVBUF actual ({}) should be >= requested ({})",
actual,
requested
);
}

async fn setup_and_consume_dogstatsd(
statsd_string: &str,
metric_namespace: Option<String>,
Expand Down
5 changes: 5 additions & 0 deletions crates/dogstatsd/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ async fn start_dogstatsd(aggregator_handle: AggregatorHandle) -> CancellationTok
metric_namespace: None,
#[cfg(all(windows, feature = "windows-pipes"))]
windows_pipe_name: None,
so_rcvbuf: None,
};
let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new();
let dogstatsd_client = DogStatsD::new(
Expand Down Expand Up @@ -308,6 +309,7 @@ async fn test_named_pipe_basic_communication() {
port: 0,
metric_namespace: None,
windows_pipe_name: Some(pipe_name.to_string()),
so_rcvbuf: None,
},
handle,
cancel_token,
Expand Down Expand Up @@ -362,6 +364,7 @@ async fn test_named_pipe_disconnect_reconnect() {
port: 0,
metric_namespace: None,
windows_pipe_name: Some(pipe_name.to_string()),
so_rcvbuf: None,
},
handle,
cancel_token_clone,
Expand Down Expand Up @@ -431,6 +434,7 @@ async fn test_named_pipe_cancellation() {
port: 0,
metric_namespace: None,
windows_pipe_name: Some(pipe_name.to_string()),
so_rcvbuf: None,
},
handle,
cancel_token_clone,
Expand Down Expand Up @@ -474,6 +478,7 @@ async fn test_buffer_split_message() {
port: 0,
metric_namespace: None,
windows_pipe_name: Some(pipe_name.to_string()),
so_rcvbuf: None,
},
handle,
cancel_token_clone,
Expand Down
Loading