From f97faeb399ab5ea26ffd0f4c9d530f921745e3bb Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 28 Aug 2021 10:44:17 +0900 Subject: [PATCH] Fixes a race condition in the scheduler. Closes #474 --- quickwit-actors/src/scheduler.rs | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/quickwit-actors/src/scheduler.rs b/quickwit-actors/src/scheduler.rs index 71e3d423630..134ab4f8f18 100644 --- a/quickwit-actors/src/scheduler.rs +++ b/quickwit-actors/src/scheduler.rs @@ -253,21 +253,28 @@ impl Scheduler { fn schedule_next_timeout(&mut self, ctx: &ActorContext) { let simulated_now = self.simulated_now(); let next_deadline_opt = self.future_events.peek().map(|evt| evt.0.deadline); - let timeout = if let Some(next_deadline) = next_deadline_opt { - next_deadline - simulated_now - } else { - return; - }; - let mailbox = ctx.mailbox().clone(); - let timeout = async move { - tokio::time::sleep(timeout).await; - let _ = mailbox.send_message(SchedulerMessage::Timeout).await; + let timeout = match next_deadline_opt { + Some(next_deadline) if next_deadline <= simulated_now => Duration::default(), + Some(next_deadline) => next_deadline - simulated_now, + None => { + // No event to schedule + return; + } }; - let new_join_handle: JoinHandle<()> = tokio::task::spawn(timeout); + // The next event timeout is about to change. Let's cancel the previous scheduled event. if let Some(previous_join_handle) = self.next_timeout.take() { previous_join_handle.abort(); } - // n.b.: Dropping the previous timeout cancels it. + let self_mailbox = ctx.mailbox().clone(); + let new_join_handle: JoinHandle<()> = tokio::task::spawn(async move { + if timeout.is_zero() { + tokio::task::yield_now().await; + } else { + tokio::time::sleep(timeout).await; + } + // We ignore the send error here. The scheduler was just terminated + let _ = self_mailbox.send_message(SchedulerMessage::Timeout).await; + }); self.next_timeout = Some(new_join_handle); } }