diff --git a/datadog-ipc/Cargo.toml b/datadog-ipc/Cargo.toml index f02fd7162e..8f4fcddf53 100644 --- a/datadog-ipc/Cargo.toml +++ b/datadog-ipc/Cargo.toml @@ -25,6 +25,7 @@ tarpc = { path = "./tarpc", default-features = false, features = ["serde-transpo libdd-common = { path = "../libdd-common" } datadog-ipc-macros = { path = "../datadog-ipc-macros" } +tracing = { version = "0.1", default-features = false } [dev-dependencies] criterion = "0.5" diff --git a/datadog-ipc/src/platform/unix/channel/metadata.rs b/datadog-ipc/src/platform/unix/channel/metadata.rs index f5871e2fd7..ce736dda31 100644 --- a/datadog-ipc/src/platform/unix/channel/metadata.rs +++ b/datadog-ipc/src/platform/unix/channel/metadata.rs @@ -4,6 +4,7 @@ use std::{ collections::VecDeque, io, + os::fd::IntoRawFd, os::unix::prelude::{AsRawFd, FromRawFd, RawFd}, }; @@ -17,8 +18,8 @@ use crate::{ #[derive(Debug)] pub struct ChannelMetadata { fds_to_send: Vec>, - fds_received: VecDeque, - pid: libc::pid_t, // must always be set to current Process ID + fds_received: VecDeque, // Store as OwnedFd to prevent leaking them + pid: libc::pid_t, // must always be set to current Process ID } impl Default for ChannelMetadata { @@ -80,7 +81,13 @@ impl ChannelMetadata { } pub(crate) fn receive_fds(&mut self, fds: &[RawFd]) { - self.fds_received.append(&mut fds.to_vec().into()); + self.fds_received.append( + &mut fds + .iter() + .map(|fd| unsafe { OwnedFd::from_raw_fd(*fd) }) + .collect::>() + .into(), + ); } pub(crate) fn find_handle(&mut self, hint: &PlatformHandle) -> Option> { @@ -90,6 +97,6 @@ impl ChannelMetadata { let fd = self.fds_received.pop_front(); - fd.map(|fd| unsafe { PlatformHandle::from_raw_fd(fd) }) + fd.map(|fd| unsafe { PlatformHandle::from_raw_fd(fd.into_raw_fd()) }) } } diff --git a/datadog-ipc/src/sequential.rs b/datadog-ipc/src/sequential.rs index 8f776f1935..05bff05a4f 100644 --- a/datadog-ipc/src/sequential.rs +++ b/datadog-ipc/src/sequential.rs @@ -1,21 +1,28 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use futures::{ready, Future, Stream}; +use futures::{Future, Stream}; use std::fmt::Debug; use std::{ pin::Pin, task::{Context, Poll}, }; use tarpc::server::{Channel, InFlightRequest, Requests, Serve}; +use tokio::sync::mpsc::error::SendError; +use tokio::sync::mpsc::OwnedPermit; #[allow(type_alias_bounds)] type Request = (S, InFlightRequest); +type PendingPermit = Pin< + Box>, SendError<()>>> + Send + 'static>, +>; + /// Replaces tarpc::server::Channel::execute which spawns one task per message with an executor /// that spawns a single worker and queues requests for this task. /// -/// If the queue is full, the requests is dropped and will be cancelled by tarpc. +/// If the queue is full, the request is dropped and will be cancelled by tarpc unless +/// `with_backpressure` is configured for that request type. pub fn execute_sequential( reqs: Requests, serve: S, @@ -43,6 +50,8 @@ where inner: reqs, serve, tx, + backpressure: |_| false, + pending: None, } } @@ -55,6 +64,11 @@ where inner: Requests, serve: S, tx: tokio::sync::mpsc::Sender>, + /// Returns true for requests that must not be dropped when the queue is full. + /// The executor will pause reading new requests and wait for channel space instead. + backpressure: fn(&C::Req) -> bool, + /// Pending channel-space reservation for a backpressure request. + pending: Option<(PendingPermit, Request)>, } impl Future for SequentialExecutor @@ -68,21 +82,50 @@ where type Output = anyhow::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - while let Some(response_handler) = ready!(self.as_mut().project().inner.poll_next(cx)) { - match response_handler { - Ok(resp) => { - let server = self.serve.clone(); - if let Err(_err) = self.as_ref().tx.try_send((server, resp)) { - // TODO: should we log something in case we drop the request on the floor? + loop { + // First flush any pending backpressure send before reading new requests. + { + let this = self.as_mut().project(); + if let Some((fut, _)) = this.pending.as_mut() { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(_)) => return Poll::Ready(Ok(())), // worker dropped + Poll::Ready(Ok(permit)) => { + #[allow(clippy::unwrap_used)] // we've just checked this + let (_, item) = this.pending.take().unwrap(); + permit.send(item); + // fall through to poll next request + } } } - Err(e) => { - // TODO: should we log something in case we drop the request on the floor? - return Poll::Ready(Err(e.into())); + } + + // Read the next request off the transport. + match self.as_mut().project().inner.poll_next(cx) { + Poll::Ready(Some(Ok(resp))) => { + let backpressured = (self.backpressure)(&resp.get().message); + match self.as_ref().tx.try_send((self.serve.clone(), resp)) { + Ok(()) => {} // loop to pick up the next request + Err(err) => { + let (_, resp) = err.into_inner(); + if backpressured { + let fut = Box::pin(self.as_ref().tx.clone().reserve_owned()); + *self.as_mut().project().pending = + Some((fut, (self.serve.clone(), resp))); + } else { + tracing::warn!( + "Dropping {:?}: sequential executor queue is full", + resp.get().message + ); + } + } + } } + Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e.into())), + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Pending => return Poll::Pending, } } - Poll::Ready(Ok(())) } } @@ -97,4 +140,11 @@ where std::mem::swap(&mut self.tx, &mut sender); sender } + + /// Configures a predicate that identifies requests which must not be dropped when the queue + /// is full. For those requests the executor will pause reading and wait for channel space. + pub fn with_backpressure(mut self, backpressure: fn(&C::Req) -> bool) -> Self { + self.backpressure = backpressure; + self + } } diff --git a/datadog-sidecar-macros/src/lib.rs b/datadog-sidecar-macros/src/lib.rs index 9e5ff65998..80bc00480e 100644 --- a/datadog-sidecar-macros/src/lib.rs +++ b/datadog-sidecar-macros/src/lib.rs @@ -28,25 +28,46 @@ fn snake_to_camel(ident_str: &str) -> String { } #[proc_macro_attribute] -pub fn extract_request_id(_attr: TokenStream, mut input: TokenStream) -> TokenStream { - let item: ItemTrait = syn::parse(input.clone()).unwrap(); +pub fn extract_request_id(_attr: TokenStream, input: TokenStream) -> TokenStream { + let mut item: ItemTrait = syn::parse(input).unwrap(); let name = &format_ident!("{}Request", item.ident); let mut arms: Vec = vec![]; - for inner in item.items { + let mut backpressure_variants: Vec = vec![]; + + for inner in item.items.iter_mut() { if let TraitItem::Fn(func) = inner { - for any_arg in func.sig.inputs { + // Strip #[force_backpressure] and record which methods carry it. + let had_force_backpressure = func.attrs.iter().any(|attr| { + attr.meta + .path() + .get_ident() + .is_some_and(|i| i == "force_backpressure") + }); + func.attrs.retain(|attr| { + attr.meta + .path() + .get_ident() + .is_none_or(|i| i != "force_backpressure") + }); + + let method = Ident::new( + &snake_to_camel(&func.sig.ident.to_string()), + Span::mixed_site(), + ); + + if had_force_backpressure { + backpressure_variants.push(method.clone()); + } + + for any_arg in &func.sig.inputs { if let Typed(arg) = any_arg { - if let Pat::Ident(ident) = *arg.pat { + if let Pat::Ident(ident) = &*arg.pat { let matched_enum_type = match ident.ident.to_string().as_str() { "session_id" => Some(format_ident!("SessionId")), "instance_id" => Some(format_ident!("InstanceId")), _ => None, }; if let Some(enum_type) = matched_enum_type { - let method = Ident::new( - &snake_to_camel(&func.sig.ident.to_string()), - Span::mixed_site(), - ); arms.push(parse_quote! { #name::#method { #ident, .. } => RequestIdentifier::#enum_type(#ident.clone()) }); @@ -56,7 +77,16 @@ pub fn extract_request_id(_attr: TokenStream, mut input: TokenStream) -> TokenSt } } } - input.extend(TokenStream::from(quote! { + + let backpressure_body = if backpressure_variants.is_empty() { + quote! { false } + } else { + quote! { matches!(self, #(#name::#backpressure_variants { .. })|*) } + }; + + TokenStream::from(quote! { + #item + impl RequestIdentification for tarpc::Request<#name> { fn extract_identifier(&self) -> RequestIdentifier { match &self.message { @@ -67,8 +97,14 @@ pub fn extract_request_id(_attr: TokenStream, mut input: TokenStream) -> TokenSt } } } - })); - input + + impl #name { + /// Returns true if this request variant was annotated with `#[force_backpressure]`. + pub fn requires_backpressure(&self) -> bool { + #backpressure_body + } + } + }) } struct EnvOrDefault { diff --git a/datadog-sidecar/src/service/sidecar_interface.rs b/datadog-sidecar/src/service/sidecar_interface.rs index 307152b087..bfcdfe05ca 100644 --- a/datadog-sidecar/src/service/sidecar_interface.rs +++ b/datadog-sidecar/src/service/sidecar_interface.rs @@ -60,6 +60,7 @@ pub trait SidecarInterface { /// * `session_id` - The ID of the session. /// * `pid` - The pid of the sidecar client. /// * `config` - The configuration to be set. + #[force_backpressure] async fn set_session_config( session_id: String, remote_config_notify_target: RemoteConfigNotifyTarget, @@ -73,6 +74,7 @@ pub trait SidecarInterface { /// /// * `session_id` - The ID of the session. /// * `process_tags` - The process tags string. + #[force_backpressure] async fn set_session_process_tags(session_id: String, process_tags: String); /// Shuts down a runtime. @@ -80,6 +82,7 @@ pub trait SidecarInterface { /// # Arguments /// /// * `instance_id` - The ID of the instance. + #[force_backpressure] async fn shutdown_runtime(instance_id: InstanceId); /// Shuts down a session. @@ -87,6 +90,7 @@ pub trait SidecarInterface { /// # Arguments /// /// * `session_id` - The ID of the session. + #[force_backpressure] async fn shutdown_session(session_id: String); /// Sends a trace via shared memory. @@ -165,6 +169,7 @@ pub trait SidecarInterface { /// * `global_tags` - Global tags which need to be propagated. /// * `dynamic_instrumentation_state` - Whether dynamic instrumentation is enabled, disabled or /// not set. + #[force_backpressure] async fn set_universal_service_tags( instance_id: InstanceId, queue_id: QueueId, @@ -182,6 +187,7 @@ pub trait SidecarInterface { /// * `queue_id` - The unique identifier for the trace context. /// * `dynamic_instrumentation_state` - Whether dynamic instrumentation is enabled, disabled or /// not set. + #[force_backpressure] async fn set_request_config( instance_id: InstanceId, queue_id: QueueId, @@ -197,6 +203,7 @@ pub trait SidecarInterface { async fn send_dogstatsd_actions(instance_id: InstanceId, actions: Vec); /// Flushes any outstanding traces queued for sending. + #[force_backpressure] async fn flush_traces(); /// Sets x-datadog-test-session-token on all requests for the given session. @@ -208,6 +215,7 @@ pub trait SidecarInterface { async fn set_test_session_token(session_id: String, token: String); /// Sends a ping to the service. + #[force_backpressure] async fn ping(); /// Dumps the current state of the service. @@ -215,6 +223,7 @@ pub trait SidecarInterface { /// # Returns /// /// A string representation of the current state of the service. + #[force_backpressure] async fn dump() -> String; /// Retrieves the current statistics of the service. @@ -222,5 +231,6 @@ pub trait SidecarInterface { /// # Returns /// /// A string representation of the current statistics of the service. + #[force_backpressure] async fn stats() -> String; } diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 45afb66179..e7a9875bca 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -152,7 +152,8 @@ impl SidecarServer { server.requests(), self.clone().serve(), 500, - ); + ) + .with_backpressure(SidecarInterfaceRequest::requires_backpressure); let (tx, rx) = tokio::sync::mpsc::channel::<_>(100); let tx = executor.swap_sender(tx);