Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datadog-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tarpc = { path = "./tarpc", default-features = false, features = ["serde-transpo

libdd-common = { path = "../libdd-common" }
datadog-ipc-macros = { path = "../datadog-ipc-macros" }
tracing = { version = "0.1", default-features = false }

[dev-dependencies]
criterion = "0.5"
Expand Down
15 changes: 11 additions & 4 deletions datadog-ipc/src/platform/unix/channel/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{
collections::VecDeque,
io,
os::fd::IntoRawFd,
os::unix::prelude::{AsRawFd, FromRawFd, RawFd},
};

Expand All @@ -17,8 +18,8 @@ use crate::{
#[derive(Debug)]
pub struct ChannelMetadata {
fds_to_send: Vec<PlatformHandle<OwnedFd>>,
fds_received: VecDeque<RawFd>,
pid: libc::pid_t, // must always be set to current Process ID
fds_received: VecDeque<OwnedFd>, // Store as OwnedFd to prevent leaking them
pid: libc::pid_t, // must always be set to current Process ID
}

impl Default for ChannelMetadata {
Expand Down Expand Up @@ -80,7 +81,13 @@ impl ChannelMetadata {
}

pub(crate) fn receive_fds(&mut self, fds: &[RawFd]) {
self.fds_received.append(&mut fds.to_vec().into());
self.fds_received.append(
&mut fds
.iter()
.map(|fd| unsafe { OwnedFd::from_raw_fd(*fd) })
.collect::<Vec<_>>()
.into(),
);
}

pub(crate) fn find_handle<T>(&mut self, hint: &PlatformHandle<T>) -> Option<PlatformHandle<T>> {
Expand All @@ -90,6 +97,6 @@ impl ChannelMetadata {

let fd = self.fds_received.pop_front();

fd.map(|fd| unsafe { PlatformHandle::from_raw_fd(fd) })
fd.map(|fd| unsafe { PlatformHandle::from_raw_fd(fd.into_raw_fd()) })
}
}
74 changes: 62 additions & 12 deletions datadog-ipc/src/sequential.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use futures::{ready, Future, Stream};
use futures::{Future, Stream};
use std::fmt::Debug;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tarpc::server::{Channel, InFlightRequest, Requests, Serve};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::OwnedPermit;

#[allow(type_alias_bounds)]
type Request<S, C: Channel> = (S, InFlightRequest<C::Req, C::Resp>);

type PendingPermit<S, C> = Pin<
Box<dyn Future<Output = Result<OwnedPermit<Request<S, C>>, SendError<()>>> + Send + 'static>,
>;

/// Replaces tarpc::server::Channel::execute which spawns one task per message with an executor
/// that spawns a single worker and queues requests for this task.
///
/// If the queue is full, the requests is dropped and will be cancelled by tarpc.
/// If the queue is full, the request is dropped and will be cancelled by tarpc unless
/// `with_backpressure` is configured for that request type.
pub fn execute_sequential<C, S>(
reqs: Requests<C>,
serve: S,
Expand Down Expand Up @@ -43,6 +50,8 @@ where
inner: reqs,
serve,
tx,
backpressure: |_| false,
pending: None,
}
}

Expand All @@ -55,6 +64,11 @@ where
inner: Requests<C>,
serve: S,
tx: tokio::sync::mpsc::Sender<Request<S, C>>,
/// Returns true for requests that must not be dropped when the queue is full.
/// The executor will pause reading new requests and wait for channel space instead.
backpressure: fn(&C::Req) -> bool,
/// Pending channel-space reservation for a backpressure request.
pending: Option<(PendingPermit<S, C>, Request<S, C>)>,
}

impl<C, S> Future for SequentialExecutor<C, S>
Expand All @@ -68,21 +82,50 @@ where
type Output = anyhow::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
while let Some(response_handler) = ready!(self.as_mut().project().inner.poll_next(cx)) {
match response_handler {
Ok(resp) => {
let server = self.serve.clone();
if let Err(_err) = self.as_ref().tx.try_send((server, resp)) {
// TODO: should we log something in case we drop the request on the floor?
loop {
// First flush any pending backpressure send before reading new requests.
{
let this = self.as_mut().project();
if let Some((fut, _)) = this.pending.as_mut() {
match fut.as_mut().poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(_)) => return Poll::Ready(Ok(())), // worker dropped
Poll::Ready(Ok(permit)) => {
#[allow(clippy::unwrap_used)] // we've just checked this
let (_, item) = this.pending.take().unwrap();
permit.send(item);
// fall through to poll next request
}
}
}
Err(e) => {
// TODO: should we log something in case we drop the request on the floor?
return Poll::Ready(Err(e.into()));
}

// Read the next request off the transport.
match self.as_mut().project().inner.poll_next(cx) {
Poll::Ready(Some(Ok(resp))) => {
let backpressured = (self.backpressure)(&resp.get().message);
match self.as_ref().tx.try_send((self.serve.clone(), resp)) {
Ok(()) => {} // loop to pick up the next request
Err(err) => {
let (_, resp) = err.into_inner();
if backpressured {
let fut = Box::pin(self.as_ref().tx.clone().reserve_owned());
*self.as_mut().project().pending =
Some((fut, (self.serve.clone(), resp)));
} else {
tracing::warn!(
"Dropping {:?}: sequential executor queue is full",
resp.get().message
);
}
}
}
}
Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e.into())),
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => return Poll::Pending,
}
}
Poll::Ready(Ok(()))
}
}

Expand All @@ -97,4 +140,11 @@ where
std::mem::swap(&mut self.tx, &mut sender);
sender
}

