Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,12 +653,8 @@ impl LanceFileReader {
path: Path,
columns: Option<Vec<String>>,
) -> PyResult<Self> {
let scheduler = ScanScheduler::new(
object_store,
SchedulerConfig {
io_buffer_size_bytes: 2 * 1024 * 1024 * 1024,
},
);
let scheduler =
ScanScheduler::new(object_store, SchedulerConfig::new(2 * 1024 * 1024 * 1024));
let file = scheduler
.open_file(&path, &CachedFileSize::unknown())
.await
Expand Down
12 changes: 3 additions & 9 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,7 @@ impl BatchDecodeStream {
}
}

#[instrument(level = "debug", skip_all)]
async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
if self.scheduler_exhausted {
return Ok(self.rows_scheduled);
Expand Down Expand Up @@ -1718,6 +1719,7 @@ impl StructuralBatchDecodeStream {
}
}

#[instrument(level = "debug", skip_all)]
async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
if self.scheduler_exhausted {
return Ok(self.rows_scheduled);
Expand Down Expand Up @@ -1793,15 +1795,7 @@ impl StructuralBatchDecodeStream {
let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
let task = async move {
let next_task = next_task?;
// Real decode work happens inside into_batch, which can block the current
// thread for a long time. By spawning it as a new task, we allow Tokio's
// worker threads to keep making progress.
tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
.await
.map_err(|err| Error::Wrapped {
error: err.into(),
location: location!(),
})?
async move { next_task.into_batch(emitted_batch_size_warning) }.await
};
(task, num_rows)
});
Expand Down
295 changes: 180 additions & 115 deletions rust/lance-io/benches/scheduler.rs

Large diffs are not rendered by default.

206 changes: 184 additions & 22 deletions rust/lance-io/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use crate::object_store::ObjectStore;
use crate::traits::Reader;
use crate::utils::CachedFileSize;

mod lite;

// Don't log backpressure warnings until at least this many seconds have passed
const BACKPRESSURE_MIN: u64 = 5;
// Don't log backpressure warnings more than once / minute
Expand Down Expand Up @@ -580,6 +582,11 @@ impl ScanStats {
}
}

enum IoQueueType {
Standard(Arc<IoQueue>),
Lite(Arc<lite::IoQueue>),
}

