From e5300ae39336eea1c506c807d8d77a4d30d44d1a Mon Sep 17 00:00:00 2001
From: Jesse Tuglu
Date: Sun, 28 Sep 2025 18:42:03 -0700
Subject: [PATCH] Create barebones plan_splits method

---
 rust/lance-index/src/scalar/zonemap.rs |  46 ++++++---
 rust/lance/src/dataset/statistics.rs   | 129 ++++++++++++++++++++++++-
 2 files changed, 161 insertions(+), 14 deletions(-)

diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs
index c528b85600d..2abc7ba135c 100644
--- a/rust/lance-index/src/scalar/zonemap.rs
+++ b/rust/lance-index/src/scalar/zonemap.rs
@@ -34,7 +34,7 @@ use arrow_array::{new_empty_array, ArrayRef, RecordBatch, UInt32Array, UInt64Arr
 use arrow_schema::{DataType, Field};
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion_common::ScalarValue;
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::{HashMap, HashSet}, sync::Arc};
 
 use super::{AnyQuery, IndexStore, MetricsCollector, ScalarIndex, SearchResult};
 use crate::scalar::FragReuseIndex;
@@ -54,17 +54,17 @@ const ZONEMAP_INDEX_VERSION: u32 = 0;
 
 /// Basic stats about zonemap index
 #[derive(Debug, PartialEq, Clone)]
