diff --git a/Cargo.lock b/Cargo.lock index f5e51d432b..519b6b857f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3170,6 +3170,7 @@ dependencies = [ "rustix 1.1.3", "serde", "serde_yaml", + "serial_test", "tempfile", ] diff --git a/libdd-library-config/Cargo.toml b/libdd-library-config/Cargo.toml index 2af7f2f373..d8a171e875 100644 --- a/libdd-library-config/Cargo.toml +++ b/libdd-library-config/Cargo.toml @@ -25,6 +25,7 @@ rmp-serde = "1.3.0" [dev-dependencies] tempfile = { version = "3.3" } +serial_test = "3.2" [target.'cfg(unix)'.dependencies] memfd = { version = "0.6" } diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index 9a7489c4e1..636b92837d 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -1,7 +1,8 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -//! Implementation of the publisher part of the [OTEL process context](https://github.com/open-telemetry/opentelemetry-specification/pull/4719) +//! Implementation of the publisher part of the [OTEL process +//! context](https://github.com/open-telemetry/opentelemetry-specification/pull/4719) //! //! # A note on race conditions //! @@ -16,7 +17,7 @@ pub mod linux { ffi::c_void, mem::ManuallyDrop, os::fd::AsFd as _, - ptr, + ptr::{self, addr_of_mut}, sync::{ atomic::{fence, AtomicU64, Ordering}, Mutex, MutexGuard, @@ -220,27 +221,15 @@ pub mod linux { let mut mapping = MemMapping::new()?; let size = mapping_size(); - // Checks that the layout allow us to access `signature` and `published_at_ns` as - // atomics u64. Page size is at minimum 4KB and will be always 8 bytes aligned even on - // exotic platforms. The respective offsets of `signature` and `published_at_ns` are - // 0 and 8 bytes, so it suffices for `AtomicU64` to require an alignment of at most 8 - // (which is the expected alignment anyway). - // - // Note that `align_of` is a `const fn`, so this is in fact a compile-time check and - // will be optimized away, hence the `allow(unreachable_code)`. - #[allow(unreachable_code)] - if std::mem::align_of::() > 8 { - return Err(anyhow::anyhow!("alignment constraints forbid the use of atomics for publishing the protocol context")); - } - // Safety: the invariants of MemMapping ensures `start_addr` is not null and comes // from a previous call to `mmap` unsafe { madvise(mapping.start_addr, size, Advice::LinuxDontFork) } .context("madvise MADVISE_DONTFORK failed")?; let published_at_ns = time_now_ns().ok_or_else(|| { - anyhow::anyhow!("fail to get current time for process context publication") + anyhow::anyhow!("failed to get current time for process context publication") })?; + let header = mapping.start_addr as *mut MappingHeader; unsafe { @@ -278,12 +267,52 @@ pub mod linux { Ok(ProcessContextHandle { mapping, payload }) } - /// Updates the context after initial publication. Currently unimplemented (always returns - /// `Err`). - fn update(&mut self, _payload: Vec) -> anyhow::Result<()> { - Err(anyhow::anyhow!( - "process context update isn't implemented yet" - )) + /// Updates the context after initial publication. + fn update(&mut self, payload: Vec) -> anyhow::Result<()> { + let header = self.mapping.start_addr as *mut MappingHeader; + + let published_at_ns = time_now_ns() + .ok_or_else(|| anyhow::anyhow!("could not get the current timestamp"))?; + let payload_size = payload.len().try_into().map_err(|_| { + anyhow::anyhow!("couldn't update process protocol: new payload too large") + })?; + + // Safety + // + // [^atomic-u64-alignment]: Page size is at minimum 4KB and will be always 8 bytes + // aligned even on exotic platforms. The respective offsets of `signature` and + // `published_at_ns` are 0 and 16 bytes, so they are 8-bytes aligned (`AtomicU64` has + // both a size and align of 8 bytes). + // + // The header memory is valid for both read and writes. + let published_at_atomic = + unsafe { AtomicU64::from_ptr(addr_of_mut!((*header).published_at_ns)) }; + + // A process shouldn't try to concurrently update its own context + // + // Note: be careful of early return while `published_at` is still zero, as this would + // effectively "lock" any future publishing. Move throwing code above this swap, or + // properly restore the previous value if the former can't be done. + if published_at_atomic.swap(0, Ordering::Relaxed) == 0 { + return Err(anyhow::anyhow!( + "concurrent update of the process context is not supported" + )); + } + + fence(Ordering::SeqCst); + self.payload = payload; + + // Safety: we own the mapping, which is live and valid for writes. The header is packed + // and thus has no alignment constraints. + unsafe { + (*header).payload_ptr = self.payload.as_ptr(); + (*header).payload_size = payload_size; + } + + fence(Ordering::SeqCst); + published_at_atomic.store(published_at_ns, Ordering::Relaxed); + + Ok(()) } } @@ -343,6 +372,7 @@ pub mod linux { } #[cfg(test)] + #[serial_test::serial] mod tests { use super::MappingHeader; use anyhow::ensure; @@ -381,7 +411,7 @@ pub mod linux { // we found in /proc/self/maps. This should be safe as long as the // mapping exists and has read permissions. // - // The atomic alignment constraints are checked during publication. + // For the alignment constraint of `AtomicU64`, see [atomic-u64-alignment]. let signature = unsafe { AtomicU64::from_ptr(ptr).load(Ordering::Relaxed) }; fence(Ordering::SeqCst); &signature.to_ne_bytes() == super::SIGNATURE @@ -426,11 +456,13 @@ pub mod linux { #[test] #[cfg_attr(miri, ignore)] - fn publish_then_read_context() { - let payload = "example process context payload"; + fn publish_then_update_process_context() { + let payload_v1 = "example process context payload"; + let payload_v2 = "another example process context payload of different size"; - super::publish(payload.as_bytes().to_vec()) + super::publish(payload_v1.as_bytes().to_vec()) .expect("couldn't publish the process context"); + let header = read_process_context().expect("couldn't read back the process context"); // Safety: the published context must have put valid bytes of size payload_size in the // context if the signature check succeded. @@ -444,13 +476,61 @@ pub mod linux { "wrong context version" ); assert!( - header.payload_size == payload.len() as u32, + header.payload_size == payload_v1.len() as u32, "wrong payload size" ); assert!(header.published_at_ns > 0, "published_at_ns is zero"); - assert!(read_payload == payload.as_bytes(), "payload mismatch"); + assert!(read_payload == payload_v1.as_bytes(), "payload mismatch"); + + let published_at_ns_v1 = header.published_at_ns; + // Ensure the clock advances so the updated timestamp is strictly greater + std::thread::sleep(std::time::Duration::from_nanos(10)); + super::publish(payload_v2.as_bytes().to_vec()) + .expect("couldn't update the process context"); + + let header = read_process_context().expect("couldn't read back the process context"); + // Safety: the published context must have put valid bytes of size payload_size in the + // context if the signature check succeded. + let read_payload = unsafe { + std::slice::from_raw_parts(header.payload_ptr, header.payload_size as usize) + }; + + assert!(header.signature == *super::SIGNATURE, "wrong signature"); + assert!( + header.version == super::PROCESS_CTX_VERSION, + "wrong context version" + ); + assert!( + header.payload_size == payload_v2.len() as u32, + "wrong payload size" + ); + assert!( + header.published_at_ns > published_at_ns_v1, + "published_at_ns should be strictly greater after update" + ); + assert!(read_payload == payload_v2.as_bytes(), "payload mismatch"); super::unpublish().expect("couldn't unpublish the context"); } + + #[test] + #[cfg_attr(miri, ignore)] + fn unpublish_process_context() { + let payload = "example process context payload"; + + super::publish(payload.as_bytes().to_vec()) + .expect("couldn't publish the process context"); + + // The mapping must be discoverable right after publishing + find_otel_mapping().expect("couldn't find the otel mapping after publishing"); + + super::unpublish().expect("couldn't unpublish the context"); + + // After unpublishing the name must no longer appear in /proc/self/maps + assert!( + find_otel_mapping().is_err(), + "otel mapping should not be visible after unpublish" + ); + } } }