/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
/// parallel I/O that can be run.
///
Expand All @@ -590,7 +597,7 @@ impl ScanStats {
/// using the ScanScheduler directly.
pub struct ScanScheduler {
object_store: Arc<ObjectStore>,
io_queue: Arc<IoQueue>,
io_queue: IoQueueType,
stats: Arc<StatsCollector>,
}

Expand All @@ -615,21 +622,36 @@ pub struct SchedulerConfig {
/// This controls back pressure. If data is not processed quickly enough then this
/// buffer will fill up and the I/O loop will pause until the buffer is drained.
pub io_buffer_size_bytes: u64,
/// Whether to use the new lite scheduler
pub use_lite_scheduler: bool,
}

impl SchedulerConfig {
pub fn new(io_buffer_size_bytes: u64) -> Self {
Self {
io_buffer_size_bytes,
use_lite_scheduler: std::env::var("LANCE_USE_LITE_SCHEDULER").is_ok(),
}
}

/// Big enough for unit testing
pub fn default_for_testing() -> Self {
Self {
io_buffer_size_bytes: 256 * 1024 * 1024,
use_lite_scheduler: false,
}
}

/// Configuration that should generally maximize bandwidth (not trying to save RAM
/// at all). We assume a max page size of 32MiB and then allow 32MiB per I/O thread
pub fn max_bandwidth(store: &ObjectStore) -> Self {
Self::new(32 * 1024 * 1024 * store.io_parallelism() as u64)
}

pub fn with_lite_scheduler(self) -> Self {
Self {
io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
use_lite_scheduler: true,
..self
}
}
}
Expand All @@ -643,20 +665,29 @@ impl ScanScheduler {
/// * config - configuration settings for the scheduler
pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
let io_capacity = object_store.io_parallelism();
let io_queue = Arc::new(IoQueue::new(
io_capacity as u32,
config.io_buffer_size_bytes,
));
let slf = Arc::new(Self {
let io_queue = if config.use_lite_scheduler {
let io_queue = Arc::new(lite::IoQueue::new(
io_capacity as u64,
config.io_buffer_size_bytes,
));
IoQueueType::Lite(io_queue)
} else {
let io_queue = Arc::new(IoQueue::new(
io_capacity as u32,
config.io_buffer_size_bytes,
));
let io_queue_clone = io_queue.clone();
// Best we can do here is fire and forget. If the I/O loop is still running when the scheduler is
// dropped we can't wait for it to finish or we'd block a tokio thread. We could spawn a blocking task
// to wait for it to finish but that doesn't seem helpful.
tokio::task::spawn(async move { run_io_loop(io_queue_clone).await });
IoQueueType::Standard(io_queue)
};
Arc::new(Self {
object_store,
io_queue: io_queue.clone(),
io_queue,
stats: Arc::new(StatsCollector::new()),
});
// Best we can do here is fire and forget. If the I/O loop is still running when the scheduler is
// dropped we can't wait for it to finish or we'd block a tokio thread. We could spawn a blocking task
// to wait for it to finish but that doesn't seem helpful.
tokio::task::spawn(async move { run_io_loop(io_queue).await });
slf
})
}

/// Open a file for reading
Expand Down Expand Up @@ -714,6 +745,7 @@ impl ScanScheduler {
request: Vec<Range<u64>>,
tx: oneshot::Sender<Response>,
priority: u128,
io_queue: &Arc<IoQueue>,
) {
let num_iops = request.len() as u32;

Expand All @@ -731,14 +763,14 @@ impl ScanScheduler {

for (task_idx, iop) in request.into_iter().enumerate() {
let dest = dest.clone();
let io_queue = self.io_queue.clone();
let io_queue_clone = io_queue.clone();
let num_bytes = iop.end - iop.start;
let task = IoTask {
reader: reader.clone(),
to_read: iop,
priority,
when_done: Box::new(move |data| {
io_queue.on_iop_complete();
io_queue_clone.on_iop_complete();
let mut dest = dest.lock().unwrap();
let chunk = DataChunk {
data,
Expand All @@ -748,31 +780,83 @@ impl ScanScheduler {
dest.deliver_data(chunk);
}),
};
self.io_queue.push(task);
io_queue.push(task);
}
}

fn submit_request(
fn submit_request_standard(
&self,
reader: Arc<dyn Reader>,
request: Vec<Range<u64>>,
priority: u128,
io_queue: &Arc<IoQueue>,
) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
let (tx, rx) = oneshot::channel::<Response>();

self.do_submit_request(reader, request, tx, priority);
self.do_submit_request(reader, request, tx, priority, io_queue);

let io_queue = self.io_queue.clone();
let io_queue_clone = io_queue.clone();

rx.map(move |wrapped_rsp| {
// Right now, it isn't possible for I/O to be cancelled so a cancel error should
// not occur
let rsp = wrapped_rsp.unwrap();
io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
io_queue_clone.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
rsp.data
})
}

fn submit_request_lite(
&self,
reader: Arc<dyn Reader>,
request: Vec<Range<u64>>,
priority: u128,
io_queue: &Arc<lite::IoQueue>,
) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
// It's important that we submit all requests _before_ we await anything
let maybe_tasks = request
.into_iter()
.map(|task| {
let reader = reader.clone();
let queue = io_queue.clone();
let run_fn = Box::new(move || {
reader
.get_range(task.start as usize..task.end as usize)
.map_err(Error::from)
.boxed()
});
queue.submit(task, priority, run_fn)
})
.collect::<Result<Vec<_>>>();
match maybe_tasks {
Ok(tasks) => async move {
let mut results = Vec::with_capacity(tasks.len());
for task in tasks {
results.push(task.await?);
}
Ok(results)
}
.boxed(),
Err(e) => async move { Err(e) }.boxed(),
}
}

pub fn submit_request(
&self,
reader: Arc<dyn Reader>,
request: Vec<Range<u64>>,
priority: u128,
) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
match &self.io_queue {
IoQueueType::Standard(io_queue) => futures::future::Either::Left(
self.submit_request_standard(reader, request, priority, io_queue),
),
IoQueueType::Lite(io_queue) => futures::future::Either::Right(
self.submit_request_lite(reader, request, priority, io_queue),
),
}
}

pub fn stats(&self) -> ScanStats {
ScanStats::new(self.stats.as_ref())
}
Expand All @@ -791,7 +875,10 @@ impl Drop for ScanScheduler {
// In theory, this isn't strictly necessary, as callers should drop any task expecting I/O before they
// drop the scheduler. In practice, this can be difficult to do, and it is better to spend a little bit
// of time letting the I/O loop drain so that we can avoid any potential deadlocks.
self.io_queue.close();
match &self.io_queue {
IoQueueType::Standard(io_queue) => io_queue.close(),
IoQueueType::Lite(io_queue) => io_queue.close(),
}
}
}

Expand Down Expand Up @@ -1150,6 +1237,7 @@ mod tests {

let config = SchedulerConfig {
io_buffer_size_bytes: 1024 * 1024,
use_lite_scheduler: false,
};

let scan_scheduler = ScanScheduler::new(obj_store, config);
Expand Down Expand Up @@ -1240,6 +1328,7 @@ mod tests {

let config = SchedulerConfig {
io_buffer_size_bytes: 10,
use_lite_scheduler: false,
};

let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
Expand Down Expand Up @@ -1314,6 +1403,7 @@ mod tests {
// Ensure deadlock prevention timeout can be disabled
let config = SchedulerConfig {
io_buffer_size_bytes: 10,
use_lite_scheduler: false,
};

let scan_scheduler = ScanScheduler::new(obj_store, config);
Expand All @@ -1330,6 +1420,77 @@ mod tests {
assert_eq!(second_fut.await.unwrap().len(), 10);
}

/// A Reader that tracks how many times get_range has been called.
#[derive(Debug)]
struct TrackingReader {
get_range_count: Arc<AtomicU64>,
path: Path,
}

impl deepsize::DeepSizeOf for TrackingReader {
fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
0
}
}

impl Reader for TrackingReader {
fn path(&self) -> &Path {
&self.path
}

fn block_size(&self) -> usize {
4096
}

fn io_parallelism(&self) -> usize {
1
}

fn size(&self) -> futures::future::BoxFuture<'_, object_store::Result<usize>> {
Box::pin(async { Ok(1_000_000) })
}

fn get_range(
&self,
range: Range<usize>,
) -> futures::future::BoxFuture<'static, object_store::Result<Bytes>> {
self.get_range_count.fetch_add(1, Ordering::Release);
let num_bytes = range.end - range.start;
Box::pin(async move { Ok(Bytes::from(vec![0u8; num_bytes])) })
}

fn get_all(&self) -> futures::future::BoxFuture<'_, object_store::Result<Bytes>> {
Box::pin(async { Ok(Bytes::from(vec![0u8; 1_000_000])) })
}
}

