Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 99 additions & 2 deletions appenders/async/src/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,19 @@ impl Append for Async {
}

fn flush(&self) -> Result<(), Error> {
let overflow = self.overflow;
let (completion, done) = crossbeam_channel::bounded(0);
let task = Task::Flush {
appends: self.appends.clone(),
completion,
};
self.state.send_task(task, overflow)
self.state.send_task(task, Overflow::Block)?;
match done.recv() {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(_) => Err(Error::new(
"async appender worker exited before completing flush",
)),
}
Comment on lines +65 to +71
Copy link

@SpriteOvO SpriteOvO Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if I missed some context, but isn't the point of async appender not to block the current thread? If we wait for the flushing result here wouldn't it be no different than sync appender?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my understanding, flush always represents persistence. async only indicates that operations like write are asynchronous.

Since this is the expected behavior, I will close this PR.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I get your point, the meaning of the term "async" is varied a little bit in different contexts -- performs in a different thread, or not to block the current thread, or more meanings such as co-routines related, etc.

IMHO, the "async" in logging is more in favor of not blocking the current thread thus minimizing the performance impact and making sure that the logs are delivered eventually as much as possible.

Appreciate your contribution attempt anyway! :)

}

fn exit(&self) -> Result<(), Error> {
Expand Down Expand Up @@ -174,3 +182,92 @@ impl<'a> Visitor for DiagnosticCollector<'a> {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use logforth_core::Trap;
use logforth_core::record::Record;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier};

#[derive(Debug)]
struct BarrierAppend {
started: Arc<AtomicBool>,
barrier: Arc<Barrier>,
}

impl Append for BarrierAppend {
fn append(&self, _: &Record, _: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
Ok(())
}

fn flush(&self) -> Result<(), Error> {
self.started.store(true, Ordering::SeqCst);
self.barrier.wait();
Ok(())
}
}

#[derive(Debug)]
struct FailingFlush;

impl Append for FailingFlush {
fn append(&self, _: &Record, _: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
Ok(())
}

fn flush(&self) -> Result<(), Error> {
Err(Error::new("flush failed"))
}
}

#[derive(Debug)]
struct NoopTrap;

impl Trap for NoopTrap {
fn trap(&self, _: &Error) {}
}

#[test]
fn flush_waits_for_worker_completion() {
let started = Arc::new(AtomicBool::new(false));
let barrier = Arc::new(Barrier::new(2));

let append = BarrierAppend {
started: started.clone(),
barrier: barrier.clone(),
};

let async_append = AsyncBuilder::new("async-flush-wait").append(append).build();

let barrier_for_main = barrier.clone();
let flush_handle = std::thread::spawn(move || async_append.flush());

while !started.load(Ordering::SeqCst) {
std::thread::yield_now();
}

assert!(!flush_handle.is_finished());

barrier_for_main.wait();

flush_handle
.join()
.expect("flush thread panicked")
.expect("flush should succeed");
}

#[test]
fn flush_propagates_errors() {
let async_append = AsyncBuilder::new("async-flush-error")
.trap(NoopTrap)
.append(FailingFlush)
.build();

let err = async_append.flush().expect_err("flush should fail");
let err_text = err.to_string();
assert!(err_text.contains("failed to flush"));
assert!(err_text.contains("flush failed"));
}
}
2 changes: 2 additions & 0 deletions appenders/async/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use std::sync::Arc;

use logforth_core::Append;
use logforth_core::Error;
use logforth_core::kv;
use logforth_core::record::RecordOwned;

Expand All @@ -37,6 +38,7 @@ enum Task {
},
Flush {
appends: Arc<[Box<dyn Append>]>,
completion: crossbeam_channel::Sender<Result<(), Error>>,
},
}

Expand Down
14 changes: 13 additions & 1 deletion appenders/async/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,25 @@ impl Worker {
}
}
}
Task::Flush { appends } => {
Task::Flush {
appends,
completion,
} => {
let mut first_error = None;
for append in appends.iter() {
if let Err(err) = append.flush() {
let err = Error::new("failed to flush").set_source(err);
trap.trap(&err);
if first_error.is_none() {
first_error = Some(err);
}
}
}
if let Some(err) = first_error {
let _ = completion.send(Err(err));
} else {
let _ = completion.send(Ok(()));
}
}
}
}
Expand Down
Loading