diff --git a/appenders/async/src/append.rs b/appenders/async/src/append.rs index 9022327..6c9eb47 100644 --- a/appenders/async/src/append.rs +++ b/appenders/async/src/append.rs @@ -56,11 +56,19 @@ impl Append for Async { } fn flush(&self) -> Result<(), Error> { - let overflow = self.overflow; + let (completion, done) = crossbeam_channel::bounded(0); let task = Task::Flush { appends: self.appends.clone(), + completion, }; - self.state.send_task(task, overflow) + self.state.send_task(task, Overflow::Block)?; + match done.recv() { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(err), + Err(_) => Err(Error::new( + "async appender worker exited before completing flush", + )), + } } fn exit(&self) -> Result<(), Error> { @@ -174,3 +182,92 @@ impl<'a> Visitor for DiagnosticCollector<'a> { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use logforth_core::Trap; + use logforth_core::record::Record; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::{Arc, Barrier}; + + #[derive(Debug)] + struct BarrierAppend { + started: Arc, + barrier: Arc, + } + + impl Append for BarrierAppend { + fn append(&self, _: &Record, _: &[Box]) -> Result<(), Error> { + Ok(()) + } + + fn flush(&self) -> Result<(), Error> { + self.started.store(true, Ordering::SeqCst); + self.barrier.wait(); + Ok(()) + } + } + + #[derive(Debug)] + struct FailingFlush; + + impl Append for FailingFlush { + fn append(&self, _: &Record, _: &[Box]) -> Result<(), Error> { + Ok(()) + } + + fn flush(&self) -> Result<(), Error> { + Err(Error::new("flush failed")) + } + } + + #[derive(Debug)] + struct NoopTrap; + + impl Trap for NoopTrap { + fn trap(&self, _: &Error) {} + } + + #[test] + fn flush_waits_for_worker_completion() { + let started = Arc::new(AtomicBool::new(false)); + let barrier = Arc::new(Barrier::new(2)); + + let append = BarrierAppend { + started: started.clone(), + barrier: barrier.clone(), + }; + + let async_append = AsyncBuilder::new("async-flush-wait").append(append).build(); + + let barrier_for_main = barrier.clone(); + let flush_handle = std::thread::spawn(move || async_append.flush()); + + while !started.load(Ordering::SeqCst) { + std::thread::yield_now(); + } + + assert!(!flush_handle.is_finished()); + + barrier_for_main.wait(); + + flush_handle + .join() + .expect("flush thread panicked") + .expect("flush should succeed"); + } + + #[test] + fn flush_propagates_errors() { + let async_append = AsyncBuilder::new("async-flush-error") + .trap(NoopTrap) + .append(FailingFlush) + .build(); + + let err = async_append.flush().expect_err("flush should fail"); + let err_text = err.to_string(); + assert!(err_text.contains("failed to flush")); + assert!(err_text.contains("flush failed")); + } +} diff --git a/appenders/async/src/lib.rs b/appenders/async/src/lib.rs index bbe46f8..bf039eb 100644 --- a/appenders/async/src/lib.rs +++ b/appenders/async/src/lib.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use logforth_core::Append; +use logforth_core::Error; use logforth_core::kv; use logforth_core::record::RecordOwned; @@ -37,6 +38,7 @@ enum Task { }, Flush { appends: Arc<[Box]>, + completion: crossbeam_channel::Sender>, }, } diff --git a/appenders/async/src/worker.rs b/appenders/async/src/worker.rs index 944fccb..c7d4fde 100644 --- a/appenders/async/src/worker.rs +++ b/appenders/async/src/worker.rs @@ -53,13 +53,25 @@ impl Worker { } } } - Task::Flush { appends } => { + Task::Flush { + appends, + completion, + } => { + let mut first_error = None; for append in appends.iter() { if let Err(err) = append.flush() { let err = Error::new("failed to flush").set_source(err); trap.trap(&err); + if first_error.is_none() { + first_error = Some(err); + } } } + if let Some(err) = first_error { + let _ = completion.send(Err(err)); + } else { + let _ = completion.send(Ok(())); + } } } }