#[tokio::test]
async fn test_lite_scheduler_submits_eagerly() {
let obj_store = Arc::new(ObjectStore::memory());
let config = SchedulerConfig::default_for_testing().with_lite_scheduler();
let scheduler = ScanScheduler::new(obj_store, config);

let get_range_count = Arc::new(AtomicU64::new(0));
let reader: Arc<dyn Reader> = Arc::new(TrackingReader {
get_range_count: get_range_count.clone(),
path: Path::parse("test").unwrap(),
});

// Submit several requests. The lite scheduler should call get_range
// eagerly during submit (before the returned future is polled).
let fut1 = scheduler.submit_request(reader.clone(), vec![0..100], 0);
let fut2 = scheduler.submit_request(reader.clone(), vec![100..200], 10);
let fut3 = scheduler.submit_request(reader.clone(), vec![200..300], 20);

// get_range must have been called for all 3 requests already.
assert_eq!(get_range_count.load(Ordering::Acquire), 3);

// The futures should still resolve with the correct data.
assert_eq!(fut1.await.unwrap()[0].len(), 100);
assert_eq!(fut2.await.unwrap()[0].len(), 100);
assert_eq!(fut3.await.unwrap()[0].len(), 100);
}

#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn stress_backpressure() {
// This test ensures that the backpressure mechanism works correctly with
Expand All @@ -1345,6 +1506,7 @@ mod tests {
// Only one request will be allowed in
let config = SchedulerConfig {
io_buffer_size_bytes: 1,
use_lite_scheduler: false,
};
let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
let file_scheduler = scan_scheduler
Expand Down
Loading
Loading