diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index af87e068f9f..50468277bda 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -226,6 +226,7 @@ use futures::{FutureExt, StreamExt}; use lance_arrow::DataTypeExt; use lance_core::cache::LanceCache; use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD}; +use lance_core::utils::futures::FinallyStreamExt; use log::{debug, trace, warn}; use snafu::location; use tokio::sync::mpsc::error::SendError; @@ -1977,6 +1978,8 @@ pub fn schedule_and_decode( // trying to read them has caused bugs in the past. let requested_rows = requested_rows.trim_empty_ranges(); + let io = config.io.clone(); + // For convenience we really want this method to be a snchronous method where all // errors happen on the stream. There is some async initialization that must happen // when creating a scheduler. We wrap that all up in the very first task. @@ -1988,8 +1991,10 @@ pub fn schedule_and_decode( target_schema, config, ) { + // Keep the io alive until the stream is dropped or finishes. Otherwise the + // I/O drops as soon as the scheduling is finished and the I/O loop terminates. 
+ Ok(stream) => stream.finally(move || drop(io)).boxed(), // If the initialization failed make it look like a failed task - Ok(stream) => stream, Err(e) => stream::once(std::future::ready(ReadBatchTask { num_rows: 0, task: std::future::ready(Err(e)).boxed(), diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index e3aaae7aa30..a3591940cef 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -178,8 +178,6 @@ impl PrioritiesInFlight { fn remove(&mut self, prio: u128) { if let Ok(pos) = self.in_flight.binary_search(&prio) { self.in_flight.remove(pos); - } else { - unreachable!(); } } } @@ -217,10 +215,6 @@ impl IoQueueState { } } - fn finished(&self) -> bool { - self.done_scheduling && self.pending_requests.is_empty() - } - fn warn_if_needed(&self) { let seconds_elapsed = self.start.elapsed().as_secs(); let last_warn = self.last_warn.load(Ordering::Acquire); @@ -320,7 +314,7 @@ impl IoQueue { return Some(task); } - if state.finished() { + if state.done_scheduling { return None; } } @@ -351,7 +345,11 @@ impl IoQueue { fn close(&self) { let mut state = self.state.lock().unwrap(); state.done_scheduling = true; + let pending_requests = std::mem::take(&mut state.pending_requests); drop(state); + for request in pending_requests { + request.cancel(); + } self.notify.notify_one(); } @@ -470,6 +468,12 @@ impl IoTask { fn num_bytes(&self) -> u64 { self.to_read.end - self.to_read.start } + fn cancel(self) { + (self.when_done)(Err(Error::Internal { + message: "Scheduler closed before I/O was completed".to_string(), + location: location!(), + })); + } async fn run(self) { let file_path = self.reader.path().as_ref(); @@ -579,7 +583,11 @@ impl ScanStats { /// An I/O scheduler which wraps an ObjectStore and throttles the amount of /// parallel I/O that can be run. /// -/// TODO: This will also add coalescing +/// The ScanScheduler will cancel any outstanding I/O requests when it is dropped. 
+/// For this reason it should be kept alive until all I/O has finished. +/// +/// Note: The 2.X file readers already do this so this is only a concern if you are +/// using the ScanScheduler directly. pub struct ScanScheduler { object_store: Arc, io_queue: Arc, @@ -639,13 +647,16 @@ impl ScanScheduler { io_capacity as u32, config.io_buffer_size_bytes, )); - let scheduler = Self { + let slf = Arc::new(Self { object_store, io_queue: io_queue.clone(), stats: Arc::new(StatsCollector::new()), - }; + }); + // Best we can do here is fire and forget. If the I/O loop is still running when the scheduler is + // dropped we can't wait for it to finish or we'd block a tokio thread. We could spawn a blocking task + // to wait for it to finish but that doesn't seem helpful. tokio::task::spawn(async move { run_io_loop(io_queue).await }); - Arc::new(scheduler) + slf } /// Open a file for reading @@ -769,6 +780,17 @@ impl ScanScheduler { impl Drop for ScanScheduler { fn drop(&mut self) { + // If the user is dropping the ScanScheduler then they _should_ be done with I/O. This can happen + // even when I/O is in progress if, for example, the user is dropping a scan mid-read because they found + // the data they wanted (limit after filter or some other example). + // + // Closing the I/O queue will cancel any requests that have not yet been sent to the I/O loop. However, + // it will not terminate the I/O loop itself. This is to help prevent deadlock and ensure that all I/O + // requests that are submitted will terminate. + // + // In theory, this isn't strictly necessary, as callers should drop any task expecting I/O before they + // drop the scheduler. In practice, this can be difficult to do, and it is better to spend a little bit + // of time letting the I/O loop drain so that we can avoid any potential deadlocks. 
self.io_queue.close(); } } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 3e53c83545c..a603de167ea 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2056,6 +2056,10 @@ impl Scanner { read_options = read_options.with_deleted_rows()?; } + if let Some(io_buffer_size_bytes) = self.io_buffer_size { + read_options = read_options.with_io_buffer_size(io_buffer_size_bytes); + } + let index_input = filter_plan.index_query.clone().map(|index_query| { Arc::new(ScalarIndexExec::new(self.dataset.clone(), index_query)) as Arc @@ -8376,4 +8380,53 @@ mod test { ); } } + + #[test_log::test(test)] + fn test_scan_finishes_all_tasks() { + // Need to use multi-threaded runtime otherwise tasks don't run unless someone is polling somewhere + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .build() + .unwrap(); + + runtime.block_on(async move { + let ds = lance_datagen::gen_batch() + .col("id", lance_datagen::array::step::()) + .into_ram_dataset(FragmentCount::from(1000), FragmentRowCount::from(10)) + .await + .unwrap(); + + // This scan has a small I/O buffer size and batch size to mimic a real-world situation + // that required a lot of data. Many fragments will be scheduled at low priority and the data + // buffer will fill up with data reads. Previously, when the scan was abandoned, the tasks to read the fragment + // metadata were left behind and would never finish because the data was never decoded to drain the + // backpressure queue. + // + // The fix (that this test verifies) is to ensure we close the I/O scheduler when the scan is abandoned. 
+ let mut stream = ds + .scan() + .fragment_readahead(1000) + .batch_size(1) + .io_buffer_size(1) + .batch_readahead(1) + .try_into_stream() + .await + .unwrap(); + stream.next().await.unwrap().unwrap(); + }); + + let start = Instant::now(); + while start.elapsed() < Duration::from_secs(10) { + if runtime.handle().metrics().num_alive_tasks() == 0 { + break; + } + std::thread::sleep(Duration::from_millis(100)); + } + + assert!( + runtime.handle().metrics().num_alive_tasks() == 0, + "Tasks should have finished within 10 seconds but there are still {} tasks running", + runtime.handle().metrics().num_alive_tasks() + ); + } } diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 52763dd6718..fc18e33d1df 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -11,6 +11,7 @@ use arrow::array::AsArray; use arrow::datatypes::UInt32Type; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; +use datafusion::common::runtime::SpawnedTask; use datafusion::common::stats::Precision; use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; @@ -365,6 +366,13 @@ impl FilteredReadStream { .clone() .unwrap_or_else(|| dataset.fragments().clone()); + log::debug!( + "Filtered read on {} fragments with frag_readahead={} and io_parallelism={}", + fragments.len(), + fragment_readahead, + io_parallelism + ); + // Ideally we don't need to collect here but if we don't we get "implementation of FnOnce is // not general enough" false positives from rustc let frag_futs = fragments @@ -386,7 +394,13 @@ impl FilteredReadStream { let output_schema = Arc::new(options.projection.to_arrow_schema()); let obj_store = dataset.object_store.clone(); - let scheduler_config = SchedulerConfig::max_bandwidth(obj_store.as_ref()); + let scheduler_config = if let Some(io_buffer_size_bytes) = options.io_buffer_size_bytes { + 
SchedulerConfig { + io_buffer_size_bytes, + } + } else { + SchedulerConfig::max_bandwidth(obj_store.as_ref()) + }; let scan_scheduler = ScanScheduler::new(obj_store, scheduler_config); let (scoped_fragments, scan_planned_with_limit_pushed_down) = Self::plan_scan( @@ -412,7 +426,7 @@ impl FilteredReadStream { move |scoped_fragment| { let metrics = global_metrics_clone.clone(); let limit = scan_range_after_filter.as_ref().map(|r| r.end); - tokio::task::spawn( + SpawnedTask::spawn( Self::read_fragment(scoped_fragment, metrics, limit).in_current_span(), ) .map(|thread_result| thread_result.unwrap()) @@ -1197,6 +1211,8 @@ pub struct FilteredReadOptions { pub full_filter: Option, /// The threading mode to use for the scan pub threading_mode: FilteredReadThreadingMode, + /// The size of the I/O buffer to use for the scan + pub io_buffer_size_bytes: Option, } impl FilteredReadOptions { @@ -1223,6 +1239,7 @@ impl FilteredReadOptions { projection, refine_filter: None, full_filter: None, + io_buffer_size_bytes: None, threading_mode: FilteredReadThreadingMode::OnePartitionMultipleThreads( get_num_compute_intensive_cpus(), ), @@ -1365,6 +1382,14 @@ impl FilteredReadOptions { self.projection = projection; self } + + /// Specify the size of the I/O buffer (in bytes) to use for the scan + /// + /// See [`crate::dataset::scanner::Scanner::io_buffer_size`] for more details. + pub fn with_io_buffer_size(mut self, io_buffer_size: u64) -> Self { + self.io_buffer_size_bytes = Some(io_buffer_size); + self + } } /// A plan node that reads a dataset, applying an optional filter and projection. 
diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 6af1158fe26..ea9c86d58f8 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -350,6 +350,17 @@ impl DatagenExt for BatchGeneratorBuilder { rows_per_fragment: FragmentRowCount, write_params: Option, ) -> lance_core::Result { + // Need to verify that max_rows_per_file has been set otherwise the frag_count won't be respected + if let Some(write_params) = &write_params { + if write_params.max_rows_per_file != rows_per_fragment.0 as usize { + panic!( + "Max rows per file in write params does not match rows per fragment: {} != {}", + write_params.max_rows_per_file, rows_per_fragment.0 as usize + ); + } + } else { + panic!("Write params are not set, will not write correct # of fragments"); + } let reader = self.into_reader_rows( RowCount::from(rows_per_fragment.0 as u64), BatchCount::from(frag_count.0),