From 3090d4ed3c560009ccbd3ba777f6617338b10787 Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Thu, 29 Jan 2026 14:20:41 -0800 Subject: [PATCH 1/4] feat: add FilteredReadPlan with bitmap-based row selection - Add FilteredReadPlan struct using RowAddrTreeMap for row selection - Add get_or_create_plan API for lazy plan computation via OnceCell - Support providing pre-computed plan to FilteredReadExec::try_new - Centralize plan creation in get_or_create_plan_impl - Make RowAddrSelection public in lance-core --- rust/lance-core/src/utils/mask.rs | 7 +- rust/lance/src/dataset/scanner.rs | 2 + rust/lance/src/io/exec/filtered_read.rs | 395 ++++++++++++++++++++---- 3 files changed, 335 insertions(+), 69 deletions(-) diff --git a/rust/lance-core/src/utils/mask.rs b/rust/lance-core/src/utils/mask.rs index a1f56d48a84..2580d6dc7c0 100644 --- a/rust/lance-core/src/utils/mask.rs +++ b/rust/lance-core/src/utils/mask.rs @@ -296,7 +296,7 @@ pub struct RowAddrTreeMap { } #[derive(Clone, Debug, PartialEq)] -enum RowAddrSelection { +pub enum RowAddrSelection { Full, Partial(RoaringBitmap), } @@ -557,6 +557,11 @@ impl RowAddrTreeMap { } } + /// Get the selection for a fragment + pub fn get(&self, fragment_id: &u32) -> Option<&RowAddrSelection> { + self.inner.get(fragment_id) + } + pub fn retain_fragments(&mut self, frag_ids: impl IntoIterator) { let frag_id_set = frag_ids.into_iter().collect::>(); self.inner diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index a0812d6caf4..ffca68c52d2 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2326,6 +2326,7 @@ impl Scanner { self.dataset.clone(), read_options, index_input, + None, )?)) } @@ -2406,6 +2407,7 @@ impl Scanner { self.dataset.clone(), FilteredReadOptions::new(projection), Some(input), + None, )?)) } diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 0a32da7813a..48f6b0336ad 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -32,7 +32,7 @@ use lance_arrow::RecordBatchExt; use lance_core::datatypes::OnMissing; use lance_core::utils::deletion::DeletionVector; use lance_core::utils::futures::FinallyStreamExt; -use lance_core::utils::mask::RowAddrMask; +use lance_core::utils::mask::{RowAddrMask, RowAddrSelection, RowAddrTreeMap}; use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::{datatypes::Projection, Error, Result}; use lance_datafusion::planner::Planner; @@ -47,7 +47,7 @@ use lance_table::rowids::RowIdSequence; use lance_table::utils::stream::ReadBatchFut; use roaring::RoaringBitmap; use snafu::location; -use tokio::sync::Mutex as AsyncMutex; +use tokio::sync::{Mutex as AsyncMutex, OnceCell}; use tracing::{instrument, Instrument}; use crate::dataset::fragment::{FileFragment, FragReadConfig}; @@ -139,6 +139,7 @@ impl ScopedFragmentRead { } /// A fragment with all of its metadata loaded +#[derive(Debug, Clone)] struct LoadedFragment { row_id_sequence: Arc, deletion_vector: Option>, @@ -355,12 +356,13 @@ impl std::fmt::Debug for FilteredReadStream { } impl FilteredReadStream { + /// Create a new FilteredReadStream from a pre-computed plan #[instrument(name = "init_filtered_read_stream", skip_all)] async fn try_new( dataset: Arc, options: FilteredReadOptions, metrics: &ExecutionPlanMetricsSet, - evaluated_index: Option>, + plan: FilteredReadPlan, ) -> DataFusionResult { let global_metrics = Arc::new(FilteredReadGlobalMetrics::new(metrics)); @@ -414,20 +416,17 @@ impl FilteredReadStream { }; let scan_scheduler = ScanScheduler::new(obj_store, scheduler_config); - let (scoped_fragments, scan_planned_with_limit_pushed_down) = Self::plan_scan( - dataset.as_ref(), - loaded_fragments, - &evaluated_index, + // Get scan_range_after_filter from the plan + let scan_range_after_filter = plan.scan_range_after_filter.clone(); + + // Convert plan to scoped fragments for I/O + let scoped_fragments = Self::plan_to_scoped_fragments( + &plan, + &loaded_fragments, + &dataset, &options, scan_scheduler.clone(), - ) - .await?; - - let scan_range_after_filter = if !scan_planned_with_limit_pushed_down { - options.scan_range_after_filter - } else { - None - }; + ); let global_metrics_clone = global_metrics.clone(); @@ -500,15 +499,13 @@ impl FilteredReadStream { // If the scan range is not ignoring the filters we can only push it down if: // 1. The index result is an exact match (we know exactly which rows will be in the result) // 2. The index result is AtLeast with guaranteed rows >= limit (we have enough guaranteed matches) - // Returns: (fragment reads, whether limit was pushed down to fragment ranges) + // Returns: FilteredReadPlan with bitmap-based row selection #[instrument(name = "plan_scan", skip_all)] - async fn plan_scan( - dataset: &Dataset, - fragments: Vec, + fn plan_scan( + fragments: &[LoadedFragment], evaluated_index: &Option>, options: &FilteredReadOptions, - scan_scheduler: Arc, - ) -> Result<(Vec, bool)> { + ) -> FilteredReadPlan { // For pushing down scan_range_after_filter let mut scan_planned_with_limit_pushed_down = false; let mut to_skip = options @@ -583,19 +580,20 @@ impl FilteredReadStream { } } - let mut scoped_fragments = Vec::with_capacity(fragments.len()); - let default_batch_size = options.batch_size.unwrap_or_else(|| { - get_default_batch_size().unwrap_or_else(|| { - std::cmp::max(dataset.object_store().block_size() / 4, BATCH_SIZE_FALLBACK) - }) as u32 - }); - - let projection = Arc::new(options.projection.clone()); + // Convert ranges to FilteredReadPlan with bitmaps + // TODO: Use bitmap for the whole planning phase to avoid this conversion + let mut rows = RowAddrTreeMap::new(); + let mut filters = HashMap::new(); - for (priority, fragment) in fragments.into_iter().enumerate() { + for fragment in fragments.iter() { let fragment_id = fragment.fragment.id() as u32; if let Some(to_read) = fragments_to_read.get(&fragment_id) { if !to_read.is_empty() { + // Convert ranges to bitmap + let bitmap = Self::ranges_to_bitmap(to_read); + rows.insert_bitmap(fragment_id, bitmap); + + // Resolve filter for this fragment let filter = if let Some(evaluated_index) = evaluated_index { if evaluated_index.applicable_fragments.contains(fragment_id) { match &evaluated_index.index_result { @@ -614,34 +612,88 @@ impl FilteredReadStream { options.full_filter.clone() }; + if let Some(f) = filter { + filters.insert(fragment_id, Arc::new(f)); + } + log::trace!( "Planning {} ranges ({} rows) from fragment {} with filter: {:?}", to_read.len(), to_read.iter().map(|r| r.end - r.start).sum::(), - fragment.fragment.id(), - filter + fragment_id, + filters.get(&fragment_id) ); - - scoped_fragments.push(ScopedFragmentRead { - fragment: fragment.fragment.clone(), - ranges: to_read.clone(), - projection: projection.clone(), - with_deleted_rows: options.with_deleted_rows, - batch_size: default_batch_size, - filter, - priority: priority as u32, - scan_scheduler: scan_scheduler.clone(), - }); } else { log::trace!( "Skipping fragment {} because it was outside the scan range", - fragment.fragment.id() + fragment_id ); } } } - Ok((scoped_fragments, scan_planned_with_limit_pushed_down)) + // If scan_range_after_filter was pushed down, don't include it in the plan + let scan_range_after_filter = if scan_planned_with_limit_pushed_down { + None + } else { + options.scan_range_after_filter.clone() + }; + + FilteredReadPlan { + rows, + filters, + scan_range_after_filter, + } + } + + /// Convert FilteredReadPlan to Vec for I/O + fn plan_to_scoped_fragments( + plan: &FilteredReadPlan, + fragments: &[LoadedFragment], + dataset: &Dataset, + options: &FilteredReadOptions, + scan_scheduler: Arc, + ) -> Vec { + let default_batch_size = options.batch_size.unwrap_or_else(|| { + get_default_batch_size().unwrap_or_else(|| { + std::cmp::max(dataset.object_store().block_size() / 4, BATCH_SIZE_FALLBACK) + }) as u32 + }); + let projection = Arc::new(options.projection.clone()); + let mut scoped_fragments = Vec::new(); + + for (priority, fragment) in fragments.iter().enumerate() { + let fragment_id = fragment.fragment.id() as u32; + + // Check if this fragment is in the plan + if let Some(selection) = plan.rows.get(&fragment_id) { + // Convert selection to ranges + let ranges = match selection { + RowAddrSelection::Full => vec![0..fragment.num_physical_rows], + RowAddrSelection::Partial(ref bitmap) => Self::bitmap_to_ranges(bitmap), + }; + + if ranges.is_empty() { + continue; + } + + // Get filter for this fragment (convert Arc back to Expr) + let filter = plan.filters.get(&fragment_id).map(|f| (**f).clone()); + + scoped_fragments.push(ScopedFragmentRead { + fragment: fragment.fragment.clone(), + ranges, + projection: projection.clone(), + with_deleted_rows: options.with_deleted_rows, + batch_size: default_batch_size, + filter, + priority: priority as u32, + scan_scheduler: scan_scheduler.clone(), + }); + } + } + + scoped_fragments } /// Apply index to a fragment and apply skip/take to matched ranges if possible @@ -804,6 +856,30 @@ impl FilteredReadStream { } } + /// Convert ranges to a RoaringBitmap + fn ranges_to_bitmap(ranges: &[Range]) -> RoaringBitmap { + let mut bitmap = RoaringBitmap::new(); + for range in ranges { + bitmap.insert_range(range.start as u32..range.end as u32); + } + bitmap + } + + /// Convert a RoaringBitmap back to ranges + fn bitmap_to_ranges(bitmap: &RoaringBitmap) -> Vec> { + let mut ranges = Vec::new(); + let mut iter = bitmap.iter().peekable(); + while let Some(start) = iter.next() { + let mut end = start + 1; + while iter.peek() == Some(&end) { + iter.next(); + end += 1; + } + ranges.push(start as u64..end as u64); + } + ranges + } + // Given a logical position and bounds, calculate the number of rows to skip and take fn calculate_fetch( position: Range, // position of the fragment in dataset/fragment coordinates @@ -1424,16 +1500,33 @@ pub struct FilteredReadExec { properties: PlanProperties, metrics: ExecutionPlanMetricsSet, index_input: Option>, + // Precomputed plan. You can further split the plan for distributed execution + plan: Arc>, // When execute is first called we will initialize the FilteredReadStream. In order to support // multiple partitions, each partition will share the stream. running_stream: Arc>>, } +/// The materialized plan, result of planning phase +/// This plan can be further split for distributed execution +#[derive(Clone, Debug)] +pub struct FilteredReadPlan { + /// What fragments and physical rows to read + pub rows: RowAddrTreeMap, + /// Filter to apply per fragment + /// fragments not here don't need filtering + pub filters: HashMap>, + /// Scan range after filter may be applied during planning phase based on index result + /// This is leftover range to apply during execution phase + pub scan_range_after_filter: Option>, +} + impl FilteredReadExec { pub fn try_new( dataset: Arc, mut options: FilteredReadOptions, index_input: Option>, + plan: Option, ) -> Result { if options.with_deleted_rows { // Ensure we have the row id column if with_deleted_rows is set @@ -1490,6 +1583,11 @@ impl FilteredReadExec { let metrics = ExecutionPlanMetricsSet::new(); + let plan_cell = Arc::new(OnceCell::new()); + if let Some(p) = plan { + let _ = plan_cell.set(p); + } + Ok(Self { dataset, options, @@ -1497,9 +1595,91 @@ impl FilteredReadExec { running_stream: Arc::new(AsyncMutex::new(None)), metrics, index_input, + plan: plan_cell, }) } + /// Set the plan on this exec (builder pattern) + pub fn with_plan(self, plan: FilteredReadPlan) -> Self { + let plan_cell = Arc::new(OnceCell::new()); + let _ = plan_cell.set(plan); + Self { + plan: plan_cell, + ..self + } + } + + /// Get or create the plan impl + async fn get_or_create_plan_impl<'a>( + plan_cell: &'a OnceCell, + dataset: Arc, + options: &FilteredReadOptions, + index_input: Option<&Arc>, + partition: usize, + ctx: Arc, + ) -> Result<&'a FilteredReadPlan> { + plan_cell + .get_or_try_init(|| async { + // Execute index if present + let mut evaluated_index = None; + if let Some(index_input) = index_input { + let mut index_search = index_input.execute(partition, ctx)?; + let index_search_result = + index_search.next().await.ok_or_else(|| Error::Internal { + message: "Index search did not yield any results".to_string(), + location: location!(), + })??; + evaluated_index = Some(Arc::new(EvaluatedIndex::try_from_arrow( + &index_search_result, + )?)); + } + + // Load fragments to compute the plan + let io_parallelism = dataset.object_store.io_parallelism(); + let fragments = options + .fragments + .clone() + .unwrap_or_else(|| dataset.fragments().clone()); + + let with_deleted_rows = options.with_deleted_rows; + let frag_futs = fragments + .iter() + .map(|frag| { + Result::Ok(FilteredReadStream::load_fragment( + dataset.clone(), + frag.clone(), + with_deleted_rows, + )) + }) + .collect::>(); + let loaded_fragments = futures::stream::iter(frag_futs) + .try_buffered(io_parallelism) + .try_collect::>() + .await?; + + // Plan the scan + Ok(FilteredReadStream::plan_scan( + &loaded_fragments, + &evaluated_index, + options, + )) + }) + .await + } + + /// Get or create the plan (public API) + pub async fn get_or_create_plan(&self, ctx: Arc) -> Result<&FilteredReadPlan> { + Self::get_or_create_plan_impl( + &self.plan, + self.dataset.clone(), + &self.options, + self.index_input.as_ref(), + 0, + ctx, + ) + .await + } + fn obtain_stream( &self, partition: usize, @@ -1515,6 +1695,7 @@ impl FilteredReadExec { let options = self.options.clone(); let metrics = self.metrics.clone(); let index_input = self.index_input.clone(); + let plan_cell = self.plan.clone(); let stream = futures::stream::once(async move { let mut running_stream = running_stream_lock.lock().await; @@ -1523,22 +1704,17 @@ impl FilteredReadExec { running_stream.get_stream(&metrics, partition), ) } else { - let mut evaluated_index = None; - if let Some(index_input) = index_input { - let mut index_search = index_input.execute(partition, context)?; - let index_search_result = - index_search.next().await.ok_or_else(|| Error::Internal { - message: "Index search did not yield any results".to_string(), - location: location!(), - })??; - evaluated_index = Some(Arc::new(EvaluatedIndex::try_from_arrow( - &index_search_result, - )?)); - } - + let plan = Self::get_or_create_plan_impl( + &plan_cell, + dataset.clone(), + &options, + index_input.as_ref(), + partition, + context.clone(), + ) + .await?; let new_running_stream = - FilteredReadStream::try_new(dataset, options, &metrics, evaluated_index) - .await?; + FilteredReadStream::try_new(dataset, options, &metrics, plan.clone()).await?; let first_stream = new_running_stream.get_stream(&metrics, partition); *running_stream = Some(new_running_stream); DataFusionResult::Ok(first_stream) @@ -1707,6 +1883,7 @@ impl ExecutionPlan for FilteredReadExec { ..self.options.clone() }, None, + None, )?); let df_filter_exec = FilterExec::try_new(physical_filter, mock_input)?; let mut df_stats = df_filter_exec.partition_statistics(partition)?; @@ -1773,6 +1950,7 @@ impl ExecutionPlan for FilteredReadExec { // out just in case running_stream: Arc::new(AsyncMutex::new(None)), index_input, + plan: Arc::new(OnceCell::new()), })) } } @@ -1834,6 +2012,7 @@ impl ExecutionPlan for FilteredReadExec { self.dataset.clone(), updated_options, self.index_input.clone(), + None, ) { Ok(exec) => Some(Arc::new(exec)), Err(e) => { @@ -2009,13 +2188,13 @@ mod tests { async fn make_plan(&self, options: FilteredReadOptions) -> FilteredReadExec { let index_input = self.index_input(&options).await; - FilteredReadExec::try_new(self.dataset.clone(), options, index_input).unwrap() + FilteredReadExec::try_new(self.dataset.clone(), options, index_input, None).unwrap() } async fn test_plan(&self, options: FilteredReadOptions, expected: &dyn Array) { let index_input = self.index_input(&options).await; - let plan = - FilteredReadExec::try_new(self.dataset.clone(), options, index_input).unwrap(); + let plan = FilteredReadExec::try_new(self.dataset.clone(), options, index_input, None) + .unwrap(); let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap(); let schema = stream.schema(); @@ -2107,7 +2286,7 @@ mod tests { ); let options = FilteredReadOptions::basic_full_read(&dataset).with_filter_plan(filter_plan); - let plan = FilteredReadExec::try_new(dataset.clone(), options, None).unwrap(); + let plan = FilteredReadExec::try_new(dataset.clone(), options, None, None).unwrap(); let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap(); let batches = stream.try_collect::>().await.unwrap(); let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum(); @@ -2287,7 +2466,7 @@ mod tests { .with_projection(fixture.dataset.empty_projection()); let index_input = fixture.index_input(&options).await; let Err(Error::InvalidInput { source, .. }) = - FilteredReadExec::try_new(fixture.dataset.clone(), options, index_input) + FilteredReadExec::try_new(fixture.dataset.clone(), options, index_input, None) else { panic!("Expected an InvalidInput error when given an empty projection"); }; @@ -2526,7 +2705,7 @@ mod tests { let base_options = FilteredReadOptions::basic_full_read(&dataset); let options = base_options.with_scan_range_before_filter(3..4).unwrap(); - let plan = FilteredReadExec::try_new(dataset.clone(), options, None).unwrap(); + let plan = FilteredReadExec::try_new(dataset.clone(), options, None, None).unwrap(); let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap(); let schema = stream.schema(); let batches = stream.try_collect::>().await.unwrap(); @@ -3322,8 +3501,9 @@ mod tests { async fn test_metrics_with_limit_partial_fragment() { let fixture = TestFixture::new().await; let options = FilteredReadOptions::basic_full_read(&fixture.dataset).with_batch_size(10); - let filtered_read = - Arc::new(FilteredReadExec::try_new(fixture.dataset.clone(), options, None).unwrap()); + let filtered_read = Arc::new( + FilteredReadExec::try_new(fixture.dataset.clone(), options, None, None).unwrap(), + ); let batches = filtered_read .execute(0, Arc::new(TaskContext::default())) @@ -3370,4 +3550,83 @@ mod tests { .unwrap_or(0); assert!(iops > 0, "Should have recorded IO operations"); } + + /// Test that direct execution gives the same result as get_plan + execute_with_plan + #[test_log::test(tokio::test)] + async fn test_plan_round_trip() { + let fixture = TestFixture::new().await; + let ctx = Arc::new(TaskContext::default()); + + // Test with filter + let filter_plan = fixture.filter_plan("fully_indexed = 50", true).await; + let options = FilteredReadOptions::basic_full_read(&fixture.dataset) + .with_filter_plan(filter_plan.clone()); + + // Path 1: Direct execution (no plan provided) + let index_input = fixture.index_input(&options).await; + let exec1 = + FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), index_input, None) + .unwrap(); + let stream1 = exec1.execute(0, ctx.clone()).unwrap(); + let schema1 = stream1.schema(); + let batches1 = stream1.try_collect::>().await.unwrap(); + let result1 = concat_batches(&schema1, &batches1).unwrap(); + + // Path 2: Get plan first, then create new exec with plan via with_plan + let index_input = fixture.index_input(&options).await; + let exec2 = + FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), index_input, None) + .unwrap(); + let plan = exec2.get_or_create_plan(ctx.clone()).await.unwrap().clone(); + + // Create new exec and use with_plan to set the plan + let index_input = fixture.index_input(&options).await; + let exec3 = + FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), index_input, None) + .unwrap() + .with_plan(plan); + let stream3 = exec3.execute(0, ctx.clone()).unwrap(); + let schema3 = stream3.schema(); + let batches3 = stream3.try_collect::>().await.unwrap(); + let result3 = concat_batches(&schema3, &batches3).unwrap(); + + // Results should match + assert_eq!(result1.num_rows(), result3.num_rows()); + assert_eq!(result1.schema(), result3.schema()); + for i in 0..result1.num_columns() { + assert_eq!(result1.column(i).as_ref(), result3.column(i).as_ref()); + } + + // Test with range scan + let options = FilteredReadOptions::basic_full_read(&fixture.dataset) + .with_scan_range_before_filter(10..50) + .unwrap(); + + // Path 1: Direct execution + let exec1 = FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), None, None) + .unwrap(); + let stream1 = exec1.execute(0, ctx.clone()).unwrap(); + let schema1 = stream1.schema(); + let batches1 = stream1.try_collect::>().await.unwrap(); + let result1 = concat_batches(&schema1, &batches1).unwrap(); + + // Path 2: Get plan, then create new exec with_plan + let exec2 = FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), None, None) + .unwrap(); + let plan = exec2.get_or_create_plan(ctx.clone()).await.unwrap().clone(); + + let exec3 = FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), None, None) + .unwrap() + .with_plan(plan); + let stream3 = exec3.execute(0, ctx.clone()).unwrap(); + let schema3 = stream3.schema(); + let batches3 = stream3.try_collect::>().await.unwrap(); + let result3 = concat_batches(&schema3, &batches3).unwrap(); + + // Results should match + assert_eq!(result1.num_rows(), result3.num_rows()); + for i in 0..result1.num_columns() { + assert_eq!(result1.column(i).as_ref(), result3.column(i).as_ref()); + } + } } From 7bb8fdeca43a555b29d7f69cec5f2d028d4528be Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Thu, 29 Jan 2026 17:27:36 -0800 Subject: [PATCH 2/4] refactor: use hybrid internal/external plan for FilteredReadExec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add FilteredReadInternalPlan (private) using BTreeMap>> for efficient local execution without bitmap conversion - Keep FilteredReadPlan (public) using RowAddrTreeMap for distributed execution - Local path: plan_scan() → internal plan → ScopedFragmentRead (zero conversions) - External API: get_or_create_plan() converts internal → external once - with_plan() converts external → internal for distributed workers - Add bitmap_to_ranges() utility in lance-core for efficient bitmap conversion - Use BTreeMap for rows to maintain deterministic fragment order 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- rust/lance-core/src/utils/mask.rs | 33 ++++ rust/lance/src/dataset/scanner.rs | 2 - rust/lance/src/io/exec/filtered_read.rs | 229 +++++++++++++----------- 3 files changed, 159 insertions(+), 105 deletions(-) diff --git a/rust/lance-core/src/utils/mask.rs b/rust/lance-core/src/utils/mask.rs index 2580d6dc7c0..be4934ac55d 100644 --- a/rust/lance-core/src/utils/mask.rs +++ b/rust/lance-core/src/utils/mask.rs @@ -562,6 +562,11 @@ impl RowAddrTreeMap { self.inner.get(fragment_id) } + /// Iterate over (fragment_id, selection) pairs + pub fn iter(&self) -> impl Iterator { + self.inner.iter() + } + pub fn retain_fragments(&mut self, frag_ids: impl IntoIterator) { let frag_id_set = frag_ids.into_iter().collect::>(); self.inner @@ -929,6 +934,34 @@ impl Extend for RowAddrTreeMap { } } +/// Convert a RoaringBitmap to a vector of contiguous ranges. +/// +/// This is more efficient than iterating over individual bits and coalescing, +/// as it builds ranges directly in a single pass. +pub fn bitmap_to_ranges(bitmap: &RoaringBitmap) -> Vec> { + if bitmap.is_empty() { + return vec![]; + } + + let mut ranges = Vec::new(); + let mut iter = bitmap.iter(); + let first = iter.next().unwrap(); + let mut start = first; + let mut end = first; + + for val in iter { + if val == end + 1 { + end = val; + } else { + ranges.push(start as u64..(end + 1) as u64); + start = val; + end = val; + } + } + ranges.push(start as u64..(end + 1) as u64); + ranges +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index ffca68c52d2..a0812d6caf4 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2326,7 +2326,6 @@ impl Scanner { self.dataset.clone(), read_options, index_input, - None, )?)) } @@ -2407,7 +2406,6 @@ impl Scanner { self.dataset.clone(), FilteredReadOptions::new(projection), Some(input), - None, )?)) } diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 48f6b0336ad..2622c4514a1 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors use std::any::Any; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Mutex; @@ -32,7 +32,7 @@ use lance_arrow::RecordBatchExt; use lance_core::datatypes::OnMissing; use lance_core::utils::deletion::DeletionVector; use lance_core::utils::futures::FinallyStreamExt; -use lance_core::utils::mask::{RowAddrMask, RowAddrSelection, RowAddrTreeMap}; +use lance_core::utils::mask::{bitmap_to_ranges, RowAddrMask, RowAddrSelection, RowAddrTreeMap}; use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::{datatypes::Projection, Error, Result}; use lance_datafusion::planner::Planner; @@ -114,7 +114,7 @@ impl EvaluatedIndex { /// A fragment along with ranges of row offsets to read struct ScopedFragmentRead { - fragment: FileFragment, + fragment: Arc, ranges: Vec>, projection: Arc, with_deleted_rows: bool, @@ -143,7 +143,7 @@ impl ScopedFragmentRead { struct LoadedFragment { row_id_sequence: Arc, deletion_vector: Option>, - fragment: FileFragment, + fragment: Arc, // The number of physical rows in the fragment // // This count includes deleted rows @@ -356,13 +356,13 @@ impl std::fmt::Debug for FilteredReadStream { } impl FilteredReadStream { - /// Create a new FilteredReadStream from a pre-computed plan + /// Create a new FilteredReadStream from a pre-computed internal plan #[instrument(name = "init_filtered_read_stream", skip_all)] async fn try_new( dataset: Arc, options: FilteredReadOptions, metrics: &ExecutionPlanMetricsSet, - plan: FilteredReadPlan, + plan: FilteredReadInternalPlan, ) -> DataFusionResult { let global_metrics = Arc::new(FilteredReadGlobalMetrics::new(metrics)); @@ -482,7 +482,7 @@ impl FilteredReadStream { }; Ok(LoadedFragment { row_id_sequence, - fragment: file_fragment, + fragment: Arc::new(file_fragment), num_physical_rows, num_logical_rows, deletion_vector, @@ -499,13 +499,13 @@ impl FilteredReadStream { // If the scan range is not ignoring the filters we can only push it down if: // 1. The index result is an exact match (we know exactly which rows will be in the result) // 2. The index result is AtLeast with guaranteed rows >= limit (we have enough guaranteed matches) - // Returns: FilteredReadPlan with bitmap-based row selection + // Returns: FilteredReadInternalPlan with range-based row selection (no bitmap conversion) #[instrument(name = "plan_scan", skip_all)] fn plan_scan( fragments: &[LoadedFragment], evaluated_index: &Option>, options: &FilteredReadOptions, - ) -> FilteredReadPlan { + ) -> FilteredReadInternalPlan { // For pushing down scan_range_after_filter let mut scan_planned_with_limit_pushed_down = false; let mut to_skip = options @@ -520,12 +520,13 @@ impl FilteredReadStream { .unwrap_or(u64::MAX); // Full fragment ranges to read before applying scan_range_after_filter - let mut fragments_to_read: HashMap>> = HashMap::new(); + // Uses BTreeMap to maintain deterministic fragment order for scan_range_after_filter + let mut fragments_to_read: BTreeMap>> = BTreeMap::new(); // Fragment ranges to read after applying scan_range_after_filter // Adds an extra map because if scan_range_after_filter cannot be fulfilled we need to // fall back to read the full fragment in fragments_to_read // Used only when index guarantees enough rows to satisfy scan_range_after_filter - let mut scan_push_down_fragments_to_read: HashMap>> = HashMap::new(); + let mut scan_push_down_fragments_to_read: BTreeMap>> = BTreeMap::new(); // The current offset, includes filtered rows, but not deleted rows let mut range_offset = 0; @@ -580,19 +581,12 @@ impl FilteredReadStream { } } - // Convert ranges to FilteredReadPlan with bitmaps - // TODO: Use bitmap for the whole planning phase to avoid this conversion - let mut rows = RowAddrTreeMap::new(); + // Build filters for each fragment let mut filters = HashMap::new(); - for fragment in fragments.iter() { let fragment_id = fragment.fragment.id() as u32; if let Some(to_read) = fragments_to_read.get(&fragment_id) { if !to_read.is_empty() { - // Convert ranges to bitmap - let bitmap = Self::ranges_to_bitmap(to_read); - rows.insert_bitmap(fragment_id, bitmap); - // Resolve filter for this fragment let filter = if let Some(evaluated_index) = evaluated_index { if evaluated_index.applicable_fragments.contains(fragment_id) { @@ -639,16 +633,18 @@ impl FilteredReadStream { options.scan_range_after_filter.clone() }; - FilteredReadPlan { - rows, + // Return internal plan with ranges (no bitmap conversion) + FilteredReadInternalPlan { + rows: fragments_to_read, filters, scan_range_after_filter, } } - /// Convert FilteredReadPlan to Vec for I/O + /// Convert internal plan (ranges) to Vec for I/O + /// No bitmap conversion needed - ranges are used directly fn plan_to_scoped_fragments( - plan: &FilteredReadPlan, + plan: &FilteredReadInternalPlan, fragments: &[LoadedFragment], dataset: &Dataset, options: &FilteredReadOptions, @@ -666,13 +662,7 @@ impl FilteredReadStream { let fragment_id = fragment.fragment.id() as u32; // Check if this fragment is in the plan - if let Some(selection) = plan.rows.get(&fragment_id) { - // Convert selection to ranges - let ranges = match selection { - RowAddrSelection::Full => vec![0..fragment.num_physical_rows], - RowAddrSelection::Partial(ref bitmap) => Self::bitmap_to_ranges(bitmap), - }; - + if let Some(ranges) = plan.rows.get(&fragment_id) { if ranges.is_empty() { continue; } @@ -682,7 +672,7 @@ impl FilteredReadStream { scoped_fragments.push(ScopedFragmentRead { fragment: fragment.fragment.clone(), - ranges, + ranges: ranges.clone(), projection: projection.clone(), with_deleted_rows: options.with_deleted_rows, batch_size: default_batch_size, @@ -705,8 +695,8 @@ impl FilteredReadStream { to_read: Vec>, to_skip: &mut u64, to_take: &mut u64, - fragments_to_read: &mut HashMap>>, - scan_push_down_fragments_to_read: &mut HashMap>>, + fragments_to_read: &mut BTreeMap>>, + scan_push_down_fragments_to_read: &mut BTreeMap>>, ) { let fragment_id = fragment.id() as u32; @@ -857,29 +847,6 @@ impl FilteredReadStream { } /// Convert ranges to a RoaringBitmap - fn ranges_to_bitmap(ranges: &[Range]) -> RoaringBitmap { - let mut bitmap = RoaringBitmap::new(); - for range in ranges { - bitmap.insert_range(range.start as u32..range.end as u32); - } - bitmap - } - - /// Convert a RoaringBitmap back to ranges - fn bitmap_to_ranges(bitmap: &RoaringBitmap) -> Vec> { - let mut ranges = Vec::new(); - let mut iter = bitmap.iter().peekable(); - while let Some(start) = iter.next() { - let mut end = start + 1; - while iter.peek() == Some(&end) { - iter.next(); - end += 1; - } - ranges.push(start as u64..end as u64); - } - ranges - } - // Given a logical position and bounds, calculate the number of rows to skip and take fn calculate_fetch( position: Range, // position of the fragment in dataset/fragment coordinates @@ -1500,33 +1467,67 @@ pub struct FilteredReadExec { properties: PlanProperties, metrics: ExecutionPlanMetricsSet, index_input: Option>, - // Precomputed plan. You can further split the plan for distributed execution - plan: Arc>, + // Precomputed internal plan + plan: Arc>, // When execute is first called we will initialize the FilteredReadStream. In order to support // multiple partitions, each partition will share the stream. running_stream: Arc>>, } -/// The materialized plan, result of planning phase -/// This plan can be further split for distributed execution -#[derive(Clone, Debug)] +/// Public plan for distributed execution - uses bitmap for flexibility +#[derive(Clone)] pub struct FilteredReadPlan { /// What fragments and physical rows to read pub rows: RowAddrTreeMap, /// Filter to apply per fragment /// fragments not here don't need filtering pub filters: HashMap>, - /// Scan range after filter may be applied during planning phase based on index result - /// This is leftover range to apply during execution phase + /// Row offset range to apply after filtering (skip N rows, take M rows). + /// If the index guarantees enough matching rows, this is pushed down during planning + /// and set to None. Otherwise, it's applied during execution. pub scan_range_after_filter: Option>, } +/// Internal plan representation - uses ranges for efficiency in local execution +/// This avoids expensive range↔bitmap conversion +#[derive(Clone, Debug)] +struct FilteredReadInternalPlan { + /// Fragment ID to ranges to read (BTreeMap for deterministic order with scan_range_after_filter) + rows: BTreeMap>>, + /// Filter to apply per fragment (fragments not here don't need filtering) + filters: HashMap>, + /// Row offset range to apply after filtering (skip N rows, take M rows). + /// If the index guarantees enough matching rows, this is pushed down during planning + /// and set to None. Otherwise, it's applied during execution. + scan_range_after_filter: Option>, +} + +impl FilteredReadInternalPlan { + /// Convert internal plan (ranges) to external plan (bitmap) for distributed execution + fn to_external_plan(&self) -> FilteredReadPlan { + let mut rows = RowAddrTreeMap::new(); + for (fragment_id, ranges) in &self.rows { + if !ranges.is_empty() { + let mut bitmap = RoaringBitmap::new(); + for range in ranges { + bitmap.insert_range(range.start as u32..range.end as u32); + } + rows.insert_bitmap(*fragment_id, bitmap); + } + } + FilteredReadPlan { + rows, + filters: self.filters.clone(), + scan_range_after_filter: self.scan_range_after_filter.clone(), + } + } +} + impl FilteredReadExec { pub fn try_new( dataset: Arc, mut options: FilteredReadOptions, index_input: Option>, - plan: Option, ) -> Result { if options.with_deleted_rows { // Ensure we have the row id column if with_deleted_rows is set @@ -1583,11 +1584,6 @@ impl FilteredReadExec { let metrics = ExecutionPlanMetricsSet::new(); - let plan_cell = Arc::new(OnceCell::new()); - if let Some(p) = plan { - let _ = plan_cell.set(p); - } - Ok(Self { dataset, options, @@ -1595,29 +1591,54 @@ impl FilteredReadExec { running_stream: Arc::new(AsyncMutex::new(None)), metrics, index_input, - plan: plan_cell, + plan: Arc::new(OnceCell::new()), }) } - /// Set the plan on this exec (builder pattern) - pub fn with_plan(self, plan: FilteredReadPlan) -> Self { + /// Set the pre-computed plan for execution + pub async fn with_plan(self, plan: FilteredReadPlan) -> Result { + let mut rows = BTreeMap::new(); + for (fragment_id, selection) in plan.rows.iter() { + let ranges = match selection { + RowAddrSelection::Partial(bitmap) => bitmap_to_ranges(bitmap), + RowAddrSelection::Full => { + let fragment = self + .dataset + .get_fragment(*fragment_id as usize) + .ok_or_else(|| Error::InvalidInput { + source: format!("Fragment {} not found", fragment_id).into(), + location: location!(), + })?; + let num_rows = fragment.physical_rows().await?; + vec![0..num_rows as u64] + } + }; + if !ranges.is_empty() { + rows.insert(*fragment_id, ranges); + } + } + let internal_plan = FilteredReadInternalPlan { + rows, + filters: plan.filters, + scan_range_after_filter: plan.scan_range_after_filter, + }; let plan_cell = Arc::new(OnceCell::new()); - let _ = plan_cell.set(plan); - Self { + let _ = plan_cell.set(internal_plan); + Ok(Self { plan: plan_cell, ..self - } + }) } - /// Get or create the plan impl + /// Get or create the internal plan (ranges, no bitmap conversion) async fn get_or_create_plan_impl<'a>( - plan_cell: &'a OnceCell, + plan_cell: &'a OnceCell, dataset: Arc, options: &FilteredReadOptions, index_input: Option<&Arc>, partition: usize, ctx: Arc, - ) -> Result<&'a FilteredReadPlan> { + ) -> Result<&'a FilteredReadInternalPlan> { plan_cell .get_or_try_init(|| async { // Execute index if present @@ -1667,9 +1688,9 @@ impl FilteredReadExec { .await } - /// Get or create the plan (public API) - pub async fn get_or_create_plan(&self, ctx: Arc) -> Result<&FilteredReadPlan> { - Self::get_or_create_plan_impl( + /// Get the existing plan or create it if it doesn't exist + pub async fn get_or_create_plan(&self, ctx: Arc) -> Result { + let internal_plan = Self::get_or_create_plan_impl( &self.plan, self.dataset.clone(), &self.options, @@ -1677,7 +1698,8 @@ impl FilteredReadExec { 0, ctx, ) - .await + .await?; + Ok(internal_plan.to_external_plan()) } fn obtain_stream( @@ -1883,7 +1905,6 @@ impl ExecutionPlan for FilteredReadExec { ..self.options.clone() }, None, - None, )?); let df_filter_exec = FilterExec::try_new(physical_filter, mock_input)?; let mut df_stats = df_filter_exec.partition_statistics(partition)?; @@ -2012,7 +2033,6 @@ impl ExecutionPlan for FilteredReadExec { self.dataset.clone(), updated_options, self.index_input.clone(), - None, ) { Ok(exec) => Some(Arc::new(exec)), Err(e) => { @@ -2188,13 +2208,13 @@ mod tests { async fn make_plan(&self, options: FilteredReadOptions) -> FilteredReadExec { let index_input = self.index_input(&options).await; - FilteredReadExec::try_new(self.dataset.clone(), options, index_input, None).unwrap() + FilteredReadExec::try_new(self.dataset.clone(), options, index_input).unwrap() } async fn test_plan(&self, options: FilteredReadOptions, expected: &dyn Array) { let index_input = self.index_input(&options).await; - let plan = FilteredReadExec::try_new(self.dataset.clone(), options, index_input, None) - .unwrap(); + let plan = + FilteredReadExec::try_new(self.dataset.clone(), options, index_input).unwrap(); let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap(); let schema = stream.schema(); @@ -2286,7 +2306,7 @@ mod tests { ); let options = FilteredReadOptions::basic_full_read(&dataset).with_filter_plan(filter_plan); - let plan = FilteredReadExec::try_new(dataset.clone(), options, None, None).unwrap(); + let plan = FilteredReadExec::try_new(dataset.clone(), options, None).unwrap(); let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap(); let batches = stream.try_collect::>().await.unwrap(); let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum(); @@ -2466,7 +2486,7 @@ mod tests { .with_projection(fixture.dataset.empty_projection()); let index_input = fixture.index_input(&options).await; let Err(Error::InvalidInput { source, .. }) = - FilteredReadExec::try_new(fixture.dataset.clone(), options, index_input, None) + FilteredReadExec::try_new(fixture.dataset.clone(), options, index_input) else { panic!("Expected an InvalidInput error when given an empty projection"); }; @@ -2705,7 +2725,7 @@ mod tests { let base_options = FilteredReadOptions::basic_full_read(&dataset); let options = base_options.with_scan_range_before_filter(3..4).unwrap(); - let plan = FilteredReadExec::try_new(dataset.clone(), options, None, None).unwrap(); + let plan = FilteredReadExec::try_new(dataset.clone(), options, None).unwrap(); let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap(); let schema = stream.schema(); let batches = stream.try_collect::>().await.unwrap(); @@ -3501,9 +3521,8 @@ mod tests { async fn test_metrics_with_limit_partial_fragment() { let fixture = TestFixture::new().await; let options = FilteredReadOptions::basic_full_read(&fixture.dataset).with_batch_size(10); - let filtered_read = Arc::new( - FilteredReadExec::try_new(fixture.dataset.clone(), options, None, None).unwrap(), - ); + let filtered_read = + Arc::new(FilteredReadExec::try_new(fixture.dataset.clone(), options, None).unwrap()); let batches = filtered_read .execute(0, Arc::new(TaskContext::default())) @@ -3565,7 +3584,7 @@ mod tests { // Path 1: Direct execution (no plan provided) let index_input = fixture.index_input(&options).await; let exec1 = - FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), index_input, None) + FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), index_input) .unwrap(); let stream1 = exec1.execute(0, ctx.clone()).unwrap(); let schema1 = stream1.schema(); @@ -3575,16 +3594,18 @@ mod tests { // Path 2: Get plan first, then create new exec with plan via with_plan let index_input = fixture.index_input(&options).await; let exec2 = - FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), index_input, None) + FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), index_input) .unwrap(); let plan = exec2.get_or_create_plan(ctx.clone()).await.unwrap().clone(); // Create new exec and use with_plan to set the plan let index_input = fixture.index_input(&options).await; let exec3 = - FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), index_input, None) + FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), index_input) .unwrap() - .with_plan(plan); + .with_plan(plan) + .await + .unwrap(); let stream3 = exec3.execute(0, ctx.clone()).unwrap(); let schema3 = stream3.schema(); let batches3 = stream3.try_collect::>().await.unwrap(); @@ -3603,21 +3624,23 @@ mod tests { .unwrap(); // Path 1: Direct execution - let exec1 = FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), None, None) - .unwrap(); + let exec1 = + FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), None).unwrap(); let stream1 = exec1.execute(0, ctx.clone()).unwrap(); let schema1 = stream1.schema(); let batches1 = stream1.try_collect::>().await.unwrap(); let result1 = concat_batches(&schema1, &batches1).unwrap(); // Path 2: Get plan, then create new exec with_plan - let exec2 = FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), None, None) - .unwrap(); + let exec2 = + FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), None).unwrap(); let plan = exec2.get_or_create_plan(ctx.clone()).await.unwrap().clone(); - let exec3 = FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), None, None) + let exec3 = FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), None) .unwrap() - .with_plan(plan); + .with_plan(plan) + .await + .unwrap(); let stream3 = exec3.execute(0, ctx.clone()).unwrap(); let schema3 = stream3.schema(); let batches3 = stream3.try_collect::>().await.unwrap(); From 05b9bf6a6bce48b8914fb3d3044bbf74c0a48c0d Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Thu, 29 Jan 2026 18:09:39 -0800 Subject: [PATCH 3/4] fix: remove redundant clone in test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- rust/lance/src/io/exec/filtered_read.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 2622c4514a1..11cb33615f3 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -499,7 +499,7 @@ impl FilteredReadStream { // If the scan range is not ignoring the filters we can only push it down if: // 1. The index result is an exact match (we know exactly which rows will be in the result) // 2. The index result is AtLeast with guaranteed rows >= limit (we have enough guaranteed matches) - // Returns: FilteredReadInternalPlan with range-based row selection (no bitmap conversion) + // Returns: FilteredReadInternalPlan #[instrument(name = "plan_scan", skip_all)] fn plan_scan( fragments: &[LoadedFragment], @@ -520,7 +520,6 @@ impl FilteredReadStream { .unwrap_or(u64::MAX); // Full fragment ranges to read before applying scan_range_after_filter - // Uses BTreeMap to maintain deterministic fragment order for scan_range_after_filter let mut fragments_to_read: BTreeMap>> = BTreeMap::new(); // Fragment ranges to read after applying scan_range_after_filter // Adds an extra map because if scan_range_after_filter cannot be fulfilled we need to @@ -3596,7 +3595,7 @@ mod tests { let exec2 = FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), index_input) .unwrap(); - let plan = exec2.get_or_create_plan(ctx.clone()).await.unwrap().clone(); + let plan = exec2.get_or_create_plan(ctx.clone()).await.unwrap(); // Create new exec and use with_plan to set the plan let index_input = fixture.index_input(&options).await; @@ -3634,7 +3633,7 @@ mod tests { // Path 2: Get plan, then create new exec with_plan let exec2 = FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), None).unwrap(); - let plan = exec2.get_or_create_plan(ctx.clone()).await.unwrap().clone(); + let plan = exec2.get_or_create_plan(ctx.clone()).await.unwrap(); let exec3 = FilteredReadExec::try_new(fixture.dataset.clone(), options.clone(), None) .unwrap() From 20dacb89e1b93565ac83651b8035bd88b3e0c00b Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Thu, 29 Jan 2026 18:12:55 -0800 Subject: [PATCH 4/4] small fix --- rust/lance/src/io/exec/filtered_read.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 11cb33615f3..0e53d3666ba 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -632,7 +632,6 @@ impl FilteredReadStream { options.scan_range_after_filter.clone() }; - // Return internal plan with ranges (no bitmap conversion) FilteredReadInternalPlan { rows: fragments_to_read, filters, @@ -640,8 +639,6 @@ impl FilteredReadStream { } } - /// Convert internal plan (ranges) to Vec for I/O - /// No bitmap conversion needed - ranges are used directly fn plan_to_scoped_fragments( plan: &FilteredReadInternalPlan, fragments: &[LoadedFragment], @@ -845,7 +842,6 @@ impl FilteredReadStream { } } - /// Convert ranges to a RoaringBitmap // Given a logical position and bounds, calculate the number of rows to skip and take fn calculate_fetch( position: Range, // position of the fragment in dataset/fragment coordinates @@ -1629,7 +1625,7 @@ impl FilteredReadExec { }) } - /// Get or create the internal plan (ranges, no bitmap conversion) + /// Get or create the internal plan async fn get_or_create_plan_impl<'a>( plan_cell: &'a OnceCell, dataset: Arc,