Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions entrace_core/src/mmap/et_storage.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
any::Any,
collections::BTreeMap,
io::{BufReader, BufWriter, Read, Seek, Write},
sync::RwLock,
thread::JoinHandle,
Expand All @@ -15,7 +16,7 @@ use crate::{
};

pub enum Message<Q: FileLike + Send> {
Entry(MixedTraceEntry),
Entry { id: u32, entry: MixedTraceEntry },
Shutdown(Q),
}
#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -65,18 +66,32 @@ impl<T: FileLike + Send + 'static, Q: FileLike + Send + 'static> ETStorage<T, Q>
bincode::serde::encode_into_std_write(TraceEntry::root(), &mut writer, config)
.unwrap();
cur_offset += len as u64;

// If multiple threads are writing subspans to the same parent, there can be a race
// condition where a child arrives before the parent. This is reproducable on Windows
// only, see issue #4.
// To prevent this, we store disjoint leaves in a buffer until their parent arrives.
// next_id keeps track of what id comes next in sequential order. if the buffer has next_id, we are good and the entry can be written.
// Else we wait for it to appear.
let mut buffer = BTreeMap::new();
let mut next_id = 1;

while let Ok(msg) = rx.recv() {
match msg {
Message::Entry(entry) => {
offsets.push(cur_offset);
let len = child_lists.len() as u32;
child_lists[entry.parent as usize].children.push(len);
child_lists.push(PoolEntry::new());
let cfg = config;
let written =
bincode::serde::encode_into_std_write(&entry, &mut writer, cfg)
.unwrap();
cur_offset += written as u64;
Message::Entry { id, entry } => {
buffer.insert(id, entry);
while let Some(entry) = buffer.remove(&next_id) {
offsets.push(cur_offset);
let len = child_lists.len() as u32;
child_lists[entry.parent as usize].children.push(len);
child_lists.push(PoolEntry::new());
let cfg = config;
let written =
bincode::serde::encode_into_std_write(&entry, &mut writer, cfg)
.unwrap();
cur_offset += written as u64;
next_id += 1;
}
}
Message::Shutdown(mut tmp_buf) => {
let mut tmp_buf_writer = BufWriter::new(&mut tmp_buf);
Expand Down Expand Up @@ -118,13 +133,15 @@ impl<T: FileLike + Send + 'static, Q: FileLike + Send + 'static> ETStorage<T, Q>
}
}
impl<T: FileLike + Send + 'static, Q: FileLike + Send + 'static> Storage for ETStorage<T, Q> {
fn new_span(&self, parent: u32, attrs: crate::Attrs, meta: &'static tracing::Metadata<'_>) {
fn new_span(
&self, id: u32, parent: u32, attrs: crate::Attrs, meta: &'static tracing::Metadata<'_>,
) {
let message = attrs.iter().find(|x| x.0 == "message").map(|x| match &x.1 {
EnValue::String(y) => y.clone(),
q => format!("{q:?}"),
});
let entry = MixedTraceEntry { parent, metadata: meta.into(), attributes: attrs, message };

self.sender.send(Message::Entry(entry)).ok();
self.sender.send(Message::Entry { id, entry }).ok();
}
}
34 changes: 23 additions & 11 deletions entrace_core/src/remote/remote_storage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{StorageFormat, TraceEntry, entrace_magic_for, storage::Storage, tree_layer::EnValue};
use crossbeam_channel::{SendError, Sender};
use std::{any::Any, io::Write, sync::RwLock, thread::JoinHandle};
use std::{any::Any, collections::BTreeMap, io::Write, sync::RwLock, thread::JoinHandle};

pub enum RemoteMessage {
NewSpan(TraceEntry),
NewSpan { id: u32, entry: TraceEntry },
Shutdown,
}
pub struct IETStorageConfig<T: Write + Send> {
Expand Down Expand Up @@ -65,10 +65,22 @@ impl<T: Write + Send + 'static> IETStorage<T> {
}

write_message(&mut buffer, TraceEntry::root(), &mut config);
// If multiple threads are writing subspans to the same parent, there can be a race
// condition where a child arrives before the parent. This is reproducable on Windows
// only, see issue #4.
// To prevent this, we store disjoint leaves in a buffer until their parent arrives.
// next_id keeps track of what id comes next in sequential order. if the buffer has next_id, we are good and the entry can be written.
// Else we wait for it to appear.
let mut reorder_buffer = BTreeMap::new();
let mut next_id = 1u32;
while let Ok(msg) = rx.recv() {
match msg {
RemoteMessage::NewSpan(m) => {
write_message(&mut buffer, m, &mut config);
RemoteMessage::NewSpan { id, entry } => {
reorder_buffer.insert(id, entry);
while let Some(entry) = reorder_buffer.remove(&next_id) {
write_message(&mut buffer, entry, &mut config);
next_id += 1;
}
}
RemoteMessage::Shutdown => break,
}
Expand All @@ -88,18 +100,18 @@ impl<T: Write + Send + 'static> IETStorage<T> {
}
}
impl<T: Write + Send + 'static> Storage for IETStorage<T> {
fn new_span(&self, parent: u32, attrs: crate::Attrs, meta: &'static tracing::Metadata<'_>) {
fn new_span(
&self, id: u32, parent: u32, attrs: crate::Attrs, meta: &'static tracing::Metadata<'_>,
) {
let message = attrs.iter().find(|x| x.0 == "message").map(|x| match &x.1 {
EnValue::String(y) => y.clone(),
q => format!("{q:?}"),
});
self.sender
.send(RemoteMessage::NewSpan(TraceEntry {
parent,
message,
metadata: meta.into(),
attributes: attrs,
}))
.send(RemoteMessage::NewSpan {
id,
entry: TraceEntry { parent, message, metadata: meta.into(), attributes: attrs },
})
.ok();
}
}
Expand Down
6 changes: 3 additions & 3 deletions entrace_core/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ impl Close for Vec<JoinHandle<()>> {

/// Used in the entrace backend to store data received by a [crate::tree_layer::TreeLayer]
pub trait Storage {
fn new_span(&self, parent: u32, attrs: Attrs, meta: &'static Metadata<'_>);
fn new_span(&self, id: u32, parent: u32, attrs: Attrs, meta: &'static Metadata<'_>);
/// Implemented by default as a call to [Storage::new_span].
fn new_event(&self, parent: u32, attrs: Attrs, meta: &'static Metadata<'_>) {
self.new_span(parent, attrs, meta);
fn new_event(&self, id: u32, parent: u32, attrs: Attrs, meta: &'static Metadata<'_>) {
self.new_span(id, parent, attrs, meta);
}
}
5 changes: 3 additions & 2 deletions entrace_core/src/tree_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<S: Subscriber, S2: Storage + 'static> Layer<S> for TreeLayer<S2> {
let pool_id: u32 = self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1; // the atomic returns the previous value, so add one here too
self.id_to_pool.write().unwrap().insert(id.clone(), pool_id);
let sent_attrs = visitor.attrs.into_iter().map(|x| (x.0.to_string(), x.1)).collect();
self.storage.new_span(parent, sent_attrs, attrs.metadata());
self.storage.new_span(pool_id, parent, sent_attrs, attrs.metadata());
}
fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
let parent: u32;
Expand All @@ -75,8 +75,9 @@ impl<S: Subscriber, S2: Storage + 'static> Layer<S> for TreeLayer<S2> {

let mut visitor = EventVisitor::new();
event.record(&mut visitor);
self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let pool_id = self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
self.storage.new_event(
pool_id,
parent,
visitor.attrs.into_iter().map(|x| (x.0.to_string(), x.1)).collect(),
event.metadata(),
Expand Down
50 changes: 50 additions & 0 deletions entrace_core/tests/issue_4.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use entrace_core::{TreeLayer, mmap::ETStorage};
use std::io::Cursor;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use std::thread;
use tracing::info_span;
use tracing_subscriber::prelude::*;

// Test to reproduce issue 4. This is only interesting on Windows.
#[test]
fn test_issue_4() {
type VecCursor = Cursor<Vec<u8>>;
let storage = Arc::new(ETStorage::<VecCursor, VecCursor>::init(Cursor::new(vec![])));
tracing_subscriber::registry().with(TreeLayer::from_storage(storage.clone())).init();
let running = Arc::new(AtomicBool::new(true));
let mut handles = vec![];

for _ in 0..8 {
let running = running.clone();
handles.push(thread::spawn(move || {
while running.load(Ordering::Relaxed) {
let parent = Arc::new(info_span!("parent"));
let p1 = parent.clone();
let p2 = parent.clone();

let t1 = thread::spawn(move || {
// ideally this gets a lower id but sends *later*
let _c1 = info_span!(parent: &*p1, "C1");
});

let t2 = thread::spawn(move || {
// ideally this gets a higher id but sends *sooner*
let c2 = info_span!(parent: &*p2, "C2");
// a grandchild, because why not.
let _gc = info_span!(parent: &c2, "GC");
});

t1.join().unwrap();
t2.join().unwrap();
}
}));
}
std::thread::sleep(std::time::Duration::from_secs(5));
running.store(false, Ordering::Relaxed);
for h in handles {
h.join().unwrap();
}
}
Loading