Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libdd-library-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ rmp-serde = "1.3.0"

[dev-dependencies]
tempfile = { version = "3.3" }
serial_test = "3.2"

[target.'cfg(unix)'.dependencies]
memfd = { version = "0.6" }
Expand Down
136 changes: 108 additions & 28 deletions libdd-library-config/src/otel_process_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Implementation of the publisher part of the [OTEL process context](https://github.com/open-telemetry/opentelemetry-specification/pull/4719)
//! Implementation of the publisher part of the [OTEL process
//! context](https://github.com/open-telemetry/opentelemetry-specification/pull/4719)
//!
//! # A note on race conditions
//!
Expand All @@ -16,7 +17,7 @@ pub mod linux {
ffi::c_void,
mem::ManuallyDrop,
os::fd::AsFd as _,
ptr,
ptr::{self, addr_of_mut},
sync::{
atomic::{fence, AtomicU64, Ordering},
Mutex, MutexGuard,
Expand Down Expand Up @@ -220,27 +221,15 @@ pub mod linux {
let mut mapping = MemMapping::new()?;
let size = mapping_size();

// Checks that the layout allow us to access `signature` and `published_at_ns` as
// atomics u64. Page size is at minimum 4KB and will be always 8 bytes aligned even on
// exotic platforms. The respective offsets of `signature` and `published_at_ns` are
// 0 and 8 bytes, so it suffices for `AtomicU64` to require an alignment of at most 8
// (which is the expected alignment anyway).
//
// Note that `align_of` is a `const fn`, so this is in fact a compile-time check and
// will be optimized away, hence the `allow(unreachable_code)`.
#[allow(unreachable_code)]
if std::mem::align_of::<AtomicU64>() > 8 {
return Err(anyhow::anyhow!("alignment constraints forbid the use of atomics for publishing the protocol context"));
}

// Safety: the invariants of MemMapping ensures `start_addr` is not null and comes
// from a previous call to `mmap`
unsafe { madvise(mapping.start_addr, size, Advice::LinuxDontFork) }
.context("madvise MADVISE_DONTFORK failed")?;

let published_at_ns = time_now_ns().ok_or_else(|| {
anyhow::anyhow!("fail to get current time for process context publication")
anyhow::anyhow!("failed to get current time for process context publication")
})?;

let header = mapping.start_addr as *mut MappingHeader;

unsafe {
Expand Down Expand Up @@ -278,12 +267,52 @@ pub mod linux {
Ok(ProcessContextHandle { mapping, payload })
}

/// Updates the context after initial publication. Currently unimplemented (always returns
/// `Err`).
fn update(&mut self, _payload: Vec<u8>) -> anyhow::Result<()> {
Err(anyhow::anyhow!(
"process context update isn't implemented yet"
))
/// Updates the context after initial publication.
fn update(&mut self, payload: Vec<u8>) -> anyhow::Result<()> {
let header = self.mapping.start_addr as *mut MappingHeader;

let published_at_ns = time_now_ns()
.ok_or_else(|| anyhow::anyhow!("could not get the current timestamp"))?;
let payload_size = payload.len().try_into().map_err(|_| {
anyhow::anyhow!("couldn't update process protocol: new payload too large")
})?;

// Safety
//
// [^atomic-u64-alignment]: Page size is at minimum 4KB and will be always 8 bytes
// aligned even on exotic platforms. The respective offsets of `signature` and
// `published_at_ns` are 0 and 16 bytes, so they are 8-bytes aligned (`AtomicU64` has
// both a size and align of 8 bytes).
//
// The header memory is valid for both read and writes.
let published_at_atomic =
unsafe { AtomicU64::from_ptr(addr_of_mut!((*header).published_at_ns)) };

// A process shouldn't try to concurrently update its own context
//
// Note: be careful of early return while `published_at` is still zero, as this would
// effectively "lock" any future publishing. Move throwing code above this swap, or
// properly restore the previous value if the former can't be done.
if published_at_atomic.swap(0, Ordering::Relaxed) == 0 {
return Err(anyhow::anyhow!(
"concurrent update of the process context is not supported"
));
}

fence(Ordering::SeqCst);
self.payload = payload;
Comment thread
yannham marked this conversation as resolved.

// Safety: we own the mapping, which is live and valid for writes. The header is packed
// and thus has no alignment constraints.
unsafe {
(*header).payload_ptr = self.payload.as_ptr();
(*header).payload_size = payload_size;
}

fence(Ordering::SeqCst);
published_at_atomic.store(published_at_ns, Ordering::Relaxed);

Ok(())
}
}

Expand Down Expand Up @@ -343,6 +372,7 @@ pub mod linux {
}

#[cfg(test)]
#[serial_test::serial]
mod tests {
use super::MappingHeader;
use anyhow::ensure;
Expand Down Expand Up @@ -381,7 +411,7 @@ pub mod linux {
// we found in /proc/self/maps. This should be safe as long as the
// mapping exists and has read permissions.
//
// The atomic alignment constraints are checked during publication.
// For the alignment constraint of `AtomicU64`, see [atomic-u64-alignment].
let signature = unsafe { AtomicU64::from_ptr(ptr).load(Ordering::Relaxed) };
fence(Ordering::SeqCst);
&signature.to_ne_bytes() == super::SIGNATURE
Expand Down Expand Up @@ -426,11 +456,13 @@ pub mod linux {

#[test]
#[cfg_attr(miri, ignore)]
fn publish_then_read_context() {
let payload = "example process context payload";
fn publish_then_update_process_context() {
let payload_v1 = "example process context payload";
let payload_v2 = "another example process context payload of different size";

super::publish(payload.as_bytes().to_vec())
super::publish(payload_v1.as_bytes().to_vec())
.expect("couldn't publish the process context");

let header = read_process_context().expect("couldn't read back the process context");
// Safety: the published context must have put valid bytes of size payload_size in the
// context if the signature check succeded.
Expand All @@ -444,13 +476,61 @@ pub mod linux {
"wrong context version"
);
assert!(
header.payload_size == payload.len() as u32,
header.payload_size == payload_v1.len() as u32,
"wrong payload size"
);
assert!(header.published_at_ns > 0, "published_at_ns is zero");
assert!(read_payload == payload.as_bytes(), "payload mismatch");
assert!(read_payload == payload_v1.as_bytes(), "payload mismatch");

let published_at_ns_v1 = header.published_at_ns;
// Ensure the clock advances so the updated timestamp is strictly greater
std::thread::sleep(std::time::Duration::from_nanos(10));
super::publish(payload_v2.as_bytes().to_vec())
.expect("couldn't update the process context");

let header = read_process_context().expect("couldn't read back the process context");
// Safety: the published context must have put valid bytes of size payload_size in the
// context if the signature check succeded.
let read_payload = unsafe {
std::slice::from_raw_parts(header.payload_ptr, header.payload_size as usize)
};

assert!(header.signature == *super::SIGNATURE, "wrong signature");
assert!(
header.version == super::PROCESS_CTX_VERSION,
"wrong context version"
);
assert!(
header.payload_size == payload_v2.len() as u32,
Comment thread
yannham marked this conversation as resolved.
"wrong payload size"
);
assert!(
header.published_at_ns > published_at_ns_v1,
"published_at_ns should be strictly greater after update"
);
assert!(read_payload == payload_v2.as_bytes(), "payload mismatch");

super::unpublish().expect("couldn't unpublish the context");
}

#[test]
#[cfg_attr(miri, ignore)]
fn unpublish_process_context() {
let payload = "example process context payload";

super::publish(payload.as_bytes().to_vec())
.expect("couldn't publish the process context");

// The mapping must be discoverable right after publishing
find_otel_mapping().expect("couldn't find the otel mapping after publishing");

super::unpublish().expect("couldn't unpublish the context");

// After unpublishing the name must no longer appear in /proc/self/maps
assert!(
find_otel_mapping().is_err(),
"otel mapping should not be visible after unpublish"
);
}
}
}
Loading