/// Configures a predicate that identifies requests which must not be dropped when the queue
/// is full. For those requests the executor will pause reading and wait for channel space.
pub fn with_backpressure(mut self, backpressure: fn(&C::Req) -> bool) -> Self {
self.backpressure = backpressure;
self
}
}
60 changes: 48 additions & 12 deletions datadog-sidecar-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,46 @@ fn snake_to_camel(ident_str: &str) -> String {
}

#[proc_macro_attribute]
pub fn extract_request_id(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
let item: ItemTrait = syn::parse(input.clone()).unwrap();
pub fn extract_request_id(_attr: TokenStream, input: TokenStream) -> TokenStream {
let mut item: ItemTrait = syn::parse(input).unwrap();
let name = &format_ident!("{}Request", item.ident);
let mut arms: Vec<Arm> = vec![];
for inner in item.items {
let mut backpressure_variants: Vec<Ident> = vec![];

for inner in item.items.iter_mut() {
if let TraitItem::Fn(func) = inner {
for any_arg in func.sig.inputs {
// Strip #[force_backpressure] and record which methods carry it.
let had_force_backpressure = func.attrs.iter().any(|attr| {
attr.meta
.path()
.get_ident()
.is_some_and(|i| i == "force_backpressure")
});
func.attrs.retain(|attr| {
attr.meta
.path()
.get_ident()
.is_none_or(|i| i != "force_backpressure")
});

let method = Ident::new(
&snake_to_camel(&func.sig.ident.to_string()),
Span::mixed_site(),
);

if had_force_backpressure {
backpressure_variants.push(method.clone());
}

for any_arg in &func.sig.inputs {
if let Typed(arg) = any_arg {
if let Pat::Ident(ident) = *arg.pat {
if let Pat::Ident(ident) = &*arg.pat {
let matched_enum_type = match ident.ident.to_string().as_str() {
"session_id" => Some(format_ident!("SessionId")),
"instance_id" => Some(format_ident!("InstanceId")),
_ => None,
};
if let Some(enum_type) = matched_enum_type {
let method = Ident::new(
&snake_to_camel(&func.sig.ident.to_string()),
Span::mixed_site(),
);
arms.push(parse_quote! {
#name::#method { #ident, .. } => RequestIdentifier::#enum_type(#ident.clone())
});
Expand All @@ -56,7 +77,16 @@ pub fn extract_request_id(_attr: TokenStream, mut input: TokenStream) -> TokenSt
}
}
}
input.extend(TokenStream::from(quote! {

let backpressure_body = if backpressure_variants.is_empty() {
quote! { false }
} else {
quote! { matches!(self, #(#name::#backpressure_variants { .. })|*) }
};

TokenStream::from(quote! {
#item

impl RequestIdentification for tarpc::Request<#name> {
fn extract_identifier(&self) -> RequestIdentifier {
match &self.message {
Expand All @@ -67,8 +97,14 @@ pub fn extract_request_id(_attr: TokenStream, mut input: TokenStream) -> TokenSt
}
}
}
}));
input

impl #name {
/// Returns true if this request variant was annotated with `#[force_backpressure]`.
pub fn requires_backpressure(&self) -> bool {
#backpressure_body
}
}
})
}

struct EnvOrDefault {
Expand Down
10 changes: 10 additions & 0 deletions datadog-sidecar/src/service/sidecar_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub trait SidecarInterface {
/// * `session_id` - The ID of the session.
/// * `pid` - The pid of the sidecar client.
/// * `config` - The configuration to be set.
#[force_backpressure]
async fn set_session_config(
session_id: String,
remote_config_notify_target: RemoteConfigNotifyTarget,
Expand All @@ -73,20 +74,23 @@ pub trait SidecarInterface {
///
/// * `session_id` - The ID of the session.
/// * `process_tags` - The process tags string.
#[force_backpressure]
async fn set_session_process_tags(session_id: String, process_tags: String);

/// Shuts down a runtime.
///
/// # Arguments
///
/// * `instance_id` - The ID of the instance.
#[force_backpressure]
async fn shutdown_runtime(instance_id: InstanceId);

/// Shuts down a session.
///
/// # Arguments
///
/// * `session_id` - The ID of the session.
#[force_backpressure]
async fn shutdown_session(session_id: String);

/// Sends a trace via shared memory.
Expand Down Expand Up @@ -165,6 +169,7 @@ pub trait SidecarInterface {
/// * `global_tags` - Global tags which need to be propagated.
/// * `dynamic_instrumentation_state` - Whether dynamic instrumentation is enabled, disabled or
/// not set.
#[force_backpressure]
async fn set_universal_service_tags(
instance_id: InstanceId,
queue_id: QueueId,
Expand All @@ -182,6 +187,7 @@ pub trait SidecarInterface {
/// * `queue_id` - The unique identifier for the trace context.
/// * `dynamic_instrumentation_state` - Whether dynamic instrumentation is enabled, disabled or
/// not set.
#[force_backpressure]
async fn set_request_config(
instance_id: InstanceId,
queue_id: QueueId,
Expand All @@ -197,6 +203,7 @@ pub trait SidecarInterface {
async fn send_dogstatsd_actions(instance_id: InstanceId, actions: Vec<DogStatsDActionOwned>);

/// Flushes any outstanding traces queued for sending.
#[force_backpressure]
async fn flush_traces();

/// Sets x-datadog-test-session-token on all requests for the given session.
Expand All @@ -208,19 +215,22 @@ pub trait SidecarInterface {
async fn set_test_session_token(session_id: String, token: String);

/// Sends a ping to the service.
#[force_backpressure]
async fn ping();

/// Dumps the current state of the service.
///
/// # Returns
///
/// A string representation of the current state of the service.
#[force_backpressure]
async fn dump() -> String;

/// Retrieves the current statistics of the service.
///
/// # Returns
///
/// A string representation of the current statistics of the service.
#[force_backpressure]
async fn stats() -> String;
}
3 changes: 2 additions & 1 deletion datadog-sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ impl SidecarServer {
server.requests(),
self.clone().serve(),
500,
);
)
.with_backpressure(SidecarInterfaceRequest::requires_backpressure);
let (tx, rx) = tokio::sync::mpsc::channel::<_>(100);
let tx = executor.swap_sender(tx);

Expand Down
Loading