From 90b3231b8eee0dc74c0b5989e752eb60c8206c85 Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Fri, 6 Mar 2026 16:46:56 +0100 Subject: [PATCH] Handle backpressure more gracefully Only drop some operations on the floor, but never those setting the state of the connection. Otherwise, under load the connection may end up in an unexpected state. Also do not leak file descriptors on partially received messages. Signed-off-by: Bob Weinand --- datadog-ipc/Cargo.toml | 1 + .../src/platform/unix/channel/metadata.rs | 15 +++- datadog-ipc/src/sequential.rs | 74 ++++++++++++++++--- datadog-sidecar-macros/src/lib.rs | 60 ++++++++++++--- .../src/service/sidecar_interface.rs | 10 +++ datadog-sidecar/src/service/sidecar_server.rs | 3 +- 6 files changed, 134 insertions(+), 29 deletions(-) diff --git a/datadog-ipc/Cargo.toml b/datadog-ipc/Cargo.toml index f02fd7162e..8f4fcddf53 100644 --- a/datadog-ipc/Cargo.toml +++ b/datadog-ipc/Cargo.toml @@ -25,6 +25,7 @@ tarpc = { path = "./tarpc", default-features = false, features = ["serde-transpo libdd-common = { path = "../libdd-common" } datadog-ipc-macros = { path = "../datadog-ipc-macros" } +tracing = { version = "0.1", default-features = false } [dev-dependencies] criterion = "0.5" diff --git a/datadog-ipc/src/platform/unix/channel/metadata.rs b/datadog-ipc/src/platform/unix/channel/metadata.rs index f5871e2fd7..ce736dda31 100644 --- a/datadog-ipc/src/platform/unix/channel/metadata.rs +++ b/datadog-ipc/src/platform/unix/channel/metadata.rs @@ -4,6 +4,7 @@ use std::{ collections::VecDeque, io, + os::fd::IntoRawFd, os::unix::prelude::{AsRawFd, FromRawFd, RawFd}, }; @@ -17,8 +18,8 @@ use crate::{ #[derive(Debug)] pub struct ChannelMetadata { fds_to_send: Vec>, - fds_received: VecDeque, - pid: libc::pid_t, // must always be set to current Process ID + fds_received: VecDeque, // Store as OwnedFd to prevent leaking them + pid: libc::pid_t, // must always be set to current Process ID } impl Default for ChannelMetadata { @@ -80,7 +81,13 @@ impl
ChannelMetadata { } pub(crate) fn receive_fds(&mut self, fds: &[RawFd]) { - self.fds_received.append(&mut fds.to_vec().into()); + self.fds_received.append( + &mut fds + .iter() + .map(|fd| unsafe { OwnedFd::from_raw_fd(*fd) }) + .collect::>() + .into(), + ); } pub(crate) fn find_handle(&mut self, hint: &PlatformHandle) -> Option> { @@ -90,6 +97,6 @@ impl ChannelMetadata { let fd = self.fds_received.pop_front(); - fd.map(|fd| unsafe { PlatformHandle::from_raw_fd(fd) }) + fd.map(|fd| unsafe { PlatformHandle::from_raw_fd(fd.into_raw_fd()) }) } } diff --git a/datadog-ipc/src/sequential.rs b/datadog-ipc/src/sequential.rs index 8f776f1935..05bff05a4f 100644 --- a/datadog-ipc/src/sequential.rs +++ b/datadog-ipc/src/sequential.rs @@ -1,21 +1,28 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use futures::{ready, Future, Stream}; +use futures::{Future, Stream}; use std::fmt::Debug; use std::{ pin::Pin, task::{Context, Poll}, }; use tarpc::server::{Channel, InFlightRequest, Requests, Serve}; +use tokio::sync::mpsc::error::SendError; +use tokio::sync::mpsc::OwnedPermit; #[allow(type_alias_bounds)] type Request = (S, InFlightRequest); +type PendingPermit = Pin< + Box>, SendError<()>>> + Send + 'static>, +>; + /// Replaces tarpc::server::Channel::execute which spawns one task per message with an executor /// that spawns a single worker and queues requests for this task. /// -/// If the queue is full, the requests is dropped and will be cancelled by tarpc. +/// If the queue is full, the request is dropped and will be cancelled by tarpc unless +/// `with_backpressure` is configured for that request type. pub fn execute_sequential( reqs: Requests, serve: S, @@ -43,6 +50,8 @@ where inner: reqs, serve, tx, + backpressure: |_| false, + pending: None, } } @@ -55,6 +64,11 @@ where inner: Requests, serve: S, tx: tokio::sync::mpsc::Sender>, + /// Returns true for requests that must not be dropped when the queue is full. 
+ /// The executor will pause reading new requests and wait for channel space instead. + backpressure: fn(&C::Req) -> bool, + /// Pending channel-space reservation for a backpressure request. + pending: Option<(PendingPermit, Request)>, } impl Future for SequentialExecutor @@ -68,21 +82,50 @@ where type Output = anyhow::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - while let Some(response_handler) = ready!(self.as_mut().project().inner.poll_next(cx)) { - match response_handler { - Ok(resp) => { - let server = self.serve.clone(); - if let Err(_err) = self.as_ref().tx.try_send((server, resp)) { - // TODO: should we log something in case we drop the request on the floor? + loop { + // First flush any pending backpressure send before reading new requests. + { + let this = self.as_mut().project(); + if let Some((fut, _)) = this.pending.as_mut() { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(_)) => return Poll::Ready(Ok(())), // worker dropped + Poll::Ready(Ok(permit)) => { + #[allow(clippy::unwrap_used)] // we've just checked this + let (_, item) = this.pending.take().unwrap(); + permit.send(item); + // fall through to poll next request + } } } - Err(e) => { - // TODO: should we log something in case we drop the request on the floor? - return Poll::Ready(Err(e.into())); + } + + // Read the next request off the transport. 
+ match self.as_mut().project().inner.poll_next(cx) { + Poll::Ready(Some(Ok(resp))) => { + let backpressured = (self.backpressure)(&resp.get().message); + match self.as_ref().tx.try_send((self.serve.clone(), resp)) { + Ok(()) => {} // loop to pick up the next request + Err(err) => { + let (_, resp) = err.into_inner(); + if backpressured { + let fut = Box::pin(self.as_ref().tx.clone().reserve_owned()); + *self.as_mut().project().pending = + Some((fut, (self.serve.clone(), resp))); + } else { + tracing::warn!( + "Dropping {:?}: sequential executor queue is full", + resp.get().message + ); + } + } + } } + Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e.into())), + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Pending => return Poll::Pending, } } - Poll::Ready(Ok(())) } } @@ -97,4 +140,11 @@ where std::mem::swap(&mut self.tx, &mut sender); sender } + + /// Configures a predicate that identifies requests which must not be dropped when the queue + /// is full. For those requests the executor will pause reading and wait for channel space. 
+ pub fn with_backpressure(mut self, backpressure: fn(&C::Req) -> bool) -> Self { + self.backpressure = backpressure; + self + } } diff --git a/datadog-sidecar-macros/src/lib.rs b/datadog-sidecar-macros/src/lib.rs index 9e5ff65998..80bc00480e 100644 --- a/datadog-sidecar-macros/src/lib.rs +++ b/datadog-sidecar-macros/src/lib.rs @@ -28,25 +28,46 @@ fn snake_to_camel(ident_str: &str) -> String { } #[proc_macro_attribute] -pub fn extract_request_id(_attr: TokenStream, mut input: TokenStream) -> TokenStream { - let item: ItemTrait = syn::parse(input.clone()).unwrap(); +pub fn extract_request_id(_attr: TokenStream, input: TokenStream) -> TokenStream { + let mut item: ItemTrait = syn::parse(input).unwrap(); let name = &format_ident!("{}Request", item.ident); let mut arms: Vec = vec![]; - for inner in item.items { + let mut backpressure_variants: Vec = vec![]; + + for inner in item.items.iter_mut() { if let TraitItem::Fn(func) = inner { - for any_arg in func.sig.inputs { + // Strip #[force_backpressure] and record which methods carry it. 
+ let had_force_backpressure = func.attrs.iter().any(|attr| { + attr.meta + .path() + .get_ident() + .is_some_and(|i| i == "force_backpressure") + }); + func.attrs.retain(|attr| { + attr.meta + .path() + .get_ident() + .is_none_or(|i| i != "force_backpressure") + }); + + let method = Ident::new( + &snake_to_camel(&func.sig.ident.to_string()), + Span::mixed_site(), + ); + + if had_force_backpressure { + backpressure_variants.push(method.clone()); + } + + for any_arg in &func.sig.inputs { if let Typed(arg) = any_arg { - if let Pat::Ident(ident) = *arg.pat { + if let Pat::Ident(ident) = &*arg.pat { let matched_enum_type = match ident.ident.to_string().as_str() { "session_id" => Some(format_ident!("SessionId")), "instance_id" => Some(format_ident!("InstanceId")), _ => None, }; if let Some(enum_type) = matched_enum_type { - let method = Ident::new( - &snake_to_camel(&func.sig.ident.to_string()), - Span::mixed_site(), - ); arms.push(parse_quote! { #name::#method { #ident, .. } => RequestIdentifier::#enum_type(#ident.clone()) }); @@ -56,7 +77,16 @@ pub fn extract_request_id(_attr: TokenStream, mut input: TokenStream) -> TokenSt } } } - input.extend(TokenStream::from(quote! { + + let backpressure_body = if backpressure_variants.is_empty() { + quote! { false } + } else { + quote! { matches!(self, #(#name::#backpressure_variants { .. })|*) } + }; + + TokenStream::from(quote! { + #item + impl RequestIdentification for tarpc::Request<#name> { fn extract_identifier(&self) -> RequestIdentifier { match &self.message { @@ -67,8 +97,14 @@ pub fn extract_request_id(_attr: TokenStream, mut input: TokenStream) -> TokenSt } } } - })); - input + + impl #name { + /// Returns true if this request variant was annotated with `#[force_backpressure]`. 
+ pub fn requires_backpressure(&self) -> bool { + #backpressure_body + } + } + }) } struct EnvOrDefault { diff --git a/datadog-sidecar/src/service/sidecar_interface.rs b/datadog-sidecar/src/service/sidecar_interface.rs index 307152b087..bfcdfe05ca 100644 --- a/datadog-sidecar/src/service/sidecar_interface.rs +++ b/datadog-sidecar/src/service/sidecar_interface.rs @@ -60,6 +60,7 @@ pub trait SidecarInterface { /// * `session_id` - The ID of the session. /// * `pid` - The pid of the sidecar client. /// * `config` - The configuration to be set. + #[force_backpressure] async fn set_session_config( session_id: String, remote_config_notify_target: RemoteConfigNotifyTarget, @@ -73,6 +74,7 @@ pub trait SidecarInterface { /// /// * `session_id` - The ID of the session. /// * `process_tags` - The process tags string. + #[force_backpressure] async fn set_session_process_tags(session_id: String, process_tags: String); /// Shuts down a runtime. @@ -80,6 +82,7 @@ pub trait SidecarInterface { /// # Arguments /// /// * `instance_id` - The ID of the instance. + #[force_backpressure] async fn shutdown_runtime(instance_id: InstanceId); /// Shuts down a session. @@ -87,6 +90,7 @@ pub trait SidecarInterface { /// # Arguments /// /// * `session_id` - The ID of the session. + #[force_backpressure] async fn shutdown_session(session_id: String); /// Sends a trace via shared memory. @@ -165,6 +169,7 @@ pub trait SidecarInterface { /// * `global_tags` - Global tags which need to be propagated. /// * `dynamic_instrumentation_state` - Whether dynamic instrumentation is enabled, disabled or /// not set. + #[force_backpressure] async fn set_universal_service_tags( instance_id: InstanceId, queue_id: QueueId, @@ -182,6 +187,7 @@ pub trait SidecarInterface { /// * `queue_id` - The unique identifier for the trace context. /// * `dynamic_instrumentation_state` - Whether dynamic instrumentation is enabled, disabled or /// not set. 
+ #[force_backpressure] async fn set_request_config( instance_id: InstanceId, queue_id: QueueId, @@ -197,6 +203,7 @@ pub trait SidecarInterface { async fn send_dogstatsd_actions(instance_id: InstanceId, actions: Vec); /// Flushes any outstanding traces queued for sending. + #[force_backpressure] async fn flush_traces(); /// Sets x-datadog-test-session-token on all requests for the given session. @@ -208,6 +215,7 @@ pub trait SidecarInterface { async fn set_test_session_token(session_id: String, token: String); /// Sends a ping to the service. + #[force_backpressure] async fn ping(); /// Dumps the current state of the service. @@ -215,6 +223,7 @@ pub trait SidecarInterface { /// # Returns /// /// A string representation of the current state of the service. + #[force_backpressure] async fn dump() -> String; /// Retrieves the current statistics of the service. @@ -222,5 +231,6 @@ pub trait SidecarInterface { /// # Returns /// /// A string representation of the current statistics of the service. + #[force_backpressure] async fn stats() -> String; } diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 45afb66179..e7a9875bca 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -152,7 +152,8 @@ impl SidecarServer { server.requests(), self.clone().serve(), 500, - ); + ) + .with_backpressure(SidecarInterfaceRequest::requires_backpressure); let (tx, rx) = tokio::sync::mpsc::channel::<_>(100); let tx = executor.swap_sender(tx);