From 4500927f2173829450badfcc1a52b3cf3cabc4ac Mon Sep 17 00:00:00 2001 From: algorithmiker <104317939+algorithmiker@users.noreply.github.com> Date: Sat, 10 Jan 2026 12:57:22 +0100 Subject: [PATCH] Fix issue 4. Fixes a race condition on Windows where a descendant of a span could arrive before its parent. It *still* can arrive before its parent; however, this case is now handled correctly. --- entrace_core/src/mmap/et_storage.rs | 43 +++++++++++++------ entrace_core/src/remote/remote_storage.rs | 34 ++++++++++----- entrace_core/src/storage.rs | 6 +-- entrace_core/src/tree_layer.rs | 5 ++- entrace_core/tests/issue_4.rs | 50 +++++++++++++++++++++++ 5 files changed, 109 insertions(+), 29 deletions(-) create mode 100644 entrace_core/tests/issue_4.rs diff --git a/entrace_core/src/mmap/et_storage.rs b/entrace_core/src/mmap/et_storage.rs index 5eea9b2..50305de 100644 --- a/entrace_core/src/mmap/et_storage.rs +++ b/entrace_core/src/mmap/et_storage.rs @@ -1,5 +1,6 @@ use std::{ any::Any, + collections::BTreeMap, io::{BufReader, BufWriter, Read, Seek, Write}, sync::RwLock, thread::JoinHandle, @@ -15,7 +16,7 @@ use crate::{ }; pub enum Message { - Entry(MixedTraceEntry), + Entry { id: u32, entry: MixedTraceEntry }, Shutdown(Q), } #[derive(thiserror::Error, Debug)] @@ -65,18 +66,32 @@ impl ETStorage bincode::serde::encode_into_std_write(TraceEntry::root(), &mut writer, config) .unwrap(); cur_offset += len as u64; + + // If multiple threads are writing subspans to the same parent, there can be a race + // condition where a child arrives before the parent. This is reproducible on Windows + // only, see issue #4. + // To prevent this, we store disjoint leaves in a buffer until their parent arrives. + // next_id keeps track of what id comes next in sequential order. If the buffer has next_id, we are good and the entry can be written. + // Else we wait for it to appear. 
+ let mut buffer = BTreeMap::new(); + let mut next_id = 1; + while let Ok(msg) = rx.recv() { match msg { - Message::Entry(entry) => { - offsets.push(cur_offset); - let len = child_lists.len() as u32; - child_lists[entry.parent as usize].children.push(len); - child_lists.push(PoolEntry::new()); - let cfg = config; - let written = - bincode::serde::encode_into_std_write(&entry, &mut writer, cfg) - .unwrap(); - cur_offset += written as u64; + Message::Entry { id, entry } => { + buffer.insert(id, entry); + while let Some(entry) = buffer.remove(&next_id) { + offsets.push(cur_offset); + let len = child_lists.len() as u32; + child_lists[entry.parent as usize].children.push(len); + child_lists.push(PoolEntry::new()); + let cfg = config; + let written = + bincode::serde::encode_into_std_write(&entry, &mut writer, cfg) + .unwrap(); + cur_offset += written as u64; + next_id += 1; + } } Message::Shutdown(mut tmp_buf) => { let mut tmp_buf_writer = BufWriter::new(&mut tmp_buf); @@ -118,13 +133,15 @@ impl ETStorage } } impl Storage for ETStorage { - fn new_span(&self, parent: u32, attrs: crate::Attrs, meta: &'static tracing::Metadata<'_>) { + fn new_span( + &self, id: u32, parent: u32, attrs: crate::Attrs, meta: &'static tracing::Metadata<'_>, + ) { let message = attrs.iter().find(|x| x.0 == "message").map(|x| match &x.1 { EnValue::String(y) => y.clone(), q => format!("{q:?}"), }); let entry = MixedTraceEntry { parent, metadata: meta.into(), attributes: attrs, message }; - self.sender.send(Message::Entry(entry)).ok(); + self.sender.send(Message::Entry { id, entry }).ok(); } } diff --git a/entrace_core/src/remote/remote_storage.rs b/entrace_core/src/remote/remote_storage.rs index 037fdf7..2ea0315 100644 --- a/entrace_core/src/remote/remote_storage.rs +++ b/entrace_core/src/remote/remote_storage.rs @@ -1,9 +1,9 @@ use crate::{StorageFormat, TraceEntry, entrace_magic_for, storage::Storage, tree_layer::EnValue}; use crossbeam_channel::{SendError, Sender}; -use std::{any::Any, 
io::Write, sync::RwLock, thread::JoinHandle}; +use std::{any::Any, collections::BTreeMap, io::Write, sync::RwLock, thread::JoinHandle}; pub enum RemoteMessage { - NewSpan(TraceEntry), + NewSpan { id: u32, entry: TraceEntry }, Shutdown, } pub struct IETStorageConfig { @@ -65,10 +65,22 @@ impl IETStorage { } write_message(&mut buffer, TraceEntry::root(), &mut config); + // If multiple threads are writing subspans to the same parent, there can be a race + // condition where a child arrives before the parent. This is reproducible on Windows + // only, see issue #4. + // To prevent this, we store disjoint leaves in a buffer until their parent arrives. + // next_id keeps track of what id comes next in sequential order. If the buffer has next_id, we are good and the entry can be written. + // Else we wait for it to appear. + let mut reorder_buffer = BTreeMap::new(); + let mut next_id = 1u32; while let Ok(msg) = rx.recv() { match msg { - RemoteMessage::NewSpan(m) => { - write_message(&mut buffer, m, &mut config); + RemoteMessage::NewSpan { id, entry } => { + reorder_buffer.insert(id, entry); + while let Some(entry) = reorder_buffer.remove(&next_id) { + write_message(&mut buffer, entry, &mut config); + next_id += 1; + } } RemoteMessage::Shutdown => break, } @@ -88,18 +100,18 @@ impl IETStorage { } } impl Storage for IETStorage { - fn new_span(&self, parent: u32, attrs: crate::Attrs, meta: &'static tracing::Metadata<'_>) { + fn new_span( + &self, id: u32, parent: u32, attrs: crate::Attrs, meta: &'static tracing::Metadata<'_>, + ) { let message = attrs.iter().find(|x| x.0 == "message").map(|x| match &x.1 { EnValue::String(y) => y.clone(), q => format!("{q:?}"), }); self.sender - .send(RemoteMessage::NewSpan(TraceEntry { - parent, - message, - metadata: meta.into(), - attributes: attrs, - })) + .send(RemoteMessage::NewSpan { + id, + entry: TraceEntry { parent, message, metadata: meta.into(), attributes: attrs }, + }) .ok(); } } diff --git a/entrace_core/src/storage.rs 
b/entrace_core/src/storage.rs index 0a8b082..f33dcc7 100644 --- a/entrace_core/src/storage.rs +++ b/entrace_core/src/storage.rs @@ -25,9 +25,9 @@ impl Close for Vec> { /// Used in the entrace backend to store data received by a [crate::tree_layer::TreeLayer] pub trait Storage { - fn new_span(&self, parent: u32, attrs: Attrs, meta: &'static Metadata<'_>); + fn new_span(&self, id: u32, parent: u32, attrs: Attrs, meta: &'static Metadata<'_>); /// Implemented by default as a call to [Storage::new_span]. - fn new_event(&self, parent: u32, attrs: Attrs, meta: &'static Metadata<'_>) { - self.new_span(parent, attrs, meta); + fn new_event(&self, id: u32, parent: u32, attrs: Attrs, meta: &'static Metadata<'_>) { + self.new_span(id, parent, attrs, meta); } } diff --git a/entrace_core/src/tree_layer.rs b/entrace_core/src/tree_layer.rs index a09845e..c5dd1b7 100644 --- a/entrace_core/src/tree_layer.rs +++ b/entrace_core/src/tree_layer.rs @@ -56,7 +56,7 @@ impl Layer for TreeLayer { let pool_id: u32 = self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1; // the atomic returns the previous value, so add one here too self.id_to_pool.write().unwrap().insert(id.clone(), pool_id); let sent_attrs = visitor.attrs.into_iter().map(|x| (x.0.to_string(), x.1)).collect(); - self.storage.new_span(parent, sent_attrs, attrs.metadata()); + self.storage.new_span(pool_id, parent, sent_attrs, attrs.metadata()); } fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { let parent: u32; @@ -75,8 +75,9 @@ impl Layer for TreeLayer { let mut visitor = EventVisitor::new(); event.record(&mut visitor); - self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let pool_id = self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1; self.storage.new_event( + pool_id, parent, visitor.attrs.into_iter().map(|x| (x.0.to_string(), x.1)).collect(), event.metadata(), diff --git a/entrace_core/tests/issue_4.rs b/entrace_core/tests/issue_4.rs 
new file mode 100644 index 0000000..ded7409 --- /dev/null +++ b/entrace_core/tests/issue_4.rs @@ -0,0 +1,50 @@ +use entrace_core::{TreeLayer, mmap::ETStorage}; +use std::io::Cursor; +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; +use std::thread; +use tracing::info_span; +use tracing_subscriber::prelude::*; + +// Test to reproduce issue 4. This is only interesting on Windows. +#[test] +fn test_issue_4() { + type VecCursor = Cursor>; + let storage = Arc::new(ETStorage::::init(Cursor::new(vec![]))); + tracing_subscriber::registry().with(TreeLayer::from_storage(storage.clone())).init(); + let running = Arc::new(AtomicBool::new(true)); + let mut handles = vec![]; + + for _ in 0..8 { + let running = running.clone(); + handles.push(thread::spawn(move || { + while running.load(Ordering::Relaxed) { + let parent = Arc::new(info_span!("parent")); + let p1 = parent.clone(); + let p2 = parent.clone(); + + let t1 = thread::spawn(move || { + // ideally this gets a lower id but sends *later* + let _c1 = info_span!(parent: &*p1, "C1"); + }); + + let t2 = thread::spawn(move || { + // ideally this gets a higher id but sends *sooner* + let c2 = info_span!(parent: &*p2, "C2"); + // a grandchild, because why not. + let _gc = info_span!(parent: &c2, "GC"); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + } + })); + } + std::thread::sleep(std::time::Duration::from_secs(5)); + running.store(false, Ordering::Relaxed); + for h in handles { + h.join().unwrap(); + } +}