From 8921a95af101d84bd16b0ce6247dfc115922f71a Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 20 Apr 2026 13:20:51 -0400 Subject: [PATCH 01/10] Add metrics for bytes read, need to clean up clippy. --- .../iceberg/src/arrow/delete_file_loader.rs | 1 + crates/iceberg/src/arrow/mod.rs | 2 + crates/iceberg/src/arrow/reader.rs | 90 +++++++++++++++++-- crates/iceberg/src/arrow/scan_metrics.rs | 73 +++++++++++++++ crates/iceberg/src/scan/mod.rs | 10 ++- 5 files changed, 168 insertions(+), 8 deletions(-) create mode 100644 crates/iceberg/src/arrow/scan_metrics.rs diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index 0be62ad496..ee8e07831a 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -69,6 +69,7 @@ impl BasicDeleteFileLoader { &self.file_io, file_size_in_bytes, parquet_read_options, + None, ) .await?; diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index 7823320452..3c8ed304c1 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -32,9 +32,11 @@ mod reader; /// RecordBatch projection utilities pub mod record_batch_projector; pub(crate) mod record_batch_transformer; +mod scan_metrics; mod value; pub use reader::*; +pub use scan_metrics::ScanMetrics; pub use value::*; /// Partition value calculator for computing partition values pub mod partition_value_calculator; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 488f41cf20..41d5a5a454 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -21,6 +21,7 @@ use std::collections::{HashMap, HashSet}; use std::ops::Range; use std::str::FromStr; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene}; use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar}; @@ -48,6 +49,7 @@ use typed_builder::TypedBuilder; use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; use crate::arrow::int96::coerce_int96_timestamps; use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; +use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics}; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::delete_vector::DeleteVector; use crate::error::Result; @@ -244,8 +246,9 @@ pub struct ArrowReader { impl ArrowReader { /// Take a stream of FileScanTasks and reads all the files. - /// Returns a stream of Arrow RecordBatches containing the data from the files - pub fn read(self, tasks: FileScanTaskStream) -> Result { + /// Returns a stream of Arrow RecordBatches containing the data from the files, + /// along with [`ScanMetrics`] tracking I/O during the scan. 
+ pub fn read(self, tasks: FileScanTaskStream) -> Result<(ArrowRecordBatchStream, ScanMetrics)> { let file_io = self.file_io.clone(); let batch_size = self.batch_size; let concurrency_limit_data_files = self.concurrency_limit_data_files; @@ -253,12 +256,15 @@ impl ArrowReader { let row_selection_enabled = self.row_selection_enabled; let parquet_read_options = self.parquet_read_options; + let (scan_metrics, bytes_read) = ScanMetrics::new(); + // Fast-path for single concurrency to avoid overhead of try_flatten_unordered let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 { Box::pin( tasks .and_then(move |task| { let file_io = file_io.clone(); + let bytes_read = Arc::clone(&bytes_read); Self::process_file_scan_task( task, @@ -268,6 +274,7 @@ impl ArrowReader { row_group_filtering_enabled, row_selection_enabled, parquet_read_options, + bytes_read, ) }) .map_err(|err| { @@ -281,6 +288,7 @@ impl ArrowReader { tasks .map_ok(move |task| { let file_io = file_io.clone(); + let bytes_read = Arc::clone(&bytes_read); Self::process_file_scan_task( task, @@ -290,6 +298,7 @@ impl ArrowReader { row_group_filtering_enabled, row_selection_enabled, parquet_read_options, + bytes_read, ) }) .map_err(|err| { @@ -301,7 +310,7 @@ impl ArrowReader { ) }; - Ok(stream) + Ok((stream, scan_metrics)) } async fn process_file_scan_task( @@ -312,6 +321,7 @@ impl ArrowReader { row_group_filtering_enabled: bool, row_selection_enabled: bool, parquet_read_options: ParquetReadOptions, + bytes_read: Arc, ) -> Result { let should_load_page_index = (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); @@ -327,6 +337,7 @@ impl ArrowReader { &file_io, task.file_size_in_bytes, parquet_read_options, + Some(&bytes_read), ) .await?; @@ -611,9 +622,16 @@ impl ArrowReader { file_io: &FileIO, file_size_in_bytes: u64, parquet_read_options: ParquetReadOptions, + bytes_read: Option<&Arc>, ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> { let parquet_file = file_io.new_input(data_file_path)?; - let parquet_reader = parquet_file.reader().await?; + let parquet_reader: Box = match bytes_read { + Some(counter) => Box::new(CountingFileRead::new( + parquet_file.reader().await?, + Arc::clone(counter), + )), + None => parquet_file.reader().await?, + }; let mut reader = ArrowFileReader::new( FileMetadata { size: file_size_in_bytes, @@ -2236,6 +2254,51 @@ message schema { assert_eq!(result_data, expected); } + #[tokio::test] + async fn test_scan_metrics_bytes_read() { + let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())]; + + let (file_io, schema, table_location, _temp_dir) = + setup_kleene_logic(data_for_col_a, DataType::Utf8); + + let file_path = format!("{table_location}/1.parquet"); + let file_size = std::fs::metadata(&file_path).unwrap().len(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let task = FileScanTask { + file_size_in_bytes: file_size, + start: 0, + length: 0, + record_count: None, + data_file_path: file_path, + data_file_format: DataFileFormat::Parquet, + schema, + project_field_ids: vec![1], + predicate: None, + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }; + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let (stream, scan_metrics) = reader.read(tasks).unwrap(); + + // Metrics should be zero before consuming the stream + assert_eq!(scan_metrics.bytes_read(), 0); + + let _batches: Vec = stream.try_collect().await.unwrap(); + + // After 
consuming the stream, bytes_read should reflect actual I/O + assert!( + scan_metrics.bytes_read() > 0, + "Expected bytes_read > 0, got {}", + scan_metrics.bytes_read() + ); + } + #[tokio::test] async fn test_predicate_cast_literal() { let predicates = vec![ @@ -2340,6 +2403,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -2681,6 +2745,7 @@ message schema { .clone() .read(tasks1) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -2697,6 +2762,7 @@ message schema { let result2 = reader .read(tasks2) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -2812,6 +2878,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -2985,6 +3052,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -3205,6 +3273,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -3418,6 +3487,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -3526,6 +3596,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -3627,6 +3698,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -3717,6 +3789,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -3821,6 +3894,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -3954,6 +4028,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -4054,6 +4129,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -4168,6 +4244,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -4298,6 +4375,7 @@ message schema { let result = reader .read(tasks_stream) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -4479,6 +4557,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -4895,6 +4974,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .0 .try_collect::>() .await .unwrap(); @@ -4959,7 +5039,7 @@ message schema { }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; - reader.read(tasks).unwrap().try_collect().await.unwrap() + reader.read(tasks).unwrap().0.try_collect().await.unwrap() } // ArrowWriter cannot write INT96, so we use SerializedFileWriter directly. diff --git a/crates/iceberg/src/arrow/scan_metrics.rs b/crates/iceberg/src/arrow/scan_metrics.rs new file mode 100644 index 0000000000..7547cf32a2 --- /dev/null +++ b/crates/iceberg/src/arrow/scan_metrics.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Scan metrics and I/O counting for Parquet data file reads. + +use std::ops::Range; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use bytes::Bytes; + +use crate::error::Result; +use crate::io::FileRead; + +/// Wraps a [`FileRead`] to count bytes read via a shared atomic counter. +pub(crate) struct CountingFileRead { + inner: Box, + bytes_read: Arc, +} + +impl CountingFileRead { + pub(crate) fn new(inner: Box, bytes_read: Arc) -> Self { + Self { inner, bytes_read } + } +} + +#[async_trait::async_trait] +impl FileRead for CountingFileRead { + async fn read(&self, range: Range) -> Result { + self.bytes_read + .fetch_add(range.end - range.start, Ordering::Relaxed); + self.inner.read(range).await + } +} + +/// Metrics collected during an Iceberg scan. +/// +/// Returned alongside the record batch stream from [`ArrowReader::read`](super::ArrowReader::read). +/// Additional counters (e.g. positional deletes applied, row groups pruned) +/// can be added here without changing the `read()` return type. +#[derive(Clone, Debug)] +pub struct ScanMetrics { + bytes_read: Arc, +} + +impl ScanMetrics { + pub(crate) fn new() -> (Self, Arc) { + let bytes_read = Arc::new(AtomicU64::new(0)); + let metrics = Self { + bytes_read: Arc::clone(&bytes_read), + }; + (metrics, bytes_read) + } + + /// Total bytes read from storage for data files during this scan. + pub fn bytes_read(&self) -> u64 { + self.bytes_read.load(Ordering::Relaxed) + } +} diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 4a1e27bdc1..e3d63a2371 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -32,6 +32,7 @@ use futures::{SinkExt, StreamExt, TryStreamExt}; pub use task::*; use crate::arrow::ArrowReaderBuilder; +pub use crate::arrow::ScanMetrics; use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; @@ -441,7 +442,10 @@ impl TableScan { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); } - arrow_reader_builder.build().read(self.plan_files().await?) + let (stream, _scan_metrics) = arrow_reader_builder + .build() + .read(self.plan_files().await?)?; + Ok(stream) } /// Returns a reference to the column names of the table scan. 
@@ -1361,14 +1365,14 @@ pub mod tests { assert_eq!(plan_task.len(), 2); let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); - let batch_stream = reader + let (batch_stream, _) = reader .clone() .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) .unwrap(); let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap(); let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); - let batch_stream = reader + let (batch_stream, _) = reader .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) .unwrap(); let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap(); From a4457df56eff0c39219eab02ca39e322f29a879f Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 20 Apr 2026 13:31:16 -0400 Subject: [PATCH 02/10] Clean up. --- crates/iceberg/src/arrow/reader.rs | 111 +++++++++++++---------------- 1 file changed, 48 insertions(+), 63 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 41d5a5a454..a7b778ba65 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -244,39 +244,42 @@ pub struct ArrowReader { parquet_read_options: ParquetReadOptions, } +/// Per-scan state for processing [`FileScanTask`]s. Created once per +/// [`ArrowReader::read`] call and cloned per task. +#[derive(Clone)] +struct FileScanTaskReader { + batch_size: Option, + file_io: FileIO, + delete_file_loader: CachingDeleteFileLoader, + row_group_filtering_enabled: bool, + row_selection_enabled: bool, + parquet_read_options: ParquetReadOptions, + bytes_read: Arc, +} + impl ArrowReader { /// Take a stream of FileScanTasks and reads all the files. /// Returns a stream of Arrow RecordBatches containing the data from the files, /// along with [`ScanMetrics`] tracking I/O during the scan. 
pub fn read(self, tasks: FileScanTaskStream) -> Result<(ArrowRecordBatchStream, ScanMetrics)> { - let file_io = self.file_io.clone(); - let batch_size = self.batch_size; let concurrency_limit_data_files = self.concurrency_limit_data_files; - let row_group_filtering_enabled = self.row_group_filtering_enabled; - let row_selection_enabled = self.row_selection_enabled; - let parquet_read_options = self.parquet_read_options; - let (scan_metrics, bytes_read) = ScanMetrics::new(); + let task_reader = FileScanTaskReader { + batch_size: self.batch_size, + file_io: self.file_io, + delete_file_loader: self.delete_file_loader, + row_group_filtering_enabled: self.row_group_filtering_enabled, + row_selection_enabled: self.row_selection_enabled, + parquet_read_options: self.parquet_read_options, + bytes_read, + }; + // Fast-path for single concurrency to avoid overhead of try_flatten_unordered let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 { Box::pin( tasks - .and_then(move |task| { - let file_io = file_io.clone(); - let bytes_read = Arc::clone(&bytes_read); - - Self::process_file_scan_task( - task, - batch_size, - file_io, - self.delete_file_loader.clone(), - row_group_filtering_enabled, - row_selection_enabled, - parquet_read_options, - bytes_read, - ) - }) + .and_then(move |task| task_reader.clone().process(task)) .map_err(|err| { Error::new(ErrorKind::Unexpected, "file scan task generate failed") .with_source(err) @@ -286,21 +289,7 @@ impl ArrowReader { } else { Box::pin( tasks - .map_ok(move |task| { - let file_io = file_io.clone(); - let bytes_read = Arc::clone(&bytes_read); - - Self::process_file_scan_task( - task, - batch_size, - file_io, - self.delete_file_loader.clone(), - row_group_filtering_enabled, - row_selection_enabled, - parquet_read_options, - bytes_read, - ) - }) + .map_ok(move |task| task_reader.clone().process(task)) .map_err(|err| { Error::new(ErrorKind::Unexpected, "file scan task generate failed") .with_source(err) @@ -312,32 +301,26 @@ impl ArrowReader { Ok((stream, scan_metrics)) } +} - async fn process_file_scan_task( - task: FileScanTask, - batch_size: Option, - file_io: FileIO, - delete_file_loader: CachingDeleteFileLoader, - row_group_filtering_enabled: bool, - row_selection_enabled: bool, - parquet_read_options: ParquetReadOptions, - bytes_read: Arc, - ) -> Result { +impl FileScanTaskReader { + async fn process(self, task: FileScanTask) -> Result { let should_load_page_index = - (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); - let mut parquet_read_options = parquet_read_options; + (self.row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); + let mut parquet_read_options = self.parquet_read_options; parquet_read_options.preload_page_index = should_load_page_index; - let delete_filter_rx = - delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema)); + let delete_filter_rx = self + .delete_file_loader + .load_deletes(&task.deletes, Arc::clone(&task.schema)); // Open the Parquet file once, loading its metadata - let (parquet_file_reader, arrow_metadata) = Self::open_parquet_file( + let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file( &task.data_file_path, - &file_io, + &self.file_io, task.file_size_in_bytes, parquet_read_options, - Some(&bytes_read), + Some(&self.bytes_read), ) .await?; @@ -435,7 +418,7 @@ impl ArrowReader { // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false) // - If name mapping applied: field-ID-based 
projection (missing_field_ids=true but IDs now match) // - If fallback IDs: position-based projection (missing_field_ids=true) - let projection_mask = Self::get_arrow_projection_mask( + let projection_mask = ArrowReader::get_arrow_projection_mask( &project_field_ids_without_metadata, &task.schema, record_batch_stream_builder.parquet_schema(), @@ -468,7 +451,7 @@ impl ArrowReader { let mut record_batch_transformer = record_batch_transformer_builder.build(); - if let Some(batch_size) = batch_size { + if let Some(batch_size) = self.batch_size { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); } @@ -509,7 +492,7 @@ impl ArrowReader { // Filter row groups based on byte range from task.start and task.length. // If both start and length are 0, read the entire file (backwards compatibility). if task.start != 0 || task.length != 0 { - let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range( + let byte_range_filtered_row_groups = ArrowReader::filter_row_groups_by_byte_range( record_batch_stream_builder.metadata(), task.start, task.length, @@ -518,12 +501,12 @@ impl ArrowReader { } if let Some(predicate) = final_predicate { - let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map( + let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map( record_batch_stream_builder.parquet_schema(), &predicate, )?; - let row_filter = Self::get_row_filter( + let row_filter = ArrowReader::get_row_filter( &predicate, record_batch_stream_builder.parquet_schema(), &iceberg_field_ids, @@ -531,8 +514,8 @@ impl ArrowReader { )?; record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter); - if row_group_filtering_enabled { - let predicate_filtered_row_groups = Self::get_selected_row_group_indices( + if self.row_group_filtering_enabled { + let predicate_filtered_row_groups = ArrowReader::get_selected_row_group_indices( &predicate, record_batch_stream_builder.metadata(), &field_id_map, @@ -554,8 +537,8 @@ impl ArrowReader { }; } - if row_selection_enabled { - row_selection = Some(Self::get_row_selection_for_filter_predicate( + if self.row_selection_enabled { + row_selection = Some(ArrowReader::get_row_selection_for_filter_predicate( &predicate, record_batch_stream_builder.metadata(), &selected_row_group_indices, @@ -571,7 +554,7 @@ impl ArrowReader { let delete_row_selection = { let positional_delete_indexes = positional_delete_indexes.lock().unwrap(); - Self::build_deletes_row_selection( + ArrowReader::build_deletes_row_selection( record_batch_stream_builder.metadata().row_groups(), &selected_row_group_indices, &positional_delete_indexes, @@ -613,7 +596,9 @@ impl ArrowReader { Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) } +} +impl ArrowReader { /// Opens a Parquet file and loads its metadata, returning both the reader and metadata. /// The reader can be reused to build a `ParquetRecordBatchStreamBuilder` without /// reopening the file. From f7a0507ed3a04ee48fc04bf932f7a00280a3936d Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 20 Apr 2026 13:44:33 -0400 Subject: [PATCH 03/10] Make API opt-in. 
--- crates/iceberg/src/arrow/reader.rs | 36 ++++++++++++------------------ crates/iceberg/src/scan/mod.rs | 9 +++----- 2 files changed, 17 insertions(+), 28 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index a7b778ba65..acf1995f2b 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -259,9 +259,18 @@ struct FileScanTaskReader { impl ArrowReader { /// Take a stream of FileScanTasks and reads all the files. - /// Returns a stream of Arrow RecordBatches containing the data from the files, - /// along with [`ScanMetrics`] tracking I/O during the scan. - pub fn read(self, tasks: FileScanTaskStream) -> Result<(ArrowRecordBatchStream, ScanMetrics)> { + /// Returns a stream of Arrow RecordBatches containing the data from the files. + pub fn read(self, tasks: FileScanTaskStream) -> Result { + let (stream, _metrics) = self.read_with_metrics(tasks)?; + Ok(stream) + } + + /// Same as [`read`](Self::read), but also returns [`ScanMetrics`] tracking + /// I/O during the scan. + pub fn read_with_metrics( + self, + tasks: FileScanTaskStream, + ) -> Result<(ArrowRecordBatchStream, ScanMetrics)> { let concurrency_limit_data_files = self.concurrency_limit_data_files; let (scan_metrics, bytes_read) = ScanMetrics::new(); @@ -2269,7 +2278,7 @@ message schema { }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; - let (stream, scan_metrics) = reader.read(tasks).unwrap(); + let (stream, scan_metrics) = reader.read_with_metrics(tasks).unwrap(); // Metrics should be zero before consuming the stream assert_eq!(scan_metrics.bytes_read(), 0); @@ -2388,7 +2397,6 @@ message schema { let result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -2730,7 +2738,6 @@ message schema { .clone() .read(tasks1) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -2747,7 +2754,6 @@ message schema { let result2 = reader .read(tasks2) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -2863,7 +2869,6 @@ message schema { let result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -3037,7 +3042,6 @@ message schema { let result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -3258,7 +3262,6 @@ message schema { let result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -3472,7 +3475,6 @@ message schema { let result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -3581,7 +3583,6 @@ message schema { let result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -3683,7 +3684,6 @@ message schema { let result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -3774,7 +3774,6 @@ message schema { let result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -3879,7 +3878,6 @@ message schema { let result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -4013,7 +4011,6 @@ message schema { let result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -4114,7 +4111,6 @@ message schema { let result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -4229,7 +4225,6 @@ message schema { let result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -4360,7 +4355,6 @@ message schema { let result = reader .read(tasks_stream) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -4542,7 +4536,6 @@ message schema { let 
result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -4959,7 +4952,6 @@ message schema { let result = reader .read(tasks) .unwrap() - .0 .try_collect::>() .await .unwrap(); @@ -5024,7 +5016,7 @@ message schema { }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; - reader.read(tasks).unwrap().0.try_collect().await.unwrap() + reader.read(tasks).unwrap().try_collect().await.unwrap() } // ArrowWriter cannot write INT96, so we use SerializedFileWriter directly. diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index e3d63a2371..4452fdf1ae 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -442,10 +442,7 @@ impl TableScan { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); } - let (stream, _scan_metrics) = arrow_reader_builder - .build() - .read(self.plan_files().await?)?; - Ok(stream) + arrow_reader_builder.build().read(self.plan_files().await?) } /// Returns a reference to the column names of the table scan. @@ -1365,14 +1362,14 @@ pub mod tests { assert_eq!(plan_task.len(), 2); let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); - let (batch_stream, _) = reader + let batch_stream = reader .clone() .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) .unwrap(); let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap(); let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); - let (batch_stream, _) = reader + let batch_stream = reader .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) .unwrap(); let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap(); From f707ba8c6348f0cf4c137f83d770e70683897c63 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 20 Apr 2026 15:21:30 -0400 Subject: [PATCH 04/10] Refactor open logic as well. 
--- .../iceberg/src/arrow/delete_file_loader.rs | 1 - crates/iceberg/src/arrow/reader.rs | 51 ++++++++++++------- crates/iceberg/src/arrow/scan_metrics.rs | 14 ++--- 3 files changed, 42 insertions(+), 24 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index ee8e07831a..0be62ad496 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -69,7 +69,6 @@ impl BasicDeleteFileLoader { &self.file_io, file_size_in_bytes, parquet_read_options, - None, ) .await?; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index acf1995f2b..ff126f4149 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -254,7 +254,7 @@ struct FileScanTaskReader { row_group_filtering_enabled: bool, row_selection_enabled: bool, parquet_read_options: ParquetReadOptions, - bytes_read: Arc, + scan_metrics: ScanMetrics, } impl ArrowReader { @@ -272,7 +272,7 @@ impl ArrowReader { tasks: FileScanTaskStream, ) -> Result<(ArrowRecordBatchStream, ScanMetrics)> { let concurrency_limit_data_files = self.concurrency_limit_data_files; - let (scan_metrics, bytes_read) = ScanMetrics::new(); + let scan_metrics = ScanMetrics::new(); let task_reader = FileScanTaskReader { batch_size: self.batch_size, @@ -281,7 +281,7 @@ impl ArrowReader { row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, parquet_read_options: self.parquet_read_options, - bytes_read, + scan_metrics: scan_metrics.clone(), }; // Fast-path for single concurrency to avoid overhead of try_flatten_unordered @@ -323,13 +323,14 @@ impl FileScanTaskReader { .delete_file_loader .load_deletes(&task.deletes, Arc::clone(&task.schema)); - // Open the Parquet file once, loading its metadata - let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file( + // Open the Parquet file once, loading its metadata. + // Uses the counting variant so metadata + data page I/O is tracked. + let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file_counted( &task.data_file_path, &self.file_io, task.file_size_in_bytes, parquet_read_options, - Some(&self.bytes_read), + self.scan_metrics.bytes_read_counter(), ) .await?; @@ -608,24 +609,40 @@ impl FileScanTaskReader { } impl ArrowReader { - /// Opens a Parquet file and loads its metadata, returning both the reader and metadata. - /// The reader can be reused to build a `ParquetRecordBatchStreamBuilder` without - /// reopening the file. + /// Opens a Parquet file and loads its metadata. pub(crate) async fn open_parquet_file( data_file_path: &str, file_io: &FileIO, file_size_in_bytes: u64, parquet_read_options: ParquetReadOptions, - bytes_read: Option<&Arc>, ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> { let parquet_file = file_io.new_input(data_file_path)?; - let parquet_reader: Box = match bytes_read { - Some(counter) => Box::new(CountingFileRead::new( - parquet_file.reader().await?, - Arc::clone(counter), - )), - None => parquet_file.reader().await?, - }; + let parquet_reader = parquet_file.reader().await?; + Self::build_parquet_reader(parquet_reader, file_size_in_bytes, parquet_read_options).await + } + + /// Opens a Parquet file wrapped with [`CountingFileRead`] so that all I/O + /// (metadata + data pages) is accumulated into `bytes_read`. 
+ async fn open_parquet_file_counted( + data_file_path: &str, + file_io: &FileIO, + file_size_in_bytes: u64, + parquet_read_options: ParquetReadOptions, + bytes_read: &Arc, + ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> { + let parquet_file = file_io.new_input(data_file_path)?; + let parquet_reader: Box = Box::new(CountingFileRead::new( + parquet_file.reader().await?, + Arc::clone(bytes_read), + )); + Self::build_parquet_reader(parquet_reader, file_size_in_bytes, parquet_read_options).await + } + + async fn build_parquet_reader( + parquet_reader: Box, + file_size_in_bytes: u64, + parquet_read_options: ParquetReadOptions, + ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> { let mut reader = ArrowFileReader::new( FileMetadata { size: file_size_in_bytes, diff --git a/crates/iceberg/src/arrow/scan_metrics.rs b/crates/iceberg/src/arrow/scan_metrics.rs index 7547cf32a2..bf23c941af 100644 --- a/crates/iceberg/src/arrow/scan_metrics.rs +++ b/crates/iceberg/src/arrow/scan_metrics.rs @@ -58,12 +58,14 @@ pub struct ScanMetrics { } impl ScanMetrics { - pub(crate) fn new() -> (Self, Arc) { - let bytes_read = Arc::new(AtomicU64::new(0)); - let metrics = Self { - bytes_read: Arc::clone(&bytes_read), - }; - (metrics, bytes_read) + pub(crate) fn new() -> Self { + Self { + bytes_read: Arc::new(AtomicU64::new(0)), + } + } + + pub(crate) fn bytes_read_counter(&self) -> &Arc { + &self.bytes_read } /// Total bytes read from storage for data files during this scan. From ab4ebee6f56dcd886790c28de13ab259e9472cea Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 20 Apr 2026 15:23:49 -0400 Subject: [PATCH 05/10] Put missing comment back. --- crates/iceberg/src/arrow/reader.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index ff126f4149..01f338b303 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -609,7 +609,9 @@ impl FileScanTaskReader { } impl ArrowReader { - /// Opens a Parquet file and loads its metadata. + /// Opens a Parquet file and loads its metadata, returning both the reader and metadata. + /// The reader can be reused to build a `ParquetRecordBatchStreamBuilder` without + /// reopening the file. pub(crate) async fn open_parquet_file( data_file_path: &str, file_io: &FileIO, From fa055faa13f718a4f82b1dec8128df249e336548 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 22 Apr 2026 08:52:11 -0400 Subject: [PATCH 06/10] Address PR feedback. 
--- crates/iceberg/src/arrow/mod.rs | 2 +- crates/iceberg/src/arrow/reader.rs | 87 ++++++++++++++---------- crates/iceberg/src/arrow/scan_metrics.rs | 44 +++++++++--- crates/iceberg/src/io/file_io.rs | 7 ++ crates/iceberg/src/scan/mod.rs | 13 ++-- 5 files changed, 103 insertions(+), 50 deletions(-) diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index 3c8ed304c1..bf53633cfc 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -36,7 +36,7 @@ mod scan_metrics; mod value; pub use reader::*; -pub use scan_metrics::ScanMetrics; +pub use scan_metrics::{ScanMetrics, ScanResult}; pub use value::*; /// Partition value calculator for computing partition values pub mod partition_value_calculator; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 01f338b303..758b0ea81f 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -49,7 +49,7 @@ use typed_builder::TypedBuilder; use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; use crate::arrow::int96::coerce_int96_timestamps; use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; -use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics}; +use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult}; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::delete_vector::DeleteVector; use crate::error::Result; @@ -244,33 +244,10 @@ pub struct ArrowReader { parquet_read_options: ParquetReadOptions, } -/// Per-scan state for processing [`FileScanTask`]s. Created once per -/// [`ArrowReader::read`] call and cloned per task. -#[derive(Clone)] -struct FileScanTaskReader { - batch_size: Option, - file_io: FileIO, - delete_file_loader: CachingDeleteFileLoader, - row_group_filtering_enabled: bool, - row_selection_enabled: bool, - parquet_read_options: ParquetReadOptions, - scan_metrics: ScanMetrics, -} - impl ArrowReader { /// Take a stream of FileScanTasks and reads all the files. - /// Returns a stream of Arrow RecordBatches containing the data from the files. - pub fn read(self, tasks: FileScanTaskStream) -> Result { - let (stream, _metrics) = self.read_with_metrics(tasks)?; - Ok(stream) - } - - /// Same as [`read`](Self::read), but also returns [`ScanMetrics`] tracking - /// I/O during the scan. - pub fn read_with_metrics( - self, - tasks: FileScanTaskStream, - ) -> Result<(ArrowRecordBatchStream, ScanMetrics)> { + /// Returns a [`ScanResult`] containing the record batch stream and scan metrics. + pub fn read(self, tasks: FileScanTaskStream) -> Result { let concurrency_limit_data_files = self.concurrency_limit_data_files; let scan_metrics = ScanMetrics::new(); @@ -308,10 +285,23 @@ impl ArrowReader { ) }; - Ok((stream, scan_metrics)) + Ok(ScanResult::new(stream, scan_metrics)) } } +/// Per-scan state for processing [`FileScanTask`]s. Created once per +/// [`ArrowReader::read`] call and cloned per task. 
+#[derive(Clone)] +struct FileScanTaskReader { + batch_size: Option, + file_io: FileIO, + delete_file_loader: CachingDeleteFileLoader, + row_group_filtering_enabled: bool, + row_selection_enabled: bool, + parquet_read_options: ParquetReadOptions, + scan_metrics: ScanMetrics, +} + impl FileScanTaskReader { async fn process(self, task: FileScanTask) -> Result { let should_load_page_index = @@ -323,8 +313,6 @@ impl FileScanTaskReader { .delete_file_loader .load_deletes(&task.deletes, Arc::clone(&task.schema)); - // Open the Parquet file once, loading its metadata. - // Uses the counting variant so metadata + data page I/O is tracked. let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file_counted( &task.data_file_path, &self.file_io, @@ -633,11 +621,14 @@ impl ArrowReader { bytes_read: &Arc, ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> { let parquet_file = file_io.new_input(data_file_path)?; - let parquet_reader: Box = Box::new(CountingFileRead::new( - parquet_file.reader().await?, - Arc::clone(bytes_read), - )); - Self::build_parquet_reader(parquet_reader, file_size_in_bytes, parquet_read_options).await + let counting_reader = + CountingFileRead::new(parquet_file.reader().await?, Arc::clone(bytes_read)); + Self::build_parquet_reader( + Box::new(counting_reader), + file_size_in_bytes, + parquet_read_options, + ) + .await } async fn build_parquet_reader( @@ -2297,7 +2288,8 @@ message schema { }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; - let (stream, scan_metrics) = reader.read_with_metrics(tasks).unwrap(); + let scan_result = reader.read(tasks).unwrap(); + let (stream, scan_metrics) = scan_result.into_parts(); // Metrics should be zero before consuming the stream assert_eq!(scan_metrics.bytes_read(), 0); @@ -2416,6 +2408,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -2757,6 +2750,7 @@ message schema { .clone() .read(tasks1) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -2773,6 +2767,7 @@ message schema { let result2 = reader .read(tasks2) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -2888,6 +2883,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -3061,6 +3057,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -3281,6 +3278,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -3494,6 +3492,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -3602,6 +3601,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -3703,6 +3703,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -3793,6 +3794,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -3897,6 +3899,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -4030,6 +4033,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -4130,6 +4134,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -4244,6 +4249,7 @@ message schema { let result = reader .read(tasks) 
.unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -4374,6 +4380,7 @@ message schema { let result = reader .read(tasks_stream) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -4555,6 +4562,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -4971,6 +4979,7 @@ message schema { let result = reader .read(tasks) .unwrap() + .stream() .try_collect::>() .await .unwrap(); @@ -5035,7 +5044,13 @@ message schema { }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; - reader.read(tasks).unwrap().try_collect().await.unwrap() + reader + .read(tasks) + .unwrap() + .stream() + .try_collect() + .await + .unwrap() } // ArrowWriter cannot write INT96, so we use SerializedFileWriter directly. diff --git a/crates/iceberg/src/arrow/scan_metrics.rs b/crates/iceberg/src/arrow/scan_metrics.rs index bf23c941af..435a818b8b 100644 --- a/crates/iceberg/src/arrow/scan_metrics.rs +++ b/crates/iceberg/src/arrow/scan_metrics.rs @@ -25,22 +25,24 @@ use bytes::Bytes; use crate::error::Result; use crate::io::FileRead; +use crate::scan::ArrowRecordBatchStream; /// Wraps a [`FileRead`] to count bytes read via a shared atomic counter. -pub(crate) struct CountingFileRead { - inner: Box, +pub(crate) struct CountingFileRead { + inner: F, bytes_read: Arc, } -impl CountingFileRead { - pub(crate) fn new(inner: Box, bytes_read: Arc) -> Self { +impl CountingFileRead { + pub(crate) fn new(inner: F, bytes_read: Arc) -> Self { Self { inner, bytes_read } } } #[async_trait::async_trait] -impl FileRead for CountingFileRead { +impl FileRead for CountingFileRead { async fn read(&self, range: Range) -> Result { + debug_assert!(range.end >= range.start); self.bytes_read .fetch_add(range.end - range.start, Ordering::Relaxed); self.inner.read(range).await @@ -48,10 +50,6 @@ impl FileRead for CountingFileRead { } /// Metrics collected during an Iceberg scan. -/// -/// Returned alongside the record batch stream from [`ArrowReader::read`](super::ArrowReader::read). -/// Additional counters (e.g. positional deletes applied, row groups pruned) -/// can be added here without changing the `read()` return type. #[derive(Clone, Debug)] pub struct ScanMetrics { bytes_read: Arc, @@ -73,3 +71,31 @@ impl ScanMetrics { self.bytes_read.load(Ordering::Relaxed) } } + +/// Result of [`ArrowReader::read`](super::ArrowReader::read), containing the +/// record batch stream and metrics collected during the scan. +pub struct ScanResult { + stream: ArrowRecordBatchStream, + metrics: ScanMetrics, +} + +impl ScanResult { + pub(crate) fn new(stream: ArrowRecordBatchStream, metrics: ScanMetrics) -> Self { + Self { stream, metrics } + } + + /// Consumes the result, returning only the record batch stream. + pub fn stream(self) -> ArrowRecordBatchStream { + self.stream + } + + /// Returns a reference to the scan metrics. + pub fn metrics(&self) -> &ScanMetrics { + &self.metrics + } + + /// Consumes the result into its parts. 
+ pub fn into_parts(self) -> (ArrowRecordBatchStream, ScanMetrics) { + (self.stream, self.metrics) + } +} diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 594b070e03..ff7523d44d 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -255,6 +255,13 @@ pub trait FileRead: Send + Sync + Unpin + 'static { async fn read(&self, range: Range) -> crate::Result; } +#[async_trait::async_trait] +impl FileRead for Box { + async fn read(&self, range: Range) -> crate::Result { + self.as_ref().read(range).await + } +} + /// Input file is used for reading from files. #[derive(Debug)] pub struct InputFile { diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 4452fdf1ae..27f479183a 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -32,7 +32,7 @@ use futures::{SinkExt, StreamExt, TryStreamExt}; pub use task::*; use crate::arrow::ArrowReaderBuilder; -pub use crate::arrow::ScanMetrics; +pub use crate::arrow::{ScanMetrics, ScanResult}; use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; @@ -442,7 +442,10 @@ impl TableScan { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); } - arrow_reader_builder.build().read(self.plan_files().await?) + arrow_reader_builder + .build() + .read(self.plan_files().await?) + .map(|result| result.stream()) } /// Returns a reference to the column names of the table scan. @@ -1365,13 +1368,15 @@ pub mod tests { let batch_stream = reader .clone() .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) - .unwrap(); + .unwrap() + .stream(); let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap(); let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); let batch_stream = reader .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) - .unwrap(); + .unwrap() + .stream(); let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap(); assert_eq!(batch_1, batch_2); From cf596355fd3fd50839df8b923c48557ab41c04d6 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 22 Apr 2026 09:22:50 -0400 Subject: [PATCH 07/10] Handle delete files and add a test. 
--- .../src/arrow/caching_delete_file_loader.rs | 52 +++++++- .../iceberg/src/arrow/delete_file_loader.rs | 9 +- crates/iceberg/src/arrow/delete_filter.rs | 14 +- crates/iceberg/src/arrow/reader.rs | 120 +++++++++++++++--- 4 files changed, 165 insertions(+), 30 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index ae97534d83..9daeb8fe7c 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -18,6 +18,7 @@ use std::collections::{HashMap, HashSet}; use std::ops::Not; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray}; use futures::{StreamExt, TryStreamExt}; @@ -152,6 +153,7 @@ impl CachingDeleteFileLoader { &self, delete_file_entries: &[FileScanTaskDeleteFile], schema: SchemaRef, + bytes_read: &Arc, ) -> Receiver> { let (tx, rx) = channel(); @@ -171,6 +173,7 @@ impl CachingDeleteFileLoader { let del_filter = self.delete_filter.clone(); let concurrency_limit_data_files = self.concurrency_limit_data_files; let basic_delete_file_loader = self.basic_delete_file_loader.clone(); + let bytes_read = Arc::clone(bytes_read); crate::runtime::spawn(async move { let result = async move { let mut del_filter = del_filter; @@ -179,12 +182,14 @@ impl CachingDeleteFileLoader { let mut results_stream = task_stream .map(move |(task, file_io, del_filter, schema)| { let basic_delete_file_loader = basic_delete_file_loader.clone(); + let bytes_read = Arc::clone(&bytes_read); async move { Self::load_file_for_task( &task, basic_delete_file_loader.clone(), del_filter, schema, + &bytes_read, ) .await } @@ -220,6 +225,7 @@ impl CachingDeleteFileLoader { basic_delete_file_loader: BasicDeleteFileLoader, del_filter: DeleteFilter, schema: SchemaRef, + bytes_read: &Arc, ) -> Result { match task.file_type { DataContentType::PositionDeletes => { @@ -235,7 +241,11 @@ impl CachingDeleteFileLoader { PosDelLoadAction::Load => Ok(DeleteFileContext::PosDels { file_path: task.file_path.clone(), stream: basic_delete_file_loader - .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) + .parquet_to_batch_stream( + &task.file_path, + task.file_size_in_bytes, + bytes_read, + ) .await?, }), } @@ -254,7 +264,11 @@ impl CachingDeleteFileLoader { let equality_ids_vec = task.equality_ids.clone().unwrap(); let evolved_stream = BasicDeleteFileLoader::evolve_schema( basic_delete_file_loader - .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) + .parquet_to_batch_stream( + &task.file_path, + task.file_size_in_bytes, + bytes_read, + ) .await?, schema, &equality_ids_vec, @@ -613,10 +627,12 @@ mod tests { let eq_delete_file_path = setup_write_equality_delete_file_1(table_location); let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); + let bytes_read = Arc::new(AtomicU64::new(0)); let record_batch_stream = basic_delete_file_loader .parquet_to_batch_stream( &eq_delete_file_path, std::fs::metadata(&eq_delete_file_path).unwrap().len(), + &bytes_read, ) .await .expect("could not get batch stream"); @@ -727,11 +743,16 @@ mod tests { let file_io = FileIO::new_with_fs(); let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let bytes_read = Arc::new(AtomicU64::new(0)); let file_scan_tasks = setup(table_location); let delete_filter = delete_file_loader - .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) + 
.load_deletes( + &file_scan_tasks[0].deletes, + file_scan_tasks[0].schema_ref(), + &bytes_read, + ) .await .unwrap() .unwrap(); @@ -809,11 +830,13 @@ mod tests { let file_io = FileIO::new_with_fs(); let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); + let bytes_read = Arc::new(AtomicU64::new(0)); let batch_stream = basic_delete_file_loader .parquet_to_batch_stream( &delete_file_path, std::fs::metadata(&delete_file_path).unwrap().len(), + &bytes_read, ) .await .unwrap(); @@ -947,8 +970,13 @@ mod tests { // Load the deletes - should handle both types without error let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let bytes_read = Arc::new(AtomicU64::new(0)); let delete_filter = delete_file_loader - .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref()) + .load_deletes( + &file_scan_task.deletes, + file_scan_task.schema_ref(), + &bytes_read, + ) .await .unwrap() .unwrap(); @@ -995,8 +1023,9 @@ mod tests { writer.close().unwrap(); let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); + let bytes_read = Arc::new(AtomicU64::new(0)); let record_batch_stream = basic_delete_file_loader - .parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len()) + .parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len(), &bytes_read) .await .expect("could not get batch stream"); @@ -1018,19 +1047,28 @@ mod tests { let file_io = FileIO::new_with_fs(); let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let bytes_read = Arc::new(AtomicU64::new(0)); let file_scan_tasks = setup(table_location); // Load deletes for the first time let delete_filter_1 = delete_file_loader - .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) + .load_deletes( + &file_scan_tasks[0].deletes, + file_scan_tasks[0].schema_ref(), + &bytes_read, + ) .await .unwrap() .unwrap(); // Load deletes for the second time (same task/files) let delete_filter_2 = delete_file_loader - .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) + .load_deletes( + &file_scan_tasks[0].deletes, + file_scan_tasks[0].schema_ref(), + &bytes_read, + ) .await .unwrap() .unwrap(); diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index 0be62ad496..29961df67d 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -16,6 +16,7 @@ // under the License. use std::sync::Arc; +use std::sync::atomic::AtomicU64; use futures::{StreamExt, TryStreamExt}; use parquet::arrow::ParquetRecordBatchStreamBuilder; @@ -39,6 +40,7 @@ pub trait DeleteFileLoader { &self, task: &FileScanTaskDeleteFile, schema: SchemaRef, + bytes_read: &Arc, ) -> Result; } @@ -57,6 +59,7 @@ impl BasicDeleteFileLoader { &self, data_file_path: &str, file_size_in_bytes: u64, + bytes_read: &Arc, ) -> Result { /* Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly @@ -69,6 +72,7 @@ impl BasicDeleteFileLoader { &self.file_io, file_size_in_bytes, parquet_read_options, + bytes_read, ) .await?; @@ -108,9 +112,10 @@ impl DeleteFileLoader for BasicDeleteFileLoader { &self, task: &FileScanTaskDeleteFile, schema: SchemaRef, + bytes_read: &Arc, ) -> Result { let raw_batch_stream = self - .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) + .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes, bytes_read) .await?; // For equality deletes, only evolve the equality_ids columns. 
@@ -138,6 +143,7 @@ mod tests { let file_io = FileIO::new_with_fs(); let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); + let bytes_read = Arc::new(AtomicU64::new(0)); let file_scan_tasks = setup(table_location); @@ -145,6 +151,7 @@ mod tests { .read_delete_file( &file_scan_tasks[0].deletes[0], file_scan_tasks[0].schema_ref(), + &bytes_read, ) .await .unwrap(); diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 6369938ce2..fffb9dcdce 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -267,6 +267,7 @@ pub(crate) mod tests { use std::fs::File; use std::path::Path; use std::sync::Arc; + use std::sync::atomic::AtomicU64; use arrow_array::{Int64Array, RecordBatch, StringArray}; use arrow_schema::Schema as ArrowSchema; @@ -293,11 +294,16 @@ pub(crate) mod tests { let file_io = FileIO::new_with_fs(); let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let bytes_read = Arc::new(AtomicU64::new(0)); let file_scan_tasks = setup(table_location); let delete_filter = delete_file_loader - .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) + .load_deletes( + &file_scan_tasks[0].deletes, + file_scan_tasks[0].schema_ref(), + &bytes_read, + ) .await .unwrap() .unwrap(); @@ -308,7 +314,11 @@ pub(crate) mod tests { assert_eq!(result.lock().unwrap().len(), 12); // pos dels from pos del file 1 and 2 let delete_filter = delete_file_loader - .load_deletes(&file_scan_tasks[1].deletes, file_scan_tasks[1].schema_ref()) + .load_deletes( + &file_scan_tasks[1].deletes, + file_scan_tasks[1].schema_ref(), + &bytes_read, + ) .await .unwrap() .unwrap(); diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 758b0ea81f..f4905ffdb9 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -309,11 +309,13 @@ impl FileScanTaskReader { let mut parquet_read_options = self.parquet_read_options; parquet_read_options.preload_page_index = should_load_page_index; - let delete_filter_rx = self - .delete_file_loader - .load_deletes(&task.deletes, Arc::clone(&task.schema)); + let delete_filter_rx = self.delete_file_loader.load_deletes( + &task.deletes, + Arc::clone(&task.schema), + self.scan_metrics.bytes_read_counter(), + ); - let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file_counted( + let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file( &task.data_file_path, &self.file_io, task.file_size_in_bytes, @@ -597,27 +599,13 @@ impl FileScanTaskReader { } impl ArrowReader { - /// Opens a Parquet file and loads its metadata, returning both the reader and metadata. - /// The reader can be reused to build a `ParquetRecordBatchStreamBuilder` without - /// reopening the file. + /// Opens a Parquet file and loads its metadata, wrapping the reader with + /// [`CountingFileRead`] so all I/O is accumulated into `bytes_read`. 
pub(crate) async fn open_parquet_file( data_file_path: &str, file_io: &FileIO, file_size_in_bytes: u64, parquet_read_options: ParquetReadOptions, - ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> { - let parquet_file = file_io.new_input(data_file_path)?; - let parquet_reader = parquet_file.reader().await?; - Self::build_parquet_reader(parquet_reader, file_size_in_bytes, parquet_read_options).await - } - - /// Opens a Parquet file wrapped with [`CountingFileRead`] so that all I/O - /// (metadata + data pages) is accumulated into `bytes_read`. - async fn open_parquet_file_counted( - data_file_path: &str, - file_io: &FileIO, - file_size_in_bytes: u64, - parquet_read_options: ParquetReadOptions, bytes_read: &Arc, ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> { let parquet_file = file_io.new_input(data_file_path)?; @@ -2304,6 +2292,98 @@ message schema { ); } + #[tokio::test] + async fn test_scan_metrics_includes_delete_file_bytes() { + let data_for_col_a = vec![ + Some("foo".to_string()), + Some("bar".to_string()), + Some("baz".to_string()), + ]; + + let (file_io, schema, table_location, _temp_dir) = + setup_kleene_logic(data_for_col_a, DataType::Utf8); + + let data_file_path = format!("{table_location}/1.parquet"); + let data_file_size = std::fs::metadata(&data_file_path).unwrap().len(); + + // Write a positional delete file targeting row 0 + let pos_del_schema = crate::arrow::delete_filter::tests::create_pos_del_schema(); + let file_path_col = Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])); + let pos_col = Arc::new(arrow_array::Int64Array::from_iter_values(vec![0i64])); + let pos_del_batch = + RecordBatch::try_new(pos_del_schema, vec![file_path_col, pos_col]).unwrap(); + + let pos_del_path = format!("{table_location}/pos-del.parquet"); + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + let file = File::create(&pos_del_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, pos_del_batch.schema(), Some(props)).unwrap(); + writer.write(&pos_del_batch).unwrap(); + writer.close().unwrap(); + let pos_del_size = std::fs::metadata(&pos_del_path).unwrap().len(); + + // Read without deletes to get a baseline + let reader = ArrowReaderBuilder::new(file_io.clone()).build(); + let task_no_deletes = FileScanTask { + file_size_in_bytes: data_file_size, + start: 0, + length: 0, + record_count: None, + data_file_path: data_file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1], + predicate: None, + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }; + let tasks = + Box::pin(futures::stream::iter(vec![Ok(task_no_deletes)])) as FileScanTaskStream; + let (stream, baseline_metrics) = reader.read(tasks).unwrap().into_parts(); + let _: Vec = stream.try_collect().await.unwrap(); + let baseline_bytes = baseline_metrics.bytes_read(); + + // Read with deletes + let reader = ArrowReaderBuilder::new(file_io.clone()).build(); + let task_with_deletes = FileScanTask { + file_size_in_bytes: data_file_size, + start: 0, + length: 0, + record_count: None, + data_file_path: data_file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema, + project_field_ids: vec![1], + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_path: pos_del_path, + file_size_in_bytes: pos_del_size, + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: None, + }], + partition: None, + 
partition_spec: None, + name_mapping: None, + case_sensitive: false, + }; + let tasks = + Box::pin(futures::stream::iter(vec![Ok(task_with_deletes)])) as FileScanTaskStream; + let (stream, delete_metrics) = reader.read(tasks).unwrap().into_parts(); + let _: Vec = stream.try_collect().await.unwrap(); + + assert!( + delete_metrics.bytes_read() > baseline_bytes, + "Expected bytes_read with deletes ({}) > baseline without deletes ({})", + delete_metrics.bytes_read(), + baseline_bytes + ); + } + #[tokio::test] async fn test_predicate_cast_literal() { let predicates = vec![ From 5c871109286345c37e7d2a3566a506a1dd43a5e2 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 27 Apr 2026 09:35:38 -0400 Subject: [PATCH 08/10] Update crates/iceberg/src/io/file_io.rs Co-authored-by: blackmwk --- crates/iceberg/src/io/file_io.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index ff7523d44d..4c972a2642 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -256,7 +256,7 @@ pub trait FileRead: Send + Sync + Unpin + 'static { } #[async_trait::async_trait] -impl FileRead for Box { +impl> FileRead for T { async fn read(&self, range: Range) -> crate::Result { self.as_ref().read(range).await } From a3ea53df795e43bbe4a03abace06c897d36f53c8 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 27 Apr 2026 09:35:46 -0400 Subject: [PATCH 09/10] Update crates/iceberg/src/arrow/scan_metrics.rs Co-authored-by: blackmwk --- crates/iceberg/src/arrow/scan_metrics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/arrow/scan_metrics.rs b/crates/iceberg/src/arrow/scan_metrics.rs index 435a818b8b..907b1af290 100644 --- a/crates/iceberg/src/arrow/scan_metrics.rs +++ b/crates/iceberg/src/arrow/scan_metrics.rs @@ -95,7 +95,7 @@ impl ScanResult { } /// Consumes the result into its parts. - pub fn into_parts(self) -> (ArrowRecordBatchStream, ScanMetrics) { + pub(crate) fn into_parts(self) -> (ArrowRecordBatchStream, ScanMetrics) { (self.stream, self.metrics) } } From 69ccaeebe1b1073e17073d56513ab187a90a2cc7 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 27 Apr 2026 09:59:22 -0400 Subject: [PATCH 10/10] Address PR feedback. 
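Instead of threading a raw `bytes_read: &Arc<AtomicU64>` through every delete-file call,
BasicDeleteFileLoader now carries a ScanMetrics handle (installed via `with_scan_metrics`),
and byte counting stays inside the reader wrapper. For reference, the decorator shape is
roughly the sketch below; the local `RangeRead` trait and the field names are illustrative
stand-ins, not the exact code in scan_metrics.rs:

    use std::ops::Range;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Stand-in for the crate's FileRead trait, only to keep the sketch
    // self-contained.
    trait RangeRead {
        async fn read(&self, range: Range<u64>) -> std::io::Result<Vec<u8>>;
    }

    // Decorator that adds the length of every returned buffer to a shared
    // counter before handing it back to the caller.
    struct CountingFileRead<R> {
        inner: R,
        bytes_read: Arc<AtomicU64>,
    }

    impl<R: RangeRead> CountingFileRead<R> {
        fn new(inner: R, bytes_read: Arc<AtomicU64>) -> Self {
            Self { inner, bytes_read }
        }

        async fn read(&self, range: Range<u64>) -> std::io::Result<Vec<u8>> {
            let buf = self.inner.read(range).await?;
            // Relaxed ordering is enough: the counter is only aggregated and
            // reported, never used to order other memory operations.
            self.bytes_read
                .fetch_add(buf.len() as u64, Ordering::Relaxed);
            Ok(buf)
        }
    }
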
--- .../src/arrow/caching_delete_file_loader.rs | 73 ++++++------------- .../iceberg/src/arrow/delete_file_loader.rs | 27 ++++--- crates/iceberg/src/arrow/delete_filter.rs | 14 +--- crates/iceberg/src/arrow/reader/pipeline.rs | 20 +++-- crates/iceberg/src/arrow/scan_metrics.rs | 5 -- crates/iceberg/src/io/file_io.rs | 2 +- 6 files changed, 56 insertions(+), 85 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 9daeb8fe7c..231971fd54 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -18,7 +18,6 @@ use std::collections::{HashMap, HashSet}; use std::ops::Not; use std::sync::Arc; -use std::sync::atomic::AtomicU64; use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray}; use futures::{StreamExt, TryStreamExt}; @@ -26,6 +25,7 @@ use tokio::sync::oneshot::{Receiver, channel}; use super::delete_filter::{DeleteFilter, PosDelLoadAction}; use crate::arrow::delete_file_loader::BasicDeleteFileLoader; +use crate::arrow::scan_metrics::ScanMetrics; use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema}; use crate::delete_vector::DeleteVector; use crate::expr::Predicate::AlwaysTrue; @@ -78,13 +78,22 @@ enum ParsedDeleteFileContext { #[allow(unused_variables)] impl CachingDeleteFileLoader { pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> Self { + let scan_metrics = ScanMetrics::new(); CachingDeleteFileLoader { - basic_delete_file_loader: BasicDeleteFileLoader::new(file_io), + basic_delete_file_loader: BasicDeleteFileLoader::new(file_io, scan_metrics), concurrency_limit_data_files, delete_filter: DeleteFilter::default(), } } + pub(crate) fn with_scan_metrics(mut self, scan_metrics: ScanMetrics) -> Self { + self.basic_delete_file_loader = BasicDeleteFileLoader::new( + self.basic_delete_file_loader.file_io().clone(), + scan_metrics, + ); + self + } + /// Initiates loading of all deletes for all the specified tasks /// /// Returned future completes once all positional deletes and delete vectors @@ -153,7 +162,6 @@ impl CachingDeleteFileLoader { &self, delete_file_entries: &[FileScanTaskDeleteFile], schema: SchemaRef, - bytes_read: &Arc, ) -> Receiver> { let (tx, rx) = channel(); @@ -173,7 +181,6 @@ impl CachingDeleteFileLoader { let del_filter = self.delete_filter.clone(); let concurrency_limit_data_files = self.concurrency_limit_data_files; let basic_delete_file_loader = self.basic_delete_file_loader.clone(); - let bytes_read = Arc::clone(bytes_read); crate::runtime::spawn(async move { let result = async move { let mut del_filter = del_filter; @@ -182,14 +189,12 @@ impl CachingDeleteFileLoader { let mut results_stream = task_stream .map(move |(task, file_io, del_filter, schema)| { let basic_delete_file_loader = basic_delete_file_loader.clone(); - let bytes_read = Arc::clone(&bytes_read); async move { Self::load_file_for_task( &task, basic_delete_file_loader.clone(), del_filter, schema, - &bytes_read, ) .await } @@ -225,7 +230,6 @@ impl CachingDeleteFileLoader { basic_delete_file_loader: BasicDeleteFileLoader, del_filter: DeleteFilter, schema: SchemaRef, - bytes_read: &Arc, ) -> Result { match task.file_type { DataContentType::PositionDeletes => { @@ -241,11 +245,7 @@ impl CachingDeleteFileLoader { PosDelLoadAction::Load => Ok(DeleteFileContext::PosDels { file_path: task.file_path.clone(), stream: basic_delete_file_loader - .parquet_to_batch_stream( - &task.file_path, - 
task.file_size_in_bytes, - bytes_read, - ) + .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) .await?, }), } @@ -264,11 +264,7 @@ impl CachingDeleteFileLoader { let equality_ids_vec = task.equality_ids.clone().unwrap(); let evolved_stream = BasicDeleteFileLoader::evolve_schema( basic_delete_file_loader - .parquet_to_batch_stream( - &task.file_path, - task.file_size_in_bytes, - bytes_read, - ) + .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) .await?, schema, &equality_ids_vec, @@ -626,13 +622,12 @@ mod tests { let eq_delete_file_path = setup_write_equality_delete_file_1(table_location); - let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); - let bytes_read = Arc::new(AtomicU64::new(0)); + let basic_delete_file_loader = + BasicDeleteFileLoader::new(file_io.clone(), ScanMetrics::new()); let record_batch_stream = basic_delete_file_loader .parquet_to_batch_stream( &eq_delete_file_path, std::fs::metadata(&eq_delete_file_path).unwrap().len(), - &bytes_read, ) .await .expect("could not get batch stream"); @@ -743,16 +738,11 @@ mod tests { let file_io = FileIO::new_with_fs(); let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); - let bytes_read = Arc::new(AtomicU64::new(0)); let file_scan_tasks = setup(table_location); let delete_filter = delete_file_loader - .load_deletes( - &file_scan_tasks[0].deletes, - file_scan_tasks[0].schema_ref(), - &bytes_read, - ) + .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) .await .unwrap() .unwrap(); @@ -829,14 +819,13 @@ mod tests { }; let file_io = FileIO::new_with_fs(); - let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); - let bytes_read = Arc::new(AtomicU64::new(0)); + let basic_delete_file_loader = + BasicDeleteFileLoader::new(file_io.clone(), ScanMetrics::new()); let batch_stream = basic_delete_file_loader .parquet_to_batch_stream( &delete_file_path, std::fs::metadata(&delete_file_path).unwrap().len(), - &bytes_read, ) .await .unwrap(); @@ -970,13 +959,8 @@ mod tests { // Load the deletes - should handle both types without error let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); - let bytes_read = Arc::new(AtomicU64::new(0)); let delete_filter = delete_file_loader - .load_deletes( - &file_scan_task.deletes, - file_scan_task.schema_ref(), - &bytes_read, - ) + .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref()) .await .unwrap() .unwrap(); @@ -1022,10 +1006,10 @@ mod tests { writer.write(&record_batch).unwrap(); writer.close().unwrap(); - let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); - let bytes_read = Arc::new(AtomicU64::new(0)); + let basic_delete_file_loader = + BasicDeleteFileLoader::new(file_io.clone(), ScanMetrics::new()); let record_batch_stream = basic_delete_file_loader - .parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len(), &bytes_read) + .parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len()) .await .expect("could not get batch stream"); @@ -1047,28 +1031,19 @@ mod tests { let file_io = FileIO::new_with_fs(); let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); - let bytes_read = Arc::new(AtomicU64::new(0)); let file_scan_tasks = setup(table_location); // Load deletes for the first time let delete_filter_1 = delete_file_loader - .load_deletes( - &file_scan_tasks[0].deletes, - file_scan_tasks[0].schema_ref(), - &bytes_read, - ) + .load_deletes(&file_scan_tasks[0].deletes, 
file_scan_tasks[0].schema_ref()) .await .unwrap() .unwrap(); // Load deletes for the second time (same task/files) let delete_filter_2 = delete_file_loader - .load_deletes( - &file_scan_tasks[0].deletes, - file_scan_tasks[0].schema_ref(), - &bytes_read, - ) + .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) .await .unwrap() .unwrap(); diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index 29961df67d..134b029613 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -16,7 +16,6 @@ // under the License. use std::sync::Arc; -use std::sync::atomic::AtomicU64; use futures::{StreamExt, TryStreamExt}; use parquet::arrow::ParquetRecordBatchStreamBuilder; @@ -24,6 +23,7 @@ use parquet::arrow::ParquetRecordBatchStreamBuilder; use crate::arrow::ArrowReader; use crate::arrow::reader::ParquetReadOptions; use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; +use crate::arrow::scan_metrics::ScanMetrics; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; use crate::spec::{Schema, SchemaRef}; @@ -40,26 +40,33 @@ pub trait DeleteFileLoader { &self, task: &FileScanTaskDeleteFile, schema: SchemaRef, - bytes_read: &Arc, ) -> Result; } #[derive(Clone, Debug)] pub(crate) struct BasicDeleteFileLoader { file_io: FileIO, + scan_metrics: ScanMetrics, } #[allow(unused_variables)] impl BasicDeleteFileLoader { - pub fn new(file_io: FileIO) -> Self { - BasicDeleteFileLoader { file_io } + pub fn new(file_io: FileIO, scan_metrics: ScanMetrics) -> Self { + BasicDeleteFileLoader { + file_io, + scan_metrics, + } } + + pub(crate) fn file_io(&self) -> &FileIO { + &self.file_io + } + /// Loads a RecordBatchStream for a given datafile. pub(crate) async fn parquet_to_batch_stream( &self, data_file_path: &str, file_size_in_bytes: u64, - bytes_read: &Arc, ) -> Result { /* Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly @@ -72,7 +79,7 @@ impl BasicDeleteFileLoader { &self.file_io, file_size_in_bytes, parquet_read_options, - bytes_read, + self.scan_metrics.bytes_read_counter(), ) .await?; @@ -112,10 +119,9 @@ impl DeleteFileLoader for BasicDeleteFileLoader { &self, task: &FileScanTaskDeleteFile, schema: SchemaRef, - bytes_read: &Arc, ) -> Result { let raw_batch_stream = self - .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes, bytes_read) + .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) .await?; // For equality deletes, only evolve the equality_ids columns. 
@@ -142,8 +148,8 @@ mod tests { let table_location = tmp_dir.path(); let file_io = FileIO::new_with_fs(); - let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); - let bytes_read = Arc::new(AtomicU64::new(0)); + let scan_metrics = ScanMetrics::new(); + let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone(), scan_metrics); let file_scan_tasks = setup(table_location); @@ -151,7 +157,6 @@ mod tests { .read_delete_file( &file_scan_tasks[0].deletes[0], file_scan_tasks[0].schema_ref(), - &bytes_read, ) .await .unwrap(); diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index fffb9dcdce..6369938ce2 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -267,7 +267,6 @@ pub(crate) mod tests { use std::fs::File; use std::path::Path; use std::sync::Arc; - use std::sync::atomic::AtomicU64; use arrow_array::{Int64Array, RecordBatch, StringArray}; use arrow_schema::Schema as ArrowSchema; @@ -294,16 +293,11 @@ pub(crate) mod tests { let file_io = FileIO::new_with_fs(); let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); - let bytes_read = Arc::new(AtomicU64::new(0)); let file_scan_tasks = setup(table_location); let delete_filter = delete_file_loader - .load_deletes( - &file_scan_tasks[0].deletes, - file_scan_tasks[0].schema_ref(), - &bytes_read, - ) + .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) .await .unwrap() .unwrap(); @@ -314,11 +308,7 @@ pub(crate) mod tests { assert_eq!(result.lock().unwrap().len(), 12); // pos dels from pos del file 1 and 2 let delete_filter = delete_file_loader - .load_deletes( - &file_scan_tasks[1].deletes, - file_scan_tasks[1].schema_ref(), - &bytes_read, - ) + .load_deletes(&file_scan_tasks[1].deletes, file_scan_tasks[1].schema_ref()) .await .unwrap() .unwrap(); diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs b/crates/iceberg/src/arrow/reader/pipeline.rs index 64eefa112d..8ecee294c4 100644 --- a/crates/iceberg/src/arrow/reader/pipeline.rs +++ b/crates/iceberg/src/arrow/reader/pipeline.rs @@ -52,7 +52,9 @@ impl ArrowReader { let task_reader = FileScanTaskReader { batch_size: self.batch_size, file_io: self.file_io, - delete_file_loader: self.delete_file_loader, + delete_file_loader: self + .delete_file_loader + .with_scan_metrics(scan_metrics.clone()), row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, parquet_read_options: self.parquet_read_options, @@ -107,11 +109,9 @@ impl FileScanTaskReader { let mut parquet_read_options = self.parquet_read_options; parquet_read_options.preload_page_index = should_load_page_index; - let delete_filter_rx = self.delete_file_loader.load_deletes( - &task.deletes, - Arc::clone(&task.schema), - self.scan_metrics.bytes_read_counter(), - ); + let delete_filter_rx = self + .delete_file_loader + .load_deletes(&task.deletes, Arc::clone(&task.schema)); // Open the Parquet file once, loading its metadata let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file( @@ -508,7 +508,13 @@ mod tests { }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; - reader.read(tasks).unwrap().stream().try_collect().await.unwrap() + reader + .read(tasks) + .unwrap() + .stream() + .try_collect() + .await + .unwrap() } // ArrowWriter cannot write INT96, so we use SerializedFileWriter directly. 
diff --git a/crates/iceberg/src/arrow/scan_metrics.rs b/crates/iceberg/src/arrow/scan_metrics.rs
index 907b1af290..4331a53fcb 100644
--- a/crates/iceberg/src/arrow/scan_metrics.rs
+++ b/crates/iceberg/src/arrow/scan_metrics.rs
@@ -93,9 +93,4 @@ impl ScanResult {
     pub fn metrics(&self) -> &ScanMetrics {
         &self.metrics
     }
-
-    /// Consumes the result into its parts.
-    pub(crate) fn into_parts(self) -> (ArrowRecordBatchStream, ScanMetrics) {
-        (self.stream, self.metrics)
-    }
 }
diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 4c972a2642..227d8f4d5b 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -256,7 +256,7 @@ pub trait FileRead: Send + Sync + Unpin + 'static {
 }
 
 #[async_trait::async_trait]
-impl<T: AsRef<dyn FileRead>> FileRead for T {
+impl<T: AsRef<dyn FileRead> + Send + Sync + Unpin + 'static> FileRead for T {
     async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
         self.as_ref().read(range).await
     }
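
For completeness, caller-side usage after this series looks roughly like the sketch below.
The import paths, `ScanResult::stream()` handing back the owned batch stream, and
`ScanMetrics` being `Clone` are assumptions read off the hunks above rather than guaranteed
API:

    use arrow_array::RecordBatch;
    use futures::TryStreamExt;
    use iceberg::Result;
    use iceberg::arrow::ArrowReader;
    use iceberg::scan::FileScanTaskStream;

    /// Collects all batches from a scan and returns the total bytes read.
    async fn collect_with_metrics(
        reader: ArrowReader,
        tasks: FileScanTaskStream,
    ) -> Result<(Vec<RecordBatch>, u64)> {
        let result = reader.read(tasks)?;
        // ScanMetrics shares its counters, so keep a handle before the
        // stream consumes the result.
        let metrics = result.metrics().clone();
        let batches: Vec<RecordBatch> = result.stream().try_collect().await?;
        Ok((batches, metrics.bytes_read()))
    }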