diff --git a/python/src/file.rs b/python/src/file.rs index 4256d4e74d6..85a5818037e 100644 --- a/python/src/file.rs +++ b/python/src/file.rs @@ -653,12 +653,8 @@ impl LanceFileReader { path: Path, columns: Option>, ) -> PyResult { - let scheduler = ScanScheduler::new( - object_store, - SchedulerConfig { - io_buffer_size_bytes: 2 * 1024 * 1024 * 1024, - }, - ); + let scheduler = + ScanScheduler::new(object_store, SchedulerConfig::new(2 * 1024 * 1024 * 1024)); let file = scheduler .open_file(&path, &CachedFileSize::unknown()) .await diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index a3bcfd3c27b..0965a5410ba 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -1382,6 +1382,7 @@ impl BatchDecodeStream { } } + #[instrument(level = "debug", skip_all)] async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result { if self.scheduler_exhausted { return Ok(self.rows_scheduled); @@ -1718,6 +1719,7 @@ impl StructuralBatchDecodeStream { } } + #[instrument(level = "debug", skip_all)] async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result { if self.scheduler_exhausted { return Ok(self.rows_scheduled); @@ -1793,15 +1795,7 @@ impl StructuralBatchDecodeStream { let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone(); let task = async move { let next_task = next_task?; - // Real decode work happens inside into_batch, which can block the current - // thread for a long time. By spawning it as a new task, we allow Tokio's - // worker threads to keep making progress. - tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) }) - .await - .map_err(|err| Error::Wrapped { - error: err.into(), - location: location!(), - })? + async move { next_task.into_batch(emitted_batch_size_warning) }.await }; (task, num_rows) }); diff --git a/rust/lance-io/benches/scheduler.rs b/rust/lance-io/benches/scheduler.rs index c3a25405895..bfec3c6268e 100644 --- a/rust/lance-io/benches/scheduler.rs +++ b/rust/lance-io/benches/scheduler.rs @@ -12,7 +12,7 @@ use lance_io::{ use object_store::path::Path; use rand::{seq::SliceRandom, RngCore}; use std::{fmt::Display, process::Command, sync::Arc}; -use tokio::{runtime::Runtime, sync::mpsc}; +use tokio::{runtime::Runtime, sync::mpsc, task::JoinHandle}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; #[cfg(target_os = "linux")] @@ -22,14 +22,15 @@ use pprof::criterion::{Output, PProfProfiler}; struct FullReadParams { io_parallelism: u32, page_size: u64, + use_lite_scheduler: bool, } impl Display for FullReadParams { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "full_read,parallel={},read_size={}", - self.io_parallelism, self.page_size + "full_read,parallel={},read_size={},use_lite_scheduler={}", + self.io_parallelism, self.page_size, self.use_lite_scheduler ) } } @@ -73,50 +74,60 @@ fn bench_full_read(c: &mut Criterion) { let runtime = Runtime::new().unwrap(); let (obj_store, tmp_file) = runtime.block_on(create_data(DATA_SIZE)); - for io_parallelism in [1, 16, 32, 64] { - for page_size in [4096, 16 * 1024, 1024 * 1024] { - let params = FullReadParams { - io_parallelism, - page_size, - }; - group.bench_with_input(BenchmarkId::from_parameter(params), ¶ms, |b, params| { - b.iter(|| { - let obj_store = obj_store.clone(); - if obj_store.is_local() { - let path_str = format!("/{}", tmp_file); - Command::new("dd") - .arg(format!("of={}", path_str)) - .arg("oflag=nocache") - .arg("conv=notrunc,fdatasync") - .arg("count=0") - .output() - .unwrap(); - } - std::env::set_var("IO_THREADS", io_parallelism.to_string()); - runtime.block_on(async { - let scheduler = - ScanScheduler::new(obj_store, SchedulerConfig::default_for_testing()); - let file_scheduler = scheduler - .open_file(&tmp_file, &CachedFileSize::unknown()) - .await - .unwrap(); - - let (tx, rx) = mpsc::channel(1024); - let drainer = tokio::spawn(drain_task(rx)); - let mut offset = 0; - while offset < DATA_SIZE { - #[allow(clippy::single_range_in_vec_init)] - let req = vec![offset..(offset + params.page_size)]; - let req = file_scheduler.submit_request(req, 0); - tx.send(req).await.unwrap(); - offset += params.page_size; - } - drop(tx); - let bytes_received = drainer.await.unwrap(); - assert_eq!(bytes_received, DATA_SIZE); - }); - }); - }); + for use_lite_scheduler in [false, true] { + for io_parallelism in [1, 16] { + for page_size in [4096, 1024 * 1024] { + let params = FullReadParams { + io_parallelism, + page_size, + use_lite_scheduler, + }; + group.bench_with_input( + BenchmarkId::from_parameter(params), + ¶ms, + |b, params| { + b.iter(|| { + let obj_store = obj_store.clone(); + if obj_store.is_local() { + let path_str = format!("/{}", tmp_file); + Command::new("dd") + .arg(format!("of={}", path_str)) + .arg("oflag=nocache") + .arg("conv=notrunc,fdatasync") + .arg("count=0") + .output() + .unwrap(); + } + std::env::set_var("IO_THREADS", io_parallelism.to_string()); + let mut config = SchedulerConfig::default_for_testing(); + if use_lite_scheduler { + config = config.with_lite_scheduler(); + } + runtime.block_on(async { + let scheduler = ScanScheduler::new(obj_store, config); + let file_scheduler = scheduler + .open_file(&tmp_file, &CachedFileSize::unknown()) + .await + .unwrap(); + + let (tx, rx) = mpsc::channel(1024); + let drainer = tokio::spawn(drain_task(rx)); + let mut offset = 0; + while offset < DATA_SIZE { + #[allow(clippy::single_range_in_vec_init)] + let req = vec![offset..(offset + params.page_size)]; + let req = file_scheduler.submit_request(req, 0); + tx.send(req).await.unwrap(); + offset += params.page_size; + } + drop(tx); + let bytes_received = drainer.await.unwrap(); + assert_eq!(bytes_received, DATA_SIZE); + }); + }); + }, + ); + } } } } @@ -129,18 +140,38 @@ struct RandomReadParams { io_parallelism: u32, item_size: u32, indices: Arc>, + use_lite_scheduler: bool, + noisy_runtime: bool, } impl Display for RandomReadParams { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "random_read,parallel={},item_size={}", - self.io_parallelism, self.item_size + "random_read,parallel={},item_size={},use_lite_scheduler={},noisy={}", + self.io_parallelism, self.item_size, self.use_lite_scheduler, self.noisy_runtime ) } } +/// Performs approximately 1ms of CPU busy-work +async fn cpu_busy_work() { + loop { + let start = std::time::Instant::now(); + let mut sum = 0u64; + // Busy loop for approximately 1ms + while start.elapsed().as_micros() < 1000 { + for i in 0..1000 { + sum = sum.wrapping_add(i); + sum = sum.wrapping_mul(31); + } + } + // Use sum to prevent optimization + std::hint::black_box(sum); + tokio::task::yield_now().await; + } +} + /// This benchmark creates a file with DATA_SIZE bytes which is then treated as /// a contiguous array of items with width `item_size`. We read a random selection /// of INDICES_PER_ITER items from the array. The selection is chosen randomly but @@ -148,74 +179,108 @@ impl Display for RandomReadParams { fn bench_random_read(c: &mut Criterion) { let mut group = c.benchmark_group("from_elem"); - group.throughput(criterion::Throughput::Elements(INDICES_PER_ITER as u64)); + // Each iteration performs 100 takes + group.throughput(criterion::Throughput::Elements( + (100 * INDICES_PER_ITER) as u64, + )); - let runtime = Runtime::new().unwrap(); - let (obj_store, tmp_file) = runtime.block_on(create_data(DATA_SIZE)); + for noisy_runtime in [false, true] { + for use_lite_scheduler in [false, true] { + for io_parallelism in [1, 16] { + for item_size in [4096, 32 * 1024] { + let runtime = Runtime::new().unwrap(); + let (obj_store, tmp_file) = runtime.block_on(create_data(DATA_SIZE)); - for io_parallelism in [1, 16, 32, 64] { - for item_size in [8, 1024, 4096] { - let num_indices = DATA_SIZE as u32 / item_size; - let mut rng = rand::rng(); - let mut indices = (0..num_indices).collect::>(); - let (shuffled, _) = indices.partial_shuffle(&mut rng, INDICES_PER_ITER); - let mut indices = shuffled.to_vec(); - indices.sort_unstable(); - - let params = RandomReadParams { - io_parallelism, - item_size, - indices: Arc::new(indices), - }; - group.bench_with_input( - BenchmarkId::from_parameter(¶ms), - ¶ms, - |b, params| { - b.iter(|| { - let obj_store = obj_store.clone(); - if obj_store.is_local() { - let path_str = format!("/{}", tmp_file); - Command::new("dd") - .arg(format!("of={}", path_str)) - .arg("oflag=nocache") - .arg("conv=notrunc,fdatasync") - .arg("count=0") - .output() - .unwrap(); - } - std::env::set_var("IO_THREADS", params.io_parallelism.to_string()); - runtime.block_on(async { - let scheduler = ScanScheduler::new( - obj_store, - SchedulerConfig::default_for_testing(), - ); - let file_scheduler = scheduler - .open_file(&tmp_file, &CachedFileSize::unknown()) - .await - .unwrap(); - - let (tx, rx) = mpsc::channel(1024); - let drainer = tokio::spawn(drain_task(rx)); - let mut idx = 0; - while idx < params.indices.len() { - let iops = (idx..(idx + INDICES_PER_BATCH as usize)) - .map(|idx| { - let start = idx as u64 * params.item_size as u64; - let end = start + params.item_size as u64; - start..end - }) - .collect::>(); - idx += INDICES_PER_BATCH as usize; - let req = file_scheduler.submit_request(iops, 0); - tx.send(req).await.unwrap(); - } - drop(tx); - let bytes_received = drainer.await.unwrap(); - assert_eq!(bytes_received, INDICES_PER_ITER as u64 * item_size as u64); - }); - }); - }, - ); + let num_indices = DATA_SIZE as u32 / item_size; + let mut rng = rand::rng(); + let mut indices = (0..num_indices).collect::>(); + let (shuffled, _) = indices.partial_shuffle(&mut rng, INDICES_PER_ITER); + let mut indices = shuffled.to_vec(); + indices.sort_unstable(); + + let params = RandomReadParams { + io_parallelism, + item_size, + indices: Arc::new(indices), + use_lite_scheduler, + noisy_runtime, + }; + group.bench_with_input( + BenchmarkId::from_parameter(¶ms), + ¶ms, + |b, params| { + b.iter(|| { + let obj_store = obj_store.clone(); + if obj_store.is_local() { + let path_str = format!("/{}", tmp_file); + Command::new("dd") + .arg(format!("of={}", path_str)) + .arg("oflag=nocache") + .arg("conv=notrunc,fdatasync") + .arg("count=0") + .output() + .unwrap(); + } + std::env::set_var("IO_THREADS", params.io_parallelism.to_string()); + runtime.block_on(async { + // Spawn background CPU tasks if noisy_runtime is enabled + let mut noise_tasks: Vec> = Vec::new(); + + if params.noisy_runtime { + for _ in 0..12 { + let task = tokio::spawn(cpu_busy_work()); + noise_tasks.push(task); + } + } + + let mut config = SchedulerConfig::default_for_testing(); + if use_lite_scheduler { + config = config.with_lite_scheduler(); + } + let scheduler = ScanScheduler::new(obj_store, config); + let file_scheduler = scheduler + .open_file(&tmp_file, &CachedFileSize::unknown()) + .await + .unwrap(); + + // Perform 100 takes + for _ in 0..100 { + let (tx, rx) = mpsc::channel(1024); + let drainer = tokio::spawn(drain_task(rx)); + let mut idx = 0; + while idx < params.indices.len() { + let iops = (idx..(idx + INDICES_PER_BATCH as usize)) + .map(|idx| { + let start = + idx as u64 * params.item_size as u64; + let end = start + params.item_size as u64; + start..end + }) + .collect::>(); + idx += INDICES_PER_BATCH as usize; + let req = file_scheduler.submit_request(iops, 0); + tx.send(req).await.unwrap(); + } + drop(tx); + let bytes_received = drainer.await.unwrap(); + assert_eq!( + bytes_received, + INDICES_PER_ITER as u64 * item_size as u64 + ); + } + + // Stop background tasks + if params.noisy_runtime { + for task in noise_tasks { + task.abort(); + } + } + }); + }); + }, + ); + } + } } } } diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index a3591940cef..4a2ca236a46 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -22,6 +22,8 @@ use crate::object_store::ObjectStore; use crate::traits::Reader; use crate::utils::CachedFileSize; +mod lite; + // Don't log backpressure warnings until at least this many seconds have passed const BACKPRESSURE_MIN: u64 = 5; // Don't log backpressure warnings more than once / minute @@ -580,6 +582,11 @@ impl ScanStats { } } +enum IoQueueType { + Standard(Arc), + Lite(Arc), +} + /// An I/O scheduler which wraps an ObjectStore and throttles the amount of /// parallel I/O that can be run. /// @@ -590,7 +597,7 @@ impl ScanStats { /// using the ScanScheduler directly. pub struct ScanScheduler { object_store: Arc, - io_queue: Arc, + io_queue: IoQueueType, stats: Arc, } @@ -615,21 +622,36 @@ pub struct SchedulerConfig { /// This controls back pressure. If data is not processed quickly enough then this /// buffer will fill up and the I/O loop will pause until the buffer is drained. pub io_buffer_size_bytes: u64, + /// Whether to use the new lite scheduler + pub use_lite_scheduler: bool, } impl SchedulerConfig { + pub fn new(io_buffer_size_bytes: u64) -> Self { + Self { + io_buffer_size_bytes, + use_lite_scheduler: std::env::var("LANCE_USE_LITE_SCHEDULER").is_ok(), + } + } + /// Big enough for unit testing pub fn default_for_testing() -> Self { Self { io_buffer_size_bytes: 256 * 1024 * 1024, + use_lite_scheduler: false, } } /// Configuration that should generally maximize bandwidth (not trying to save RAM /// at all). We assume a max page size of 32MiB and then allow 32MiB per I/O thread pub fn max_bandwidth(store: &ObjectStore) -> Self { + Self::new(32 * 1024 * 1024 * store.io_parallelism() as u64) + } + + pub fn with_lite_scheduler(self) -> Self { Self { - io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64, + use_lite_scheduler: true, + ..self } } } @@ -643,20 +665,29 @@ impl ScanScheduler { /// * config - configuration settings for the scheduler pub fn new(object_store: Arc, config: SchedulerConfig) -> Arc { let io_capacity = object_store.io_parallelism(); - let io_queue = Arc::new(IoQueue::new( - io_capacity as u32, - config.io_buffer_size_bytes, - )); - let slf = Arc::new(Self { + let io_queue = if config.use_lite_scheduler { + let io_queue = Arc::new(lite::IoQueue::new( + io_capacity as u64, + config.io_buffer_size_bytes, + )); + IoQueueType::Lite(io_queue) + } else { + let io_queue = Arc::new(IoQueue::new( + io_capacity as u32, + config.io_buffer_size_bytes, + )); + let io_queue_clone = io_queue.clone(); + // Best we can do here is fire and forget. If the I/O loop is still running when the scheduler is + // dropped we can't wait for it to finish or we'd block a tokio thread. We could spawn a blocking task + // to wait for it to finish but that doesn't seem helpful. + tokio::task::spawn(async move { run_io_loop(io_queue_clone).await }); + IoQueueType::Standard(io_queue) + }; + Arc::new(Self { object_store, - io_queue: io_queue.clone(), + io_queue, stats: Arc::new(StatsCollector::new()), - }); - // Best we can do here is fire and forget. If the I/O loop is still running when the scheduler is - // dropped we can't wait for it to finish or we'd block a tokio thread. We could spawn a blocking task - // to wait for it to finish but that doesn't seem helpful. - tokio::task::spawn(async move { run_io_loop(io_queue).await }); - slf + }) } /// Open a file for reading @@ -714,6 +745,7 @@ impl ScanScheduler { request: Vec>, tx: oneshot::Sender, priority: u128, + io_queue: &Arc, ) { let num_iops = request.len() as u32; @@ -731,14 +763,14 @@ impl ScanScheduler { for (task_idx, iop) in request.into_iter().enumerate() { let dest = dest.clone(); - let io_queue = self.io_queue.clone(); + let io_queue_clone = io_queue.clone(); let num_bytes = iop.end - iop.start; let task = IoTask { reader: reader.clone(), to_read: iop, priority, when_done: Box::new(move |data| { - io_queue.on_iop_complete(); + io_queue_clone.on_iop_complete(); let mut dest = dest.lock().unwrap(); let chunk = DataChunk { data, @@ -748,31 +780,83 @@ impl ScanScheduler { dest.deliver_data(chunk); }), }; - self.io_queue.push(task); + io_queue.push(task); } } - fn submit_request( + fn submit_request_standard( &self, reader: Arc, request: Vec>, priority: u128, + io_queue: &Arc, ) -> impl Future>> + Send { let (tx, rx) = oneshot::channel::(); - self.do_submit_request(reader, request, tx, priority); + self.do_submit_request(reader, request, tx, priority, io_queue); - let io_queue = self.io_queue.clone(); + let io_queue_clone = io_queue.clone(); rx.map(move |wrapped_rsp| { // Right now, it isn't possible for I/O to be cancelled so a cancel error should // not occur let rsp = wrapped_rsp.unwrap(); - io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs); + io_queue_clone.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs); rsp.data }) } + fn submit_request_lite( + &self, + reader: Arc, + request: Vec>, + priority: u128, + io_queue: &Arc, + ) -> impl Future>> + Send { + // It's important that we submit all requests _before_ we await anything + let maybe_tasks = request + .into_iter() + .map(|task| { + let reader = reader.clone(); + let queue = io_queue.clone(); + let run_fn = Box::new(move || { + reader + .get_range(task.start as usize..task.end as usize) + .map_err(Error::from) + .boxed() + }); + queue.submit(task, priority, run_fn) + }) + .collect::>>(); + match maybe_tasks { + Ok(tasks) => async move { + let mut results = Vec::with_capacity(tasks.len()); + for task in tasks { + results.push(task.await?); + } + Ok(results) + } + .boxed(), + Err(e) => async move { Err(e) }.boxed(), + } + } + + pub fn submit_request( + &self, + reader: Arc, + request: Vec>, + priority: u128, + ) -> impl Future>> + Send { + match &self.io_queue { + IoQueueType::Standard(io_queue) => futures::future::Either::Left( + self.submit_request_standard(reader, request, priority, io_queue), + ), + IoQueueType::Lite(io_queue) => futures::future::Either::Right( + self.submit_request_lite(reader, request, priority, io_queue), + ), + } + } + pub fn stats(&self) -> ScanStats { ScanStats::new(self.stats.as_ref()) } @@ -791,7 +875,10 @@ impl Drop for ScanScheduler { // In theory, this isn't strictly necessary, as callers should drop any task expecting I/O before they // drop the scheduler. In practice, this can be difficult to do, and it is better to spend a little bit // of time letting the I/O loop drain so that we can avoid any potential deadlocks. - self.io_queue.close(); + match &self.io_queue { + IoQueueType::Standard(io_queue) => io_queue.close(), + IoQueueType::Lite(io_queue) => io_queue.close(), + } } } @@ -1150,6 +1237,7 @@ mod tests { let config = SchedulerConfig { io_buffer_size_bytes: 1024 * 1024, + use_lite_scheduler: false, }; let scan_scheduler = ScanScheduler::new(obj_store, config); @@ -1240,6 +1328,7 @@ mod tests { let config = SchedulerConfig { io_buffer_size_bytes: 10, + use_lite_scheduler: false, }; let scan_scheduler = ScanScheduler::new(obj_store.clone(), config); @@ -1314,6 +1403,7 @@ mod tests { // Ensure deadlock prevention timeout can be disabled let config = SchedulerConfig { io_buffer_size_bytes: 10, + use_lite_scheduler: false, }; let scan_scheduler = ScanScheduler::new(obj_store, config); @@ -1330,6 +1420,77 @@ mod tests { assert_eq!(second_fut.await.unwrap().len(), 10); } + /// A Reader that tracks how many times get_range has been called. + #[derive(Debug)] + struct TrackingReader { + get_range_count: Arc, + path: Path, + } + + impl deepsize::DeepSizeOf for TrackingReader { + fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize { + 0 + } + } + + impl Reader for TrackingReader { + fn path(&self) -> &Path { + &self.path + } + + fn block_size(&self) -> usize { + 4096 + } + + fn io_parallelism(&self) -> usize { + 1 + } + + fn size(&self) -> futures::future::BoxFuture<'_, object_store::Result> { + Box::pin(async { Ok(1_000_000) }) + } + + fn get_range( + &self, + range: Range, + ) -> futures::future::BoxFuture<'static, object_store::Result> { + self.get_range_count.fetch_add(1, Ordering::Release); + let num_bytes = range.end - range.start; + Box::pin(async move { Ok(Bytes::from(vec![0u8; num_bytes])) }) + } + + fn get_all(&self) -> futures::future::BoxFuture<'_, object_store::Result> { + Box::pin(async { Ok(Bytes::from(vec![0u8; 1_000_000])) }) + } + } + + #[tokio::test] + async fn test_lite_scheduler_submits_eagerly() { + let obj_store = Arc::new(ObjectStore::memory()); + let config = SchedulerConfig::default_for_testing().with_lite_scheduler(); + let scheduler = ScanScheduler::new(obj_store, config); + + let get_range_count = Arc::new(AtomicU64::new(0)); + let reader: Arc = Arc::new(TrackingReader { + get_range_count: get_range_count.clone(), + path: Path::parse("test").unwrap(), + }); + + // Submit several requests. The lite scheduler should call get_range + // eagerly during submit (before the returned future is polled). + let fut1 = scheduler.submit_request(reader.clone(), vec![0..100], 0); + let fut2 = scheduler.submit_request(reader.clone(), vec![100..200], 10); + let fut3 = scheduler.submit_request(reader.clone(), vec![200..300], 20); + + // get_range must have been called for all 3 requests already. + assert_eq!(get_range_count.load(Ordering::Acquire), 3); + + // The futures should still resolve with the correct data. + assert_eq!(fut1.await.unwrap()[0].len(), 100); + assert_eq!(fut2.await.unwrap()[0].len(), 100); + assert_eq!(fut3.await.unwrap()[0].len(), 100); + } + #[test_log::test(tokio::test(flavor = "multi_thread"))] async fn stress_backpressure() { // This test ensures that the backpressure mechanism works correctly with @@ -1345,6 +1506,7 @@ mod tests { // Only one request will be allowed in let config = SchedulerConfig { io_buffer_size_bytes: 1, + use_lite_scheduler: false, }; let scan_scheduler = ScanScheduler::new(obj_store.clone(), config); let file_scheduler = scan_scheduler diff --git a/rust/lance-io/src/scheduler/lite.rs b/rust/lance-io/src/scheduler/lite.rs new file mode 100644 index 00000000000..e744cf7aff6 --- /dev/null +++ b/rust/lance-io/src/scheduler/lite.rs @@ -0,0 +1,656 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! A lightweight I/O scheduler primarily intended for use with I/O uring. +//! +//! This scheduler attempts to avoid any kind of task switching whenever possible +//! to minimize context switching overhead. +//! +//! There are a few limitations compared to the standard scheduler: +//! +//! * There is no concurrency limit. The scheduler will allow as many IOPS to run +//! as possible as long as the backpressure throttle is not exceeded. +//! * There is no "babysitting" of IOPS. An I/O task will only be polled when its +//! future is polled. The standard scheduler will `spawn` I/O tasks and so they +//! are always polled by tokio's runtime. This is important for operations like +//! cloud requests where intermittent polling is required to clear out network +//! buffers and keep the TCP connection moving. + +use std::{ + collections::{BinaryHeap, HashMap}, + fmt::Debug, + future::Future, + ops::Range, + pin::Pin, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Mutex, MutexGuard, + }, + task::{Context, Poll, Waker}, + time::Instant, +}; + +use bytes::Bytes; +use lance_core::{Error, Result}; +use snafu::location; + +use super::{BACKPRESSURE_DEBOUNCE, BACKPRESSURE_MIN}; + +type RunFn = Box Pin> + Send>> + Send>; + +/// The state of an I/O task +/// +/// The state machine is as follows: +/// +/// * `Broken` - The task is in an error state and cannot be run, should never happen +/// * `Initial` - The task has been submitted but does not have a backpressure reservation +/// * `Reserved` - The task has a backpressure reservation +/// * `Running` - The task is running and has a future to poll +/// * `Finished` - The task has finished and has a result +enum TaskState { + Broken, + Initial { + idle_waker: Option, + run_fn: RunFn, + }, + Reserved { + idle_waker: Option, + backpressure_reservation: BackpressureReservation, + run_fn: RunFn, + }, + Running { + backpressure_reservation: BackpressureReservation, + inner: Pin> + Send>>, + }, + Finished { + backpressure_reservation: BackpressureReservation, + data: Result, + }, +} + +/// A custom error type that might have a backpressure reservation +/// +/// This is used instead of Lance's standard error type so we can ensure +/// we release the reservation before returning the error. +struct BrokenTaskError { + message: String, + backpressure_reservation: Option, +} + +/// The result type corresponding to BrokenTaskError +type TaskResult = std::result::Result<(), BrokenTaskError>; + +impl BrokenTaskError { + // Create a BrokenTaskError from a task state + // + // This will capture any backpressure reservation the task has and put it into the + // error so we make sure to release it when returning the error. + fn new(task_state: TaskState, message: String) -> Self { + match task_state { + TaskState::Reserved { + backpressure_reservation, + .. + } + | TaskState::Running { + backpressure_reservation, + .. + } + | TaskState::Finished { + backpressure_reservation, + .. + } => Self { + message, + backpressure_reservation: Some(backpressure_reservation), + }, + TaskState::Broken | TaskState::Initial { .. } => Self { + message, + backpressure_reservation: None, + }, + } + } +} + +/// An I/O task represents a single read operation +struct IoTask { + /// The unique identifier of the task (only used for debugging) + id: u64, + /// The number of bytes to read + num_bytes: u64, + /// The priority of the task, lower values are higher priority + priority: u128, + /// The current state of the task + state: TaskState, +} + +impl IoTask { + fn is_reserved(&self) -> bool { + !matches!(self.state, TaskState::Initial { .. }) + } + + fn cancel(&mut self) -> bool { + let was_running = matches!(self.state, TaskState::Running { .. }); + self.state = TaskState::Finished { + backpressure_reservation: BackpressureReservation { + num_bytes: 0, + priority: 0, + }, + data: Err(Error::IO { + source: Box::new(Error::IO { + source: "I/O Task cancelled".to_string().into(), + location: location!(), + }), + location: location!(), + }), + }; + was_running + } + + fn reserve(&mut self, backpressure_reservation: BackpressureReservation) -> TaskResult { + let state = std::mem::replace(&mut self.state, TaskState::Broken); + let TaskState::Initial { idle_waker, run_fn } = state else { + return Err(BrokenTaskError::new( + state, + format!("Task with id {} not in initial state", self.id), + )); + }; + self.state = TaskState::Reserved { + idle_waker, + backpressure_reservation, + run_fn, + }; + Ok(()) + } + + fn start(&mut self) -> TaskResult { + let state = std::mem::replace(&mut self.state, TaskState::Broken); + let TaskState::Reserved { + backpressure_reservation, + idle_waker, + run_fn, + } = state + else { + return Err(BrokenTaskError::new( + state, + format!("Task with id {} not in reserved state", self.id), + )); + }; + let inner = run_fn(); + self.state = TaskState::Running { + backpressure_reservation, + inner, + }; + + // If someone is already waiting for this task let them know it is now running + // so they can poll it + if let Some(idle_waker) = idle_waker { + idle_waker.wake(); + } + Ok(()) + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { + match &mut self.state { + TaskState::Broken => Poll::Ready(()), + TaskState::Initial { idle_waker, .. } | TaskState::Reserved { idle_waker, .. } => { + idle_waker.replace(cx.waker().clone()); + Poll::Pending + } + TaskState::Running { + inner, + backpressure_reservation, + } => match inner.as_mut().poll(cx) { + Poll::Ready(data) => { + self.state = TaskState::Finished { + data, + backpressure_reservation: *backpressure_reservation, + }; + Poll::Ready(()) + } + Poll::Pending => Poll::Pending, + }, + TaskState::Finished { .. } => Poll::Ready(()), + } + } + + fn consume(self) -> Result<(Result, BackpressureReservation)> { + let TaskState::Finished { + data, + backpressure_reservation, + } = self.state + else { + return Err(Error::Internal { + message: format!("Task with id {} not in finished state", self.id), + location: location!(), + }); + }; + Ok((data, backpressure_reservation)) + } +} + +#[derive(Debug, Clone, Copy)] +struct BackpressureReservation { + num_bytes: u64, + priority: u128, +} + +/// A throttle to control how many bytes can be read before we pause to let compute catch up +trait BackpressureThrottle: Send { + fn try_acquire(&mut self, num_bytes: u64, priority: u128) -> Option; + fn release(&mut self, reservation: BackpressureReservation); +} + +// We want to allow requests that have a lower priority than any +// currently in-flight request. This helps avoid potential deadlocks +// related to backpressure. Unfortunately, it is quite expensive to +// keep track of which priorities are in-flight. +// +// TODO: At some point it would be nice if we can optimize this away but +// in_flight should remain relatively small (generally less than 256 items) +// and has not shown itself to be a bottleneck yet. +struct PrioritiesInFlight { + in_flight: Vec, +} + +impl PrioritiesInFlight { + fn new(capacity: u64) -> Self { + Self { + in_flight: Vec::with_capacity(capacity as usize * 2), + } + } + + fn min_in_flight(&self) -> u128 { + self.in_flight.first().copied().unwrap_or(u128::MAX) + } + + fn push(&mut self, prio: u128) { + let pos = match self.in_flight.binary_search(&prio) { + Ok(pos) => pos, + Err(pos) => pos, + }; + self.in_flight.insert(pos, prio); + } + + fn remove(&mut self, prio: u128) { + if let Ok(pos) = self.in_flight.binary_search(&prio) { + self.in_flight.remove(pos); + } + } +} + +struct SimpleBackpressureThrottle { + start: Instant, + last_warn: AtomicU64, + bytes_available: i64, + priorities_in_flight: PrioritiesInFlight, +} + +impl SimpleBackpressureThrottle { + fn new(max_bytes: u64, max_concurrency: u64) -> Self { + if max_bytes > i64::MAX as u64 { + // This is unlikely to ever be an issue + panic!("Max bytes must be less than {}", i64::MAX); + } + Self { + start: Instant::now(), + last_warn: AtomicU64::new(0), + bytes_available: max_bytes as i64, + priorities_in_flight: PrioritiesInFlight::new(max_concurrency), + } + } + + fn warn_if_needed(&self) { + let seconds_elapsed = self.start.elapsed().as_secs(); + let last_warn = self.last_warn.load(Ordering::Acquire); + let since_last_warn = seconds_elapsed - last_warn; + if (last_warn == 0 + && seconds_elapsed > BACKPRESSURE_MIN + && seconds_elapsed < BACKPRESSURE_DEBOUNCE) + || since_last_warn > BACKPRESSURE_DEBOUNCE + { + tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded"); + log::debug!("Backpressure throttle is full, I/O will pause until buffer is drained. Max I/O bandwidth will not be achieved because CPU is falling behind"); + self.last_warn + .store(seconds_elapsed.max(1), Ordering::Release); + } + } +} + +impl BackpressureThrottle for SimpleBackpressureThrottle { + fn try_acquire(&mut self, num_bytes: u64, priority: u128) -> Option { + if self.bytes_available >= num_bytes as i64 + || self.priorities_in_flight.min_in_flight() >= priority + { + self.bytes_available -= num_bytes as i64; + self.priorities_in_flight.push(priority); + Some(BackpressureReservation { + num_bytes, + priority, + }) + } else { + self.warn_if_needed(); + None + } + } + + fn release(&mut self, reservation: BackpressureReservation) { + self.bytes_available += reservation.num_bytes as i64; + self.priorities_in_flight.remove(reservation.priority); + } +} + +struct TaskEntry { + task_id: u64, + priority: u128, + reserved: bool, +} + +impl Ord for TaskEntry { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Prefer reserved tasks over unreserved tasks and then highest priority tasks over lowest + // priority tasks. + // + // This is a max-heap so we sort by reserved in normal order (true > false) and priority + // in reverse order (lowest priority first) + self.reserved + .cmp(&other.reserved) + .then(other.priority.cmp(&self.priority)) + } +} + +impl PartialOrd for TaskEntry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for TaskEntry { + fn eq(&self, other: &Self) -> bool { + self.priority == other.priority + } +} + +impl Eq for TaskEntry {} + +struct IoQueueState { + backpressure_throttle: Box, + pending_tasks: BinaryHeap, + tasks: HashMap, + next_task_id: u64, +} + +impl IoQueueState { + fn new(max_concurrency: u64, max_bytes: u64) -> Self { + Self { + backpressure_throttle: Box::new(SimpleBackpressureThrottle::new( + max_bytes, + max_concurrency, + )), + pending_tasks: BinaryHeap::new(), + tasks: HashMap::new(), + next_task_id: 0, + } + } + + // If a task is in an unexpected state then we need to release any reservations that were made + // before we return an error. + // + // Note: this is perhaps a bit paranoid as a task should never be in an unexpected state. + fn handle_result(&mut self, result: TaskResult) -> Result<()> { + if let Err(error) = result { + if let Some(reservation) = error.backpressure_reservation { + self.backpressure_throttle.release(reservation); + } + Err(Error::Internal { + message: error.message, + location: location!(), + }) + } else { + Ok(()) + } + } +} + +/// A queue of I/O tasks to be shared between the I/O scheduler and the I/O decoder. +/// +/// The queue is protected by two different throttles. The first controls memory backpressure, and +/// will only allow a certain number of bytes to be allocated for reads. This throttle is released +/// as soon as the decoder consumes the bytes (not when the bytes have been fully processed). This +/// throttle is currently scoped to the scheduler and not shared across the process. This will likely +/// change in the future. +/// +/// The second throttle controls how many IOPS can be issued concurrently. This throttle is released +/// as soon as the IOP is finished. This throttle has both a local per-scheduler limit and also a +/// process-wide limit. +/// +/// Note: unlike the standard scheduler, there is no dedicated I/O loop thread. If the decoder is not +/// polling the I/O tasks then nothing else will. This scheduler is currently intended for use with I/O +/// uring where I/O tasks are bunched together and polling one task advances all outstanding I/O. It +/// would not be suitable for cloud storage where each task is an independent HTTP request and needs to +/// be polled individually (though presumably one could use I/O uring for networked cloud storage some +/// day as well) +pub(super) struct IoQueue { + state: Arc>, +} + +impl IoQueue { + pub fn new(max_concurrency: u64, max_bytes: u64) -> Self { + Self { + state: Arc::new(Mutex::new(IoQueueState::new(max_concurrency, max_bytes))), + } + } + + fn push(&self, mut task: IoTask, mut state: MutexGuard) -> Result<()> { + let task_id = task.id; + if let Some(reservation) = state + .backpressure_throttle + .try_acquire(task.num_bytes, task.priority) + { + state.handle_result(task.reserve(reservation))?; + state.handle_result(task.start())?; + state.tasks.insert(task_id, task); + return Ok(()); + } + + state.pending_tasks.push(TaskEntry { + task_id, + priority: task.priority, + reserved: task.is_reserved(), + }); + state.tasks.insert(task_id, task); + Ok(()) + } + + pub(super) fn submit( + self: Arc, + range: Range, + priority: u128, + run_fn: RunFn, + ) -> Result { + log::trace!( + "Submitting I/O task with range {:?}, priority {:?}", + range, + priority + ); + let mut state = self.state.lock().unwrap(); + let task_id = state.next_task_id; + state.next_task_id += 1; + + let task = IoTask { + id: task_id, + num_bytes: range.end - range.start, + priority, + state: TaskState::Initial { + idle_waker: None, + run_fn, + }, + }; + self.push(task, state)?; + Ok(TaskHandle { + task_id, + queue: self, + }) + } + + // When a task completes we should check to see if any other tasks are now runnable + fn on_task_complete(&self, mut state: MutexGuard) -> Result<()> { + let state_ref = &mut *state; + let mut task_result = TaskResult::Ok(()); + while !state_ref.pending_tasks.is_empty() { + // Unwrap safe here since we just checked the queue is not empty + let next_task = state_ref.pending_tasks.peek().unwrap(); + let Some(task) = state_ref.tasks.get_mut(&next_task.task_id) else { + log::warn!("Task with id {} was lost", next_task.task_id); + continue; + }; + if !task.is_reserved() { + let Some(reservation) = state_ref + .backpressure_throttle + .try_acquire(task.num_bytes, task.priority) + else { + break; + }; + if let Err(e) = task.reserve(reservation) { + task_result = Err(e); + break; + } + } + state_ref.pending_tasks.pop(); + if let Err(e) = task.start() { + task_result = Err(e); + break; + } + } + state_ref.handle_result(task_result) + } + + fn poll(&self, task_id: u64, cx: &mut Context<'_>) -> Poll> { + let mut state = self.state.lock().unwrap(); + let Some(task) = state.tasks.get_mut(&task_id) else { + // This should never happen and indicates a bug + return Poll::Ready(Err(Error::Internal { + message: format!("Task with id {} was lost", task_id), + location: location!(), + })); + }; + match task.poll(cx) { + Poll::Ready(_) => { + let task = state.tasks.remove(&task_id).unwrap(); + let (bytes, reservation) = task.consume()?; + state.backpressure_throttle.release(reservation); + // We run on_task_complete even if not newly finished because we released the backpressure reservation + match self.on_task_complete(state) { + Ok(_) => Poll::Ready(bytes), + Err(e) => Poll::Ready(Err(e)), + } + } + Poll::Pending => Poll::Pending, + } + } + + pub(super) fn close(&self) { + let mut state = self.state.lock().unwrap(); + for task in std::mem::take(&mut state.tasks).values_mut() { + task.cancel(); + } + } +} + +pub(super) struct TaskHandle { + task_id: u64, + queue: Arc, +} + +impl Future for TaskHandle { + type Output = Result; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.queue.poll(self.task_id, cx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::sync::oneshot; + + #[tokio::test] + async fn test_priority_ordering() { + // Backpressure budget of 10 bytes: only one 10-byte task runs at a time. + let queue = Arc::new(IoQueue::new(128, 10)); + + // Records the priority of each task when its run_fn is invoked (i.e. when + // the task transitions to Running). + let start_order: Arc>> = Arc::new(Mutex::new(Vec::new())); + + // Helper: builds a RunFn that records `prio` in start_order and then + // waits on the oneshot receiver for its result bytes. + let make_run_fn = + |prio: u128, rx: oneshot::Receiver, order: Arc>>| -> RunFn { + Box::new(move || { + order.lock().unwrap().push(prio); + Box::pin(async move { Ok(rx.await.unwrap()) }) + }) + }; + + // Submit a blocker task (priority 0, 10 bytes). + // It starts immediately because there is enough backpressure budget. + let (blocker_tx, blocker_rx) = oneshot::channel(); + let blocker = queue + .clone() + .submit(0..10, 0, make_run_fn(0, blocker_rx, start_order.clone())) + .unwrap(); + + // Submit four tasks with out-of-order priorities. + // All are queued because the blocker consumed the full budget. + let (tx_30, rx_30) = oneshot::channel(); + let h30 = queue + .clone() + .submit(0..10, 30, make_run_fn(30, rx_30, start_order.clone())) + .unwrap(); + + let (tx_10, rx_10) = oneshot::channel(); + let h10 = queue + .clone() + .submit(0..10, 10, make_run_fn(10, rx_10, start_order.clone())) + .unwrap(); + + let (tx_50, rx_50) = oneshot::channel(); + let h50 = queue + .clone() + .submit(0..10, 50, make_run_fn(50, rx_50, start_order.clone())) + .unwrap(); + + let (tx_20, rx_20) = oneshot::channel(); + let h20 = queue + .clone() + .submit(0..10, 20, make_run_fn(20, rx_20, start_order.clone())) + .unwrap(); + + // Only the blocker has started so far. + assert_eq!(*start_order.lock().unwrap(), vec![0]); + + // Complete the blocker -> frees budget -> starts priority 10 (lowest value = highest priority). + blocker_tx.send(Bytes::from_static(b"x")).unwrap(); + blocker.await.unwrap(); + assert_eq!(*start_order.lock().unwrap(), vec![0, 10]); + + // Complete priority 10 -> starts priority 20. + tx_10.send(Bytes::from_static(b"x")).unwrap(); + h10.await.unwrap(); + assert_eq!(*start_order.lock().unwrap(), vec![0, 10, 20]); + + // Complete priority 20 -> starts priority 30. + tx_20.send(Bytes::from_static(b"x")).unwrap(); + h20.await.unwrap(); + assert_eq!(*start_order.lock().unwrap(), vec![0, 10, 20, 30]); + + // Complete priority 30 -> starts priority 50. + tx_30.send(Bytes::from_static(b"x")).unwrap(); + h30.await.unwrap(); + assert_eq!(*start_order.lock().unwrap(), vec![0, 10, 20, 30, 50]); + + // Complete priority 50 -> no more pending tasks. + tx_50.send(Bytes::from_static(b"x")).unwrap(); + h50.await.unwrap(); + assert_eq!(*start_order.lock().unwrap(), vec![0, 10, 20, 30, 50]); + } +} diff --git a/rust/lance-tools/src/meta.rs b/rust/lance-tools/src/meta.rs index 8e96e74e7f1..057a8506a0b 100644 --- a/rust/lance-tools/src/meta.rs +++ b/rust/lance-tools/src/meta.rs @@ -37,12 +37,8 @@ impl fmt::Display for LanceToolFileMetadata { impl LanceToolFileMetadata { async fn open(source: &String) -> Result { let (object_store, path) = crate::util::get_object_store_and_path(source).await?; - let scan_scheduler = ScanScheduler::new( - object_store, - SchedulerConfig { - io_buffer_size_bytes: 2 * 1024 * 1024 * 1024, - }, - ); + let scan_scheduler = + ScanScheduler::new(object_store, SchedulerConfig::new(2 * 1024 * 1024 * 1024)); let file_scheduler = scan_scheduler .open_file(&path, &CachedFileSize::unknown()) .await?; diff --git a/rust/lance/benches/take.rs b/rust/lance/benches/take.rs index cd48e58baff..3f9fd842bd8 100644 --- a/rust/lance/benches/take.rs +++ b/rust/lance/benches/take.rs @@ -228,9 +228,7 @@ async fn create_file_reader(dataset: &Dataset, file_path: &Path) -> FileReader { // Create file reader v2. let scheduler = ScanScheduler::new( dataset.object_store.clone(), - SchedulerConfig { - io_buffer_size_bytes: 2 * 1024 * 1024 * 1024, - }, + SchedulerConfig::new(2 * 1024 * 1024 * 1024), ); let file = scheduler .open_file(file_path, &CachedFileSize::unknown()) diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 0e53d3666ba..c7088c1cf3e 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -408,9 +408,7 @@ impl FilteredReadStream { let obj_store = dataset.object_store.clone(); let scheduler_config = if let Some(io_buffer_size_bytes) = options.io_buffer_size_bytes { - SchedulerConfig { - io_buffer_size_bytes, - } + SchedulerConfig::new(io_buffer_size_bytes) } else { SchedulerConfig::max_bandwidth(obj_store.as_ref()) }; diff --git a/rust/lance/src/io/exec/scan.rs b/rust/lance/src/io/exec/scan.rs index 827d6749ac9..23e7c62b272 100644 --- a/rust/lance/src/io/exec/scan.rs +++ b/rust/lance/src/io/exec/scan.rs @@ -271,9 +271,7 @@ impl LanceStream { let scan_scheduler = ScanScheduler::new( dataset.object_store.clone(), - SchedulerConfig { - io_buffer_size_bytes: config.io_buffer_size, - }, + SchedulerConfig::new(config.io_buffer_size), ); let scan_scheduler_clone = scan_scheduler.clone(); diff --git a/rust/lance/src/io/exec/take.rs b/rust/lance/src/io/exec/take.rs index 25ca45e2335..dce82efb602 100644 --- a/rust/lance/src/io/exec/take.rs +++ b/rust/lance/src/io/exec/take.rs @@ -536,6 +536,7 @@ impl ExecutionPlan for TakeExec { let lazy_take_stream = futures::stream::once(async move { let obj_store = dataset.object_store.clone(); let scheduler_config = SchedulerConfig::max_bandwidth(&obj_store); + // unwrap is safe since SchedulerConfig::max_bandwidth is always valid let scan_scheduler = ScanScheduler::new(obj_store, scheduler_config); let take_stream = Arc::new(TakeStream::new(