-struct ZoneMapStatistics {
-    min: ScalarValue,
-    max: ScalarValue,
-    null_count: u32,
+pub struct ZoneMapStatistics {
+    pub min: ScalarValue,
+    pub max: ScalarValue,
+    pub null_count: u32,
     // only apply to float type
-    nan_count: u32,
-    fragment_id: u64,
+    pub nan_count: u32,
+    pub fragment_id: u64,
     // zone_start is the start row of the zone in the fragment, also known
     // as local row offset
-    zone_start: u64,
-    zone_length: usize,
+    pub zone_start: u64,
+    pub zone_length: usize,
 }
 
 impl DeepSizeOf for ZoneMapStatistics {
@@ -128,6 +128,28 @@ impl DeepSizeOf for ZoneMapIndex {
 }
 
 impl ZoneMapIndex {
+    /// Returns a copy of every zone that may contain rows matching `query`,
+    /// e.g. for split planning in engines such as Spark or Trino.
+    ///
+    /// # Panics
+    /// Panics if `query` is not a `SargableQuery`.
+    pub fn fetch_zones_for_query(
+        &self,
+        query: &dyn AnyQuery,
+        metrics: &dyn MetricsCollector,
+    ) -> Result<Vec<ZoneMapStatistics>> {
+        metrics.record_comparisons(self.zones.len());
+        let query = query.as_any().downcast_ref::<SargableQuery>();
+        let query = query.expect("ZoneMapIndex can only evaluate a SargableQuery");
+        let mut filtered_zones = Vec::new();
+        for zone in self.zones.iter() {
+            if self.evaluate_zone_against_query(zone, query)? {
+                filtered_zones.push(zone.clone());
+            }
+        }
+        Ok(filtered_zones)
+    }
+
     /// Evaluates whether a zone could potentially contain values matching the query
     /// For NaN, total order is used here
     /// reference: https://doc.rust-lang.org/std/primitive.f64.html#method.total_cmp
@@ -1052,8 +1074,10 @@ mod tests {
 
     // Add missing imports for the tests
     use crate::metrics::NoOpMetricsCollector;
-    use crate::Index; // Import Index trait to access calculate_included_frags
-    use roaring::RoaringBitmap; // Import RoaringBitmap for the test
+    use crate::Index;
+    // Import Index trait to access calculate_included_frags
+    use roaring::RoaringBitmap;
+    // Import RoaringBitmap for the test
     use std::collections::Bound;
 
     // Adds a _rowaddr column emulating each batch as a new fragment
diff --git a/rust/lance/src/dataset/statistics.rs b/rust/lance/src/dataset/statistics.rs
index e2dfa34e353..2a39c3eb1cd 100644
--- a/rust/lance/src/dataset/statistics.rs
+++ b/rust/lance/src/dataset/statistics.rs
@@ -3,12 +3,17 @@
 
 //! Module for statistics related to the dataset.
 
-use std::{collections::HashMap, future::Future, sync::Arc};
+use std::{collections::{HashMap, HashSet}, future::Future, sync::Arc};
 
+use super::{fragment::FileFragment, Dataset};
 use lance_core::Result;
+use lance_index::scalar::AnyQuery;
+use lance_index::{metrics::NoOpMetricsCollector, scalar::zonemap::{ZoneMapIndex, ZoneMapStatistics}};
 use lance_io::scheduler::{ScanScheduler, SchedulerConfig};
-
-use super::{fragment::FileFragment, Dataset};
+use lance_table::{
+    format::RowIdMeta,
+    io::deletion::deletion_file_path,
+};
 
 /// Statistics about a single field in the dataset
 pub struct FieldStatistics {
@@ -26,11 +31,43 @@ pub struct DataStatistics {
     pub fields: Vec<FieldStatistics>,
 }
 
+/// A `Split` is a subset of a dataset that has been filtered by a query.
+/// It holds the zones whose fragments may contain rows matching the query,
+/// plus a mapping from each fragment ID to that fragment's files.
+#[derive(Debug, Clone)]
+pub struct Split {
+    /// Zone statistics for this split
+    pub zone_stats: Vec<ZoneMapStatistics>,
+    /// Map of fragment ID to all files associated with that fragment
+    pub files: HashMap<u64, FragmentFiles>,
+}
+
+/// Paths of the files that belong to a single fragment.
+#[derive(Debug, Default, Clone)]
+pub struct FragmentFiles {
+    pub data_files: Vec<String>,
+    pub deletion_files: Vec<String>,
+    pub row_id_files: Vec<String>,
+}
+
 pub trait DatasetStatisticsExt {
     /// Get statistics about the data in the dataset
     fn calculate_data_stats(
         self: &Arc<Self>,
     ) -> impl Future<Output = Result<DataStatistics>> + Send;
+
+    /// Get splits partitioned by target size with associated file mappings
+    fn get_splits(
+        self: &Arc<Self>,
+        query: &dyn AnyQuery,
+        target_size: usize,
+    ) -> Result<Vec<Split>>;
+
+    /// Build a mapping of fragment ID to FragmentFiles for the given zones
+    fn build_fragment_files_mapping(
+        &self,
+        zones: &[ZoneMapStatistics],
+    ) -> Result<HashMap<u64, FragmentFiles>>;
 }
 
 impl DatasetStatisticsExt for Dataset {
@@ -66,4 +103,90 @@ impl DatasetStatisticsExt for Dataset {
             fields: field_stats,
         })
     }
+
+    /// Plan splits for `query`: matching zones are sorted by fragment ID and zone
+    /// start, then packed greedily until the summed `zone_length` of a split would
+    /// exceed `target_size`.
+    ///
+    /// # Panics
+    /// Always, for now: looking up the zone map index is not implemented yet.
+    fn get_splits(
+        self: &Arc<Self>,
+        query: &dyn AnyQuery,
+        target_size: usize,
+    ) -> Result<Vec<Split>> {
+        // TODO: find out how to get access to the zonemap index for all? columns
+        let index: ZoneMapIndex = todo!("look up the zone map index to plan splits from");
+        let mut filtered_zones = index.fetch_zones_for_query(query, &NoOpMetricsCollector)?;
+        // Sort by fragment ID, then zone start, to co-locate zones of one fragment
+        filtered_zones.sort_by_key(|zone| (zone.fragment_id, zone.zone_start));
+
+        let mut splits = Vec::new();
+        let mut current_split_zones = Vec::new();
+        let mut current_size = 0usize;
+
+        for zone in filtered_zones {
+            let zone_size = zone.zone_length;
+
+            // TODO: need to confirm if this is right
+            if current_size > 0 && current_size.saturating_add(zone_size) > target_size {
+                // Close the current split; `take` leaves an empty Vec behind
+                let zone_stats = std::mem::take(&mut current_split_zones);
+                let files = self.build_fragment_files_mapping(&zone_stats)?;
+                splits.push(Split { zone_stats, files });
+                current_size = 0;
+            }
+            current_split_zones.push(zone);
+            current_size = current_size.saturating_add(zone_size);
+        }
+
+        if !current_split_zones.is_empty() {
+            let files = self.build_fragment_files_mapping(&current_split_zones)?;
+            splits.push(Split {
+                zone_stats: current_split_zones,
+                files,
+            });
+        }
+
+        Ok(splits)
+    }
+
+    /// Build a mapping of fragment ID to FragmentFiles for the given zones.
+    /// Fragments that cannot be found in this dataset are left out of the map.
+    /// TODO: maybe move to fragment.rs or similar
+    fn build_fragment_files_mapping(
+        &self,
+        zones: &[ZoneMapStatistics],
+    ) -> Result<HashMap<u64, FragmentFiles>> {
+        // Several zones can share a fragment; visit each fragment only once
+        let fragment_ids: HashSet<u64> = zones.iter().map(|zone| zone.fragment_id).collect();
+
+        let mut files_mapping = HashMap::new();
+        for fragment_id in fragment_ids {
+            // Checked conversion instead of `as`, which would silently truncate
+            let fragment_index = usize::try_from(fragment_id).ok();
+            if let Some(fragment) = fragment_index.and_then(|id| self.get_fragment(id)) {
+                let frag_metadata = fragment.metadata();
+                let mut fragment_files = FragmentFiles::default();
+
+                for data_file in &frag_metadata.files {
+                    fragment_files.data_files.push(format!("data/{}", data_file.path));
+                }
+
+                if let Some(deletion_file) = &frag_metadata.deletion_file {
+                    let deletion_path = deletion_file_path(&self.base, fragment_id, deletion_file);
+                    fragment_files.deletion_files.push(deletion_path.to_string());
+                }
+
+                // Inline row IDs don't have separate files, so we skip them
+                if let Some(RowIdMeta::External(external_file)) = &frag_metadata.row_id_meta {
+                    fragment_files.row_id_files.push(external_file.path.clone());
+                }
+
+                files_mapping.insert(fragment_id, fragment_files);
+            }
+        }
+
+        Ok(files_mapping)
+    }
 }