Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 43 additions & 6 deletions libdd-library-config/src/otel_process_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub mod linux {
use rustix::{
fs::{ftruncate, memfd_create, MemfdFlags},
mm::{madvise, mmap, mmap_anonymous, munmap, Advice, MapFlags, ProtFlags},
process::set_virtual_memory_region_name,
process::{getpid, set_virtual_memory_region_name, Pid},
};

/// Current version of the process context format
Expand Down Expand Up @@ -213,6 +213,9 @@ pub mod linux {
/// or drop).
#[allow(unused)]
payload: Vec<u8>,
/// The process id of the last publisher. This is useful to detect forks(), and publish a
/// new context accordingly.
pid: Pid,
}

impl ProcessContextHandle {
Expand Down Expand Up @@ -264,7 +267,11 @@ pub mod linux {

let _ = mapping.set_name();

Ok(ProcessContextHandle { mapping, payload })
Ok(ProcessContextHandle {
mapping,
payload,
pid: getpid(),
})
}

/// Updates the context after initial publication.
Expand Down Expand Up @@ -341,15 +348,45 @@ pub mod linux {

/// Publishes or updates the process context for it to be visible by external readers.
///
/// If this is the first publication, or if [unpublish] has been called last, this will follow
/// the Publish protocol of the process context specification.
/// If any of the following condition holds:
///
/// - this is the first publication
/// - [unpublish] has been called last
/// - the previous context has been published from a different process id (that is, a `fork()`
/// happened and we're the child process)
///
/// Then we follow the Publish protocol of the OTel process context specification (allocating a
/// fresh mapping).
///
/// Otherwise, if a context has been previously published from the same process and hasn't been
/// unpublished since, we follow the Update protocol.
///
/// # Fork safety
///
/// Otherwise, the context is updated following the Update protocol.
/// If we're a forked children of the original publisher, we are extremely restricted in the
/// set of operations that we can do (we must be async-signal-safe). On paper, heap allocation
/// is Undefined Behavior, for example. We assume that a forking runtime (such as Python or
/// Ruby) that doesn't follow with an immediate `exec` is already "taking that risk", so to
/// speak (typically, if no thread is ever spawned before the fork, things are mostly fine).
Comment thread
yannham marked this conversation as resolved.
pub fn publish(payload: Vec<u8>) -> anyhow::Result<()> {
let mut guard = lock_context_handle()?;

match &mut *guard {
Some(handler) => handler.update(payload),
Some(handler) if handler.pid == getpid() => handler.update(payload),
Some(handler) => {
let mut local_handler = ProcessContextHandle::publish(payload)?;
// If we've been forked, we need to prevent the mapping from being dropped
// normally, as it would try to unmap a region that isn't mapped anymore in the
// child process, or worse, could have been remapped to something else in the
// meantime.
//
// To do so, we get the old handler back in `local_handler` and prevent `mapping`
// from being dropped specifically.
std::mem::swap(&mut local_handler, handler);
let _: ManuallyDrop<MemMapping> = ManuallyDrop::new(local_handler.mapping);

Ok(())
}
None => {
*guard = Some(ProcessContextHandle::publish(payload)?);
Ok(())
Expand Down
Loading