From 85ee4cb6fdfc9c7f5e7390ed154523b2fce2afc7 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 3 Nov 2025 12:30:45 -0800 Subject: [PATCH 1/2] Ensure I/O cancels correctly when scan is dropped --- rust/lance-io/src/scheduler.rs | 47 +++++----------------- rust/lance/src/dataset/scanner.rs | 53 +++++++++++++++++++++++++ rust/lance/src/io/exec/filtered_read.rs | 29 +++++++++++++- rust/lance/src/utils/test.rs | 11 +++++ 4 files changed, 100 insertions(+), 40 deletions(-) diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index e3aaae7aa30..4854f801121 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -15,6 +15,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Instant; use tokio::sync::{Notify, Semaphore, SemaphorePermit}; +use tokio::task::JoinHandle; use lance_core::{Error, Result}; @@ -195,9 +196,6 @@ struct IoQueueState { pending_requests: BinaryHeap, // Priorities of in-flight requests priorities_in_flight: PrioritiesInFlight, - // Set when the scheduler is finished to notify the I/O loop to shut down - // once all outstanding requests have been completed. 
- done_scheduling: bool, // Time when the scheduler started start: Instant, // Last time we warned about backpressure @@ -211,16 +209,11 @@ impl IoQueueState { bytes_avail: io_buffer_size as i64, pending_requests: BinaryHeap::new(), priorities_in_flight: PrioritiesInFlight::new(io_capacity), - done_scheduling: false, start: Instant::now(), last_warn: AtomicU64::from(0), } } - fn finished(&self) -> bool { - self.done_scheduling && self.pending_requests.is_empty() - } - fn warn_if_needed(&self) { let seconds_elapsed = self.start.elapsed().as_secs(); let last_warn = self.last_warn.load(Ordering::Acquire); @@ -303,7 +296,7 @@ impl IoQueue { self.notify.notify_one(); } - async fn pop(&self) -> Option { + async fn pop(&self) -> IoTask { loop { { // First, grab a reservation on the global IOPS quota @@ -317,11 +310,7 @@ impl IoQueue { // Reservation successfully acquired, we will release the global // global reservation after task has run. iop_res.forget(); - return Some(task); - } - - if state.finished() { - return None; + return task; } } @@ -347,14 +336,6 @@ impl IoQueue { self.notify.notify_one(); } - - fn close(&self) { - let mut state = self.state.lock().unwrap(); - state.done_scheduling = true; - drop(state); - - self.notify.notify_one(); - } } // There is one instance of MutableBatch shared by all the I/O operations @@ -506,19 +487,9 @@ impl IoTask { // Every time a scheduler starts up it launches a task to run the I/O loop. This loop // repeats endlessly until the scheduler is destroyed. 
async fn run_io_loop(tasks: Arc) { - // Pop the first finished task off the queue and submit another until - // we are done loop { let next_task = tasks.pop().await; - match next_task { - Some(task) => { - tokio::spawn(task.run()); - } - None => { - // The sender has been dropped, we are done - return; - } - } + tokio::spawn(next_task.run()); } } @@ -584,6 +555,7 @@ pub struct ScanScheduler { object_store: Arc, io_queue: Arc, stats: Arc, + io_loop_handle: JoinHandle<()>, } impl Debug for ScanScheduler { @@ -639,13 +611,12 @@ impl ScanScheduler { io_capacity as u32, config.io_buffer_size_bytes, )); - let scheduler = Self { + Arc::new(Self { object_store, io_queue: io_queue.clone(), stats: Arc::new(StatsCollector::new()), - }; - tokio::task::spawn(async move { run_io_loop(io_queue).await }); - Arc::new(scheduler) + io_loop_handle: tokio::task::spawn(async move { run_io_loop(io_queue).await }), + }) } /// Open a file for reading @@ -769,7 +740,7 @@ impl ScanScheduler { impl Drop for ScanScheduler { fn drop(&mut self) { - self.io_queue.close(); + self.io_loop_handle.abort(); } } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 3e53c83545c..a603de167ea 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2056,6 +2056,10 @@ impl Scanner { read_options = read_options.with_deleted_rows()?; } + if let Some(io_buffer_size_bytes) = self.io_buffer_size { + read_options = read_options.with_io_buffer_size(io_buffer_size_bytes); + } + let index_input = filter_plan.index_query.clone().map(|index_query| { Arc::new(ScalarIndexExec::new(self.dataset.clone(), index_query)) as Arc @@ -8376,4 +8380,53 @@ mod test { ); } } + + #[test_log::test(test)] + fn test_scan_finishes_all_tasks() { + // Need to use multi-threaded runtime otherwise tasks don't run unless someone is polling somewhere + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .build() + .unwrap(); + + 
runtime.block_on(async move { + let ds = lance_datagen::gen_batch() + .col("id", lance_datagen::array::step::()) + .into_ram_dataset(FragmentCount::from(1000), FragmentRowCount::from(10)) + .await + .unwrap(); + + // This scan has a small I/O buffer size and batch size to mimic a real-world situation + // that required a lot of data. Many fragments will be scheduled at low priority and the data + // buffer will fill up with data reads. When the scan is abandoned, the tasks to read the fragment + // metadata were left behind and would never finish because the data was never decoded to drain the + // backpressure queue. + // + // The fix (that this test verifies) is to ensure we close the I/O scheduler when the scan is abandoned. + let mut stream = ds + .scan() + .fragment_readahead(1000) + .batch_size(1) + .io_buffer_size(1) + .batch_readahead(1) + .try_into_stream() + .await + .unwrap(); + stream.next().await.unwrap().unwrap(); + }); + + let start = Instant::now(); + while start.elapsed() < Duration::from_secs(10) { + if runtime.handle().metrics().num_alive_tasks() == 0 { + break; + } + std::thread::sleep(Duration::from_millis(100)); + } + + assert!( + runtime.handle().metrics().num_alive_tasks() == 0, + "Tasks should have finished within 10 seconds but there are still {} tasks running", + runtime.handle().metrics().num_alive_tasks() + ); + } } diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 52763dd6718..fc18e33d1df 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -11,6 +11,7 @@ use arrow::array::AsArray; use arrow::datatypes::UInt32Type; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; +use datafusion::common::runtime::SpawnedTask; use datafusion::common::stats::Precision; use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; @@ -365,6 +366,13 @@ impl 
FilteredReadStream { .clone() .unwrap_or_else(|| dataset.fragments().clone()); + log::debug!( + "Filtered read on {} fragments with frag_readahead={} and io_parallelism={}", + fragments.len(), + fragment_readahead, + io_parallelism + ); + // Ideally we don't need to collect here but if we don't we get "implementation of FnOnce is // not general enough" false positives from rustc let frag_futs = fragments @@ -386,7 +394,13 @@ impl FilteredReadStream { let output_schema = Arc::new(options.projection.to_arrow_schema()); let obj_store = dataset.object_store.clone(); - let scheduler_config = SchedulerConfig::max_bandwidth(obj_store.as_ref()); + let scheduler_config = if let Some(io_buffer_size_bytes) = options.io_buffer_size_bytes { + SchedulerConfig { + io_buffer_size_bytes, + } + } else { + SchedulerConfig::max_bandwidth(obj_store.as_ref()) + }; let scan_scheduler = ScanScheduler::new(obj_store, scheduler_config); let (scoped_fragments, scan_planned_with_limit_pushed_down) = Self::plan_scan( @@ -412,7 +426,7 @@ impl FilteredReadStream { move |scoped_fragment| { let metrics = global_metrics_clone.clone(); let limit = scan_range_after_filter.as_ref().map(|r| r.end); - tokio::task::spawn( + SpawnedTask::spawn( Self::read_fragment(scoped_fragment, metrics, limit).in_current_span(), ) .map(|thread_result| thread_result.unwrap()) @@ -1197,6 +1211,8 @@ pub struct FilteredReadOptions { pub full_filter: Option, /// The threading mode to use for the scan pub threading_mode: FilteredReadThreadingMode, + /// The size of the I/O buffer to use for the scan + pub io_buffer_size_bytes: Option, } impl FilteredReadOptions { @@ -1223,6 +1239,7 @@ impl FilteredReadOptions { projection, refine_filter: None, full_filter: None, + io_buffer_size_bytes: None, threading_mode: FilteredReadThreadingMode::OnePartitionMultipleThreads( get_num_compute_intensive_cpus(), ), @@ -1365,6 +1382,14 @@ impl FilteredReadOptions { self.projection = projection; self } + + /// Specify the size of the I/O 
buffer (in bytes) to use for the scan + /// + /// See [`crate::dataset::scanner::Scanner::io_buffer_size`] for more details. + pub fn with_io_buffer_size(mut self, io_buffer_size: u64) -> Self { + self.io_buffer_size_bytes = Some(io_buffer_size); + self + } } /// A plan node that reads a dataset, applying an optional filter and projection. diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 6af1158fe26..ea9c86d58f8 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -350,6 +350,17 @@ impl DatagenExt for BatchGeneratorBuilder { rows_per_fragment: FragmentRowCount, write_params: Option, ) -> lance_core::Result { + // Need to verify that max_rows_per_file has been set otherwise the frag_count won't be respected + if let Some(write_params) = &write_params { + if write_params.max_rows_per_file != rows_per_fragment.0 as usize { + panic!( + "Max rows per file in write params does not match rows per fragment: {} != {}", + write_params.max_rows_per_file, rows_per_fragment.0 as usize + ); + } + } else { + panic!("Write params are not set, will not write correct # of fragments"); + } let reader = self.into_reader_rows( RowCount::from(rows_per_fragment.0 as u64), BatchCount::from(frag_count.0), From 779cb2b64a29d0bf154fca70c37ddc16ab367558 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 4 Nov 2025 14:37:24 -0800 Subject: [PATCH 2/2] Ensure scan scheduler stays alive even if scheduling finishes before decoding --- rust/lance-encoding/src/decoder.rs | 7 ++- rust/lance-io/src/scheduler.rs | 75 +++++++++++++++++++++++++----- 2 files changed, 69 insertions(+), 13 deletions(-) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index af87e068f9f..50468277bda 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -226,6 +226,7 @@ use futures::{FutureExt, StreamExt}; use lance_arrow::DataTypeExt; use lance_core::cache::LanceCache; use 
lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD}; +use lance_core::utils::futures::FinallyStreamExt; use log::{debug, trace, warn}; use snafu::location; use tokio::sync::mpsc::error::SendError; @@ -1977,6 +1978,8 @@ pub fn schedule_and_decode( // trying to read them has caused bugs in the past. let requested_rows = requested_rows.trim_empty_ranges(); + let io = config.io.clone(); + // For convenience we really want this method to be a snchronous method where all // errors happen on the stream. There is some async initialization that must happen // when creating a scheduler. We wrap that all up in the very first task. @@ -1988,8 +1991,10 @@ pub fn schedule_and_decode( target_schema, config, ) { + // Keep the io alive until the stream is dropped or finishes. Otherwise the + // I/O drops as soon as the scheduling is finished and the I/O loop terminates. + Ok(stream) => stream.finally(move || drop(io)).boxed(), // If the initialization failed make it look like a failed task - Ok(stream) => stream, Err(e) => stream::once(std::future::ready(ReadBatchTask { num_rows: 0, task: std::future::ready(Err(e)).boxed(), diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index 4854f801121..a3591940cef 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -15,7 +15,6 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Instant; use tokio::sync::{Notify, Semaphore, SemaphorePermit}; -use tokio::task::JoinHandle; use lance_core::{Error, Result}; @@ -179,8 +178,6 @@ impl PrioritiesInFlight { fn remove(&mut self, prio: u128) { if let Ok(pos) = self.in_flight.binary_search(&prio) { self.in_flight.remove(pos); - } else { - unreachable!(); } } } @@ -196,6 +193,9 @@ struct IoQueueState { pending_requests: BinaryHeap, // Priorities of in-flight requests priorities_in_flight: PrioritiesInFlight, + // Set when the scheduler is finished to notify the I/O loop to shut down + // once all 
outstanding requests have been completed. + done_scheduling: bool, // Time when the scheduler started start: Instant, // Last time we warned about backpressure @@ -209,6 +209,7 @@ impl IoQueueState { bytes_avail: io_buffer_size as i64, pending_requests: BinaryHeap::new(), priorities_in_flight: PrioritiesInFlight::new(io_capacity), + done_scheduling: false, start: Instant::now(), last_warn: AtomicU64::from(0), } @@ -296,7 +297,7 @@ impl IoQueue { self.notify.notify_one(); } - async fn pop(&self) -> IoTask { + async fn pop(&self) -> Option { loop { { // First, grab a reservation on the global IOPS quota @@ -310,7 +311,11 @@ impl IoQueue { // Reservation successfully acquired, we will release the global // global reservation after task has run. iop_res.forget(); - return task; + return Some(task); + } + + if state.done_scheduling { + return None; } } @@ -336,6 +341,18 @@ impl IoQueue { self.notify.notify_one(); } + + fn close(&self) { + let mut state = self.state.lock().unwrap(); + state.done_scheduling = true; + let pending_requests = std::mem::take(&mut state.pending_requests); + drop(state); + for request in pending_requests { + request.cancel(); + } + + self.notify.notify_one(); + } } // There is one instance of MutableBatch shared by all the I/O operations @@ -451,6 +468,12 @@ impl IoTask { fn num_bytes(&self) -> u64 { self.to_read.end - self.to_read.start } + fn cancel(self) { + (self.when_done)(Err(Error::Internal { + message: "Scheduler closed before I/O was completed".to_string(), + location: location!(), + })); + } async fn run(self) { let file_path = self.reader.path().as_ref(); @@ -487,9 +510,19 @@ impl IoTask { // Every time a scheduler starts up it launches a task to run the I/O loop. This loop // repeats endlessly until the scheduler is destroyed. 
async fn run_io_loop(tasks: Arc) { + // Pop the first finished task off the queue and submit another until + // we are done loop { let next_task = tasks.pop().await; - tokio::spawn(next_task.run()); + match next_task { + Some(task) => { + tokio::spawn(task.run()); + } + None => { + // The sender has been dropped, we are done + return; + } + } } } @@ -550,12 +583,15 @@ impl ScanStats { /// An I/O scheduler which wraps an ObjectStore and throttles the amount of /// parallel I/O that can be run. /// -/// TODO: This will also add coalescing +/// The ScanScheduler will cancel any outstanding I/O requests when it is dropped. +/// For this reason it should be kept alive until all I/O has finished. +/// +/// Note: The 2.X file readers already do this so this is only a concern if you are +/// using the ScanScheduler directly. pub struct ScanScheduler { object_store: Arc, io_queue: Arc, stats: Arc, - io_loop_handle: JoinHandle<()>, } impl Debug for ScanScheduler { @@ -611,12 +647,16 @@ impl ScanScheduler { io_capacity as u32, config.io_buffer_size_bytes, )); - Arc::new(Self { + let slf = Arc::new(Self { object_store, io_queue: io_queue.clone(), stats: Arc::new(StatsCollector::new()), - io_loop_handle: tokio::task::spawn(async move { run_io_loop(io_queue).await }), - }) + }); + // Best we can do here is fire and forget. If the I/O loop is still running when the scheduler is + // dropped we can't wait for it to finish or we'd block a tokio thread. We could spawn a blocking task + // to wait for it to finish but that doesn't seem helpful. + tokio::task::spawn(async move { run_io_loop(io_queue).await }); + slf } /// Open a file for reading @@ -740,7 +780,18 @@ impl ScanScheduler { impl Drop for ScanScheduler { fn drop(&mut self) { - self.io_loop_handle.abort(); + // If the user is dropping the ScanScheduler then they _should_ be done with I/O. 
This can happen + // even when I/O is in progress if, for example, the user is dropping a scan mid-read because they found + // the data they wanted (limit after filter or some other example). + // + // Closing the I/O queue will cancel any requests that have not yet been sent to the I/O loop. However, + // it will not abort the I/O loop task or any I/O that is already in flight; the loop exits on its own + // once it observes that the queue has been closed. This is to help prevent deadlock and ensure that all I/O + // requests that are submitted will terminate. + // + // In theory, this isn't strictly necessary, as callers should drop any task expecting I/O before they + // drop the scheduler. In practice, this can be difficult to do, and it is better to spend a little bit + // of time letting the I/O loop drain so that we can avoid any potential deadlocks. + self.io_queue.close(); } }