Problem
#3337
This calculation blocks Spark job submission for far too long. E.g., the statistics calculation takes over 80 minutes (in the Spark driver :() while the parallel data processing itself takes 60 minutes.
Key code paths
- The JNI method triggers a full scan of v2 file metadata across the dataset: lance/src/dataset/statistics.rs:36:69
```rust
impl DatasetStatisticsExt for Dataset {
    async fn calculate_data_stats(self: &Arc<Self>) -> Result<DataStatistics> {
        let field_ids = self.schema().field_ids();
        let mut field_stats: HashMap<u32, FieldStatistics> = /* init zeros */;
        if !self.is_legacy_storage() {
            let scan_scheduler = ScanScheduler::new(
                self.object_store.clone(),
                SchedulerConfig::max_bandwidth(self.object_store.as_ref()),
            );
            for fragment in self.fragments().as_ref() {
                let file_fragment = FileFragment::new(self.clone(), fragment.clone());
                file_fragment
                    .update_storage_stats(&mut field_stats, self.schema(), scan_scheduler.clone())
                    .await?;
            }
        }
        Ok(DataStatistics { fields: field_stats })
    }
}
```
- Each fragment opens v2 file readers to collect column-level sizes: lance/src/dataset/fragment.rs:768:783
```rust
pub(crate) async fn update_storage_stats(
    &self,
    field_stats: &mut HashMap<u32, FieldStatistics>,
    dataset_schema: &Schema,
    scan_scheduler: Arc<ScanScheduler>,
) -> Result<()> {
    for reader in self
        .open_readers(dataset_schema, &FragReadConfig::default().with_scan_scheduler(scan_scheduler))
        .await?
    {
        reader.update_storage_stats(field_stats);
    }
    Ok(())
}
```
- The v2 reader summarizes per-column sizes by inspecting the column metadata (which requires opening the file and reading its tail and column-metadata sections): lance-file/src/v2/reader.rs:369:390
```rust
pub fn file_statistics(&self) -> FileStatistics {
    let column_metadatas = &self.metadata().column_metadatas;
    let column_stats = column_metadatas
        .iter()
        .map(|col_metadata| {
            let num_pages = col_metadata.pages.len();
            let size_bytes = col_metadata
                .pages
                .iter()
                .map(|page| page.buffer_sizes.iter().sum::<u64>())
                .sum::<u64>();
            ColumnStatistics { num_pages, size_bytes }
        })
        .collect();
    FileStatistics { columns: column_stats }
}
```
- And the per-file reader pushes those bytes into per-field aggregates (mapping columns back to field ids): lance/src/dataset/fragment.rs:441:462
```rust
fn update_storage_stats(&self, field_stats: &mut HashMap<u32, FieldStatistics>) {
    let file_statistics = self.reader.file_statistics();
    let column_idx_to_field_id = /* invert map */;
    let mut current_field_id = 0;
    for (column_idx, stats) in file_statistics.columns.iter().enumerate() {
        if let Some(field_id) = column_idx_to_field_id.get(&(column_idx as u32)) {
            current_field_id = *field_id;
        }
        if let Some(field_stats) = field_stats.get_mut(&current_field_id) {
            field_stats.bytes_on_disk += stats.size_bytes;
        }
    }
}
```
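The forward-fill of `current_field_id` above handles fields that span more than one physical column: only a field's first column appears in the inverted map, and subsequent columns inherit the last seen field id. A minimal stdlib sketch of that aggregation (the field ids, column layout, and sizes here are made up for illustration):

```rust
use std::collections::HashMap;

fn main() {
    // Hypothetical field_id -> first column index map, as the reader stores it.
    // Field 11 spans columns 1 and 2, so column 2 has no entry of its own.
    let field_id_to_column_idx: HashMap<u32, u32> = HashMap::from([(10, 0), (11, 1)]);

    // Invert it: column index -> field id.
    let column_idx_to_field_id: HashMap<u32, u32> = field_id_to_column_idx
        .iter()
        .map(|(field_id, col_idx)| (*col_idx, *field_id))
        .collect();

    // Per-column byte sizes, as file_statistics() would report them.
    let column_sizes: [u64; 3] = [100, 200, 300];

    let mut bytes_on_disk: HashMap<u32, u64> = HashMap::from([(10, 0), (11, 0)]);
    let mut current_field_id = 0u32;
    for (column_idx, size) in column_sizes.iter().enumerate() {
        // Columns with no map entry (column 2 here) belong to the last seen field.
        if let Some(field_id) = column_idx_to_field_id.get(&(column_idx as u32)) {
            current_field_id = *field_id;
        }
        if let Some(total) = bytes_on_disk.get_mut(&current_field_id) {
            *total += size;
        }
    }
    // Field 10 accumulates 100 bytes; field 11 accumulates 200 + 300 = 500.
    println!("{:?}", bytes_on_disk);
}
```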
Some thoughts
- Spark calls the Lance connector’s statistics during plan estimation. That hits Dataset.calculateDataSize() → nativeGetDataStatistics() → calculate_data_stats() above.
- For large datasets on remote object stores, this opens thousands of files IN SEQUENCE and reads many tail/metadata regions. Even though each file only reads “metadata,” the total bytes can accumulate to gigabytes across many files/columns/pages. The driver does this alone, with no executors, so it becomes a single small stage running for a long time at ~1 MB/s. (In my case it steadily fetched metadata for over 50K fragments, consumed up to 5 GiB of memory in the Spark driver, and took almost 2 hours.)
- Caches help only after the first pass; the initial pass must fetch everything.
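To sanity-check the observed numbers, here is a back-of-envelope estimate. The ~100 KiB average metadata read per fragment file is my assumption (not measured); everything else comes from the observations above:

```rust
fn main() {
    // Assumed average tail + column-metadata bytes fetched per fragment file
    // (hypothetical figure; chosen only to see if the reported totals are plausible).
    let bytes_per_fragment: u64 = 100 * 1024; // 100 KiB
    let fragments: u64 = 50_000; // observed fragment count

    let total_bytes = bytes_per_fragment * fragments;
    let total_gib = total_bytes as f64 / (1u64 << 30) as f64; // ~4.8 GiB

    // Sequential fetch at the observed ~1 MiB/s effective throughput.
    let secs = total_bytes as f64 / (1024.0 * 1024.0);
    let minutes = secs / 60.0; // ~81 minutes

    println!("{:.1} GiB fetched in ~{:.0} minutes", total_gib, minutes);
}
```

Under these assumptions the sequential pass lands right around the reported ~80 minutes and ~5 GiB, which suggests the bottleneck really is total metadata volume fetched one file at a time.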
Proposals
- parallelize this method, especially the loop over fragments
- maybe add another method for rough estimation
- provide a toggle in the lakehouse layer to turn off this calculation; I may create another issue in lance-spark for that
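For the first proposal, the real fix would keep `update_storage_stats` async and drive the fragments with something like `futures::stream::iter(...).buffer_unordered(n)`, merging partial results instead of sharing `&mut field_stats`. As a stdlib-only sketch of that shape (the `fragment_stats` function below is a stand-in for the per-fragment metadata fetch, not a Lance API), using threads in place of async tasks:

```rust
use std::collections::HashMap;
use std::thread;

// Stand-in for the per-fragment metadata fetch; the real code would await
// FileFragment::update_storage_stats here instead.
fn fragment_stats(fragment_id: u32) -> HashMap<u32, u64> {
    HashMap::from([(0, 10 * fragment_id as u64), (1, 20)])
}

fn main() {
    let fragment_ids: Vec<u32> = (0..1_000).collect();
    let workers = 8;
    let chunk = fragment_ids.len().div_ceil(workers);

    // Each worker aggregates into its own partial map; partials are merged once
    // at the end, so there is no shared mutable state during the fetch phase.
    let partials: Vec<HashMap<u32, u64>> = thread::scope(|s| {
        fragment_ids
            .chunks(chunk)
            .map(|ids| {
                s.spawn(move || {
                    let mut partial: HashMap<u32, u64> = HashMap::new();
                    for id in ids {
                        for (field_id, bytes) in fragment_stats(*id) {
                            *partial.entry(field_id).or_insert(0) += bytes;
                        }
                    }
                    partial
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect()
    });

    let mut field_stats: HashMap<u32, u64> = HashMap::new();
    for partial in partials {
        for (field_id, bytes) in partial {
            *field_stats.entry(field_id).or_insert(0) += bytes;
        }
    }
    println!("{:?}", field_stats);
}
```

The merge-at-the-end shape matters because `field_stats` is additive per field, so partial sums combine trivially; the concurrency level (`workers`, or `n` for `buffer_unordered`) caps how many file tails are in flight against the object store at once.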