Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
use lance_core::utils::futures::FinallyStreamExt;
use log::{debug, trace, warn};
use snafu::location;
use tokio::sync::mpsc::error::SendError;
Expand Down Expand Up @@ -1977,6 +1978,8 @@ pub fn schedule_and_decode(
// trying to read them has caused bugs in the past.
let requested_rows = requested_rows.trim_empty_ranges();

let io = config.io.clone();

// For convenience we really want this method to be a synchronous method where all
// errors happen on the stream. There is some async initialization that must happen
// when creating a scheduler. We wrap that all up in the very first task.
Expand All @@ -1988,8 +1991,10 @@ pub fn schedule_and_decode(
target_schema,
config,
) {
// Keep the io alive until the stream is dropped or finishes. Otherwise the
// I/O drops as soon as the scheduling is finished and the I/O loop terminates.
Ok(stream) => stream.finally(move || drop(io)).boxed(),
// If the initialization failed make it look like a failed task
Ok(stream) => stream,
Err(e) => stream::once(std::future::ready(ReadBatchTask {
num_rows: 0,
task: std::future::ready(Err(e)).boxed(),
Expand Down
44 changes: 33 additions & 11 deletions rust/lance-io/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,6 @@ impl PrioritiesInFlight {
/// Remove a priority from the in-flight set, if present.
///
/// A priority may legitimately be absent: when the queue is closed,
/// pending requests are cancelled without ever entering the in-flight
/// set, so a failed lookup must not be treated as a broken invariant.
/// (The previous `unreachable!()` here would panic in that case.)
fn remove(&mut self, prio: u128) {
    if let Ok(pos) = self.in_flight.binary_search(&prio) {
        self.in_flight.remove(pos);
    }
}
}
Expand Down Expand Up @@ -217,10 +215,6 @@ impl IoQueueState {
}
}

/// True once scheduling is done and no requests remain queued.
fn finished(&self) -> bool {
    if !self.done_scheduling {
        return false;
    }
    self.pending_requests.is_empty()
}

fn warn_if_needed(&self) {
let seconds_elapsed = self.start.elapsed().as_secs();
let last_warn = self.last_warn.load(Ordering::Acquire);
Expand Down Expand Up @@ -320,7 +314,7 @@ impl IoQueue {
return Some(task);
}

if state.finished() {
if state.done_scheduling {
return None;
}
}
Expand Down Expand Up @@ -351,7 +345,11 @@ impl IoQueue {
/// Mark the queue as done and cancel every request still waiting.
///
/// The pending requests are drained while the lock is held, but the
/// cancellation callbacks run after the guard is released so we never
/// invoke user callbacks while holding the queue mutex.
fn close(&self) {
    let drained = {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        std::mem::take(&mut state.pending_requests)
    };
    for request in drained {
        request.cancel();
    }

    self.notify.notify_one();
}
Expand Down Expand Up @@ -470,6 +468,12 @@ impl IoTask {
/// Number of bytes this task will read (length of the byte range).
fn num_bytes(&self) -> u64 {
    let range = &self.to_read;
    range.end - range.start
}
fn cancel(self) {
(self.when_done)(Err(Error::Internal {
message: "Scheduler closed before I/O was completed".to_string(),
location: location!(),
}));
}

async fn run(self) {
let file_path = self.reader.path().as_ref();
Expand Down Expand Up @@ -579,7 +583,11 @@ impl ScanStats {
/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
/// parallel I/O that can be run.
///
/// TODO: This will also add coalescing
/// The ScanScheduler will cancel any outstanding I/O requests when it is dropped.
/// For this reason it should be kept alive until all I/O has finished.
///
/// Note: The 2.X file readers already do this so this is only a concern if you are
/// using the ScanScheduler directly.
pub struct ScanScheduler {
object_store: Arc<ObjectStore>,
io_queue: Arc<IoQueue>,
Expand Down Expand Up @@ -639,13 +647,16 @@ impl ScanScheduler {
io_capacity as u32,
config.io_buffer_size_bytes,
));
let scheduler = Self {
let slf = Arc::new(Self {
object_store,
io_queue: io_queue.clone(),
stats: Arc::new(StatsCollector::new()),
};
});
// The best we can do here is fire and forget. If the I/O loop is still running when the scheduler is
// dropped we can't wait for it to finish or we'd block a tokio thread. We could spawn a blocking task
// to wait for it to finish but that doesn't seem helpful.
tokio::task::spawn(async move { run_io_loop(io_queue).await });
Arc::new(scheduler)
slf
}

/// Open a file for reading
Expand Down Expand Up @@ -769,6 +780,17 @@ impl ScanScheduler {

// Dropping the scheduler closes the I/O queue, which cancels any requests
// that were queued but never handed to the I/O loop.
impl Drop for ScanScheduler {
fn drop(&mut self) {
// If the user is dropping the ScanScheduler then they _should_ be done with I/O. This can happen
// even when I/O is in progress if, for example, the user is dropping a scan mid-read because they found
// the data they wanted (limit after filter or some other example).
//
// Closing the I/O queue will cancel any requests that have not yet been sent to the I/O loop. However,
// it will not terminate the I/O loop itself. This is to help prevent deadlock and ensure that all I/O
// requests that are submitted will terminate.
//
// In theory, this isn't strictly necessary, as callers should drop any task expecting I/O before they
// drop the scheduler. In practice, this can be difficult to do, and it is better to spend a little bit
// of time letting the I/O loop drain so that we can avoid any potential deadlocks.
self.io_queue.close();
}
}
Expand Down
53 changes: 53 additions & 0 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2056,6 +2056,10 @@ impl Scanner {
read_options = read_options.with_deleted_rows()?;
}

if let Some(io_buffer_size_bytes) = self.io_buffer_size {
read_options = read_options.with_io_buffer_size(io_buffer_size_bytes);
}

let index_input = filter_plan.index_query.clone().map(|index_query| {
Arc::new(ScalarIndexExec::new(self.dataset.clone(), index_query))
as Arc<dyn ExecutionPlan>
Expand Down Expand Up @@ -8376,4 +8380,53 @@ mod test {
);
}
}

#[test_log::test(test)]
fn test_scan_finishes_all_tasks() {
// Need to use multi-threaded runtime otherwise tasks don't run unless someone is polling somewhere
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_time()
.build()
.unwrap();

runtime.block_on(async move {
let ds = lance_datagen::gen_batch()
.col("id", lance_datagen::array::step::<Int32Type>())
.into_ram_dataset(FragmentCount::from(1000), FragmentRowCount::from(10))
.await
.unwrap();

// This scan has a small I/O buffer size and batch size to mimic a real-world situation
// that required a lot of data. Many fragments will be scheduled at low priority and the data
// buffer will fill up with data reads. When the scan is abandoned, the tasks to read the fragment
// metadata were left behind and would never finish because the data was never decoded to drain the
// backpressure queue.
//
// The fix (that this test verifies) is to ensure we close the I/O scheduler when the scan is abandoned.
let mut stream = ds
.scan()
.fragment_readahead(1000)
.batch_size(1)
.io_buffer_size(1)
.batch_readahead(1)
.try_into_stream()
.await
.unwrap();
stream.next().await.unwrap().unwrap();
});

let start = Instant::now();
while start.elapsed() < Duration::from_secs(10) {
if runtime.handle().metrics().num_alive_tasks() == 0 {
break;
}
std::thread::sleep(Duration::from_millis(100));
}

assert!(
runtime.handle().metrics().num_alive_tasks() == 0,
"Tasks should have finished within 10 seconds but there are still {} tasks running",
runtime.handle().metrics().num_alive_tasks()
Comment on lines +8418 to +8429
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 Badge Unstable tokio metrics API breaks build

The new test uses runtime.handle().metrics().num_alive_tasks() to wait for tasks to finish. Handle::metrics is gated behind the tokio_unstable configuration flag, and the workspace Cargo.toml only enables rt-multi-thread, macros, fs, and sync for tokio. Because tokio_unstable is not enabled, this code will not compile, causing the entire test suite to fail to build. The metrics-based polling should either be conditional on tokio_unstable or replaced with a stable mechanism for detecting task completion.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some metrics are unstable. The ones in use here are stable.

);
}
}
29 changes: 27 additions & 2 deletions rust/lance/src/io/exec/filtered_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use arrow::array::AsArray;
use arrow::datatypes::UInt32Type;
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion::common::runtime::SpawnedTask;
use datafusion::common::stats::Precision;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
Expand Down Expand Up @@ -365,6 +366,13 @@ impl FilteredReadStream {
.clone()
.unwrap_or_else(|| dataset.fragments().clone());

log::debug!(
"Filtered read on {} fragments with frag_readahead={} and io_parallelism={}",
fragments.len(),
fragment_readahead,
io_parallelism
);

// Ideally we don't need to collect here but if we don't we get "implementation of FnOnce is
// not general enough" false positives from rustc
let frag_futs = fragments
Expand All @@ -386,7 +394,13 @@ impl FilteredReadStream {
let output_schema = Arc::new(options.projection.to_arrow_schema());

let obj_store = dataset.object_store.clone();
let scheduler_config = SchedulerConfig::max_bandwidth(obj_store.as_ref());
let scheduler_config = if let Some(io_buffer_size_bytes) = options.io_buffer_size_bytes {
SchedulerConfig {
io_buffer_size_bytes,
}
} else {
SchedulerConfig::max_bandwidth(obj_store.as_ref())
};
let scan_scheduler = ScanScheduler::new(obj_store, scheduler_config);

let (scoped_fragments, scan_planned_with_limit_pushed_down) = Self::plan_scan(
Expand All @@ -412,7 +426,7 @@ impl FilteredReadStream {
move |scoped_fragment| {
let metrics = global_metrics_clone.clone();
let limit = scan_range_after_filter.as_ref().map(|r| r.end);
tokio::task::spawn(
SpawnedTask::spawn(
Self::read_fragment(scoped_fragment, metrics, limit).in_current_span(),
)
.map(|thread_result| thread_result.unwrap())
Expand Down Expand Up @@ -1197,6 +1211,8 @@ pub struct FilteredReadOptions {
pub full_filter: Option<Expr>,
/// The threading mode to use for the scan
pub threading_mode: FilteredReadThreadingMode,
/// The size of the I/O buffer to use for the scan
pub io_buffer_size_bytes: Option<u64>,
}

impl FilteredReadOptions {
Expand All @@ -1223,6 +1239,7 @@ impl FilteredReadOptions {
projection,
refine_filter: None,
full_filter: None,
io_buffer_size_bytes: None,
threading_mode: FilteredReadThreadingMode::OnePartitionMultipleThreads(
get_num_compute_intensive_cpus(),
),
Expand Down Expand Up @@ -1365,6 +1382,14 @@ impl FilteredReadOptions {
self.projection = projection;
self
}

/// Specify the size of the I/O buffer (in bytes) to use for the scan
///
/// See [`crate::dataset::scanner::Scanner::io_buffer_size`] for more details.
pub fn with_io_buffer_size(mut self, io_buffer_size: u64) -> Self {
self.io_buffer_size_bytes = Some(io_buffer_size);
self
}
}

/// A plan node that reads a dataset, applying an optional filter and projection.
Expand Down
11 changes: 11 additions & 0 deletions rust/lance/src/utils/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,17 @@ impl DatagenExt for BatchGeneratorBuilder {
rows_per_fragment: FragmentRowCount,
write_params: Option<WriteParams>,
) -> lance_core::Result<Dataset> {
// Need to verify that max_rows_per_file has been set otherwise the frag_count won't be respected
if let Some(write_params) = &write_params {
if write_params.max_rows_per_file != rows_per_fragment.0 as usize {
panic!(
"Max rows per file in write params does not match rows per fragment: {} != {}",
write_params.max_rows_per_file, rows_per_fragment.0 as usize
);
}
} else {
panic!("Write params are not set, will not write correct # of fragments");
}
let reader = self.into_reader_rows(
RowCount::from(rows_per_fragment.0 as u64),
BatchCount::from(frag_count.0),
Expand Down
Loading