feat(reader): Add read_with_metrics() for scan I/O metrics #2349
blackmwk merged 14 commits into apache:main
Conversation
@blackmwk I'm trying really hard not to add more to
CTTY
left a comment
Just took a quick pass, love the direction!
blackmwk
left a comment
Thanks @mbutrovich for this PR!
This looks like a great move in the right direction - as we discussed before, I've been keen to see metrics captured for a long time. I like that this approach is agnostic with regard to the consumption of the resulting metrics themselves; interested consumers can write their own adapters to

Let's hold off for a while to wait for #2358.

Cleaning up the merge conflict after #2358.
Resolve conflict from PR apache#2358 splitting reader.rs into modules.

Port bytes_read/ScanMetrics changes into reader/pipeline.rs:
- FileScanTaskReader struct with ScanMetrics
- CountingFileRead wrapping in open_parquet_file
- ScanResult return type from read()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
blackmwk
left a comment
Thanks @mbutrovich for this PR! Generally LGTM.
blackmwk
left a comment
Thanks @mbutrovich for this PR!
**Which issue does this PR close?**

- Closes #.

**What changes are included in this PR?**

Add always-on per-scan I/O metrics to `ArrowReader`.

**Motivation:** Downstream engines need per-scan byte counts for their UIs. For example, DataFusion Comet uses this to populate
`bytes_scanned` on its Iceberg scan operator, which flows through to Spark UI via `TaskMetrics.inputMetrics.setBytesRead()`. This must be per-scan, not global. Concurrent scans against the same `FileIO` need independent counters. The approach matches DataFusion's pattern of wrapping `AsyncFileReader` with a counting layer and is storage-backend agnostic.

**`ArrowReader::read()` now returns `ScanResult`**

- `ScanResult` wraps the record batch stream and `ScanMetrics`. Accessors: `stream()`, `metrics()`, `into_parts()`.
- Metrics are always collected. One `fetch_add(Relaxed)` per I/O request, negligible overhead.
- Counter is created fresh per `read()` call, so cloned readers get independent metrics. A consumption sketch follows below.
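A minimal consumption sketch under stated assumptions: only `read()`, `into_parts()`, and `bytes_read()` come from this PR; the `FileScanTaskStream` parameter, the `Result` wrapping, and the `(stream, metrics)` tuple order are assumptions for illustration.

```rust
use futures::TryStreamExt;
use iceberg::arrow::ArrowReader;
use iceberg::scan::FileScanTaskStream;

// Hypothetical consumer: drains a scan and reports total bytes read.
async fn count_scan_bytes(
    reader: ArrowReader,
    tasks: FileScanTaskStream, // assumed parameter type
) -> iceberg::Result<u64> {
    // read() now returns a ScanResult instead of a bare batch stream.
    let result = reader.read(tasks)?;
    let (mut stream, metrics) = result.into_parts(); // assumed tuple order

    // The stream is lazy: no I/O has happened yet, so the counter is 0.
    assert_eq!(metrics.bytes_read(), 0);

    while let Some(_batch) = stream.try_next().await? {
        // Each poll may trigger counted reads of data and delete files.
    }
    Ok(metrics.bytes_read())
}
```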
**New file: `crates/iceberg/src/arrow/scan_metrics.rs`**

- `CountingFileRead<F: FileRead>`: generic wrapper that increments a shared `AtomicU64` on each `read()`.
- `ScanMetrics`: public handle exposing `bytes_read()`.
- `ScanResult`: public struct returned by `ArrowReader::read()`.

**`FileRead` blanket impl for `Box<dyn FileRead>`**

- Enables generic `CountingFileRead<F>` to wrap the boxed reader returned by `FileIO::reader()`. A sketch of the wrapper shape follows below.
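A minimal sketch of the wrapper shape, using a simplified synchronous stand-in for the crate's `FileRead` trait (the real trait is async; the signatures here are illustrative, not the crate's actual API):

```rust
use std::ops::Range;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

// Simplified stand-in for iceberg's FileRead trait (the real one is async).
trait FileRead {
    fn read(&self, range: Range<u64>) -> Vec<u8>;
}

// Generic counting wrapper: delegates to the inner reader and tallies the
// bytes it served into a shared atomic counter.
struct CountingFileRead<F: FileRead> {
    inner: F,
    bytes_read: Arc<AtomicU64>,
}

impl<F: FileRead> FileRead for CountingFileRead<F> {
    fn read(&self, range: Range<u64>) -> Vec<u8> {
        let buf = self.inner.read(range);
        // One relaxed atomic add per I/O request, as the PR describes.
        self.bytes_read.fetch_add(buf.len() as u64, Ordering::Relaxed);
        buf
    }
}

// Blanket impl so the boxed reader returned by FileIO::reader() can be
// wrapped as CountingFileRead<Box<dyn FileRead>>.
impl FileRead for Box<dyn FileRead> {
    fn read(&self, range: Range<u64>) -> Vec<u8> {
        (**self).read(range)
    }
}
```

In this shape, `ScanMetrics` is just a handle around the same shared `Arc<AtomicU64>`, which is why the counter stays readable after the stream has consumed the reader.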
**Single `open_parquet_file` with counting**

- All Parquet opens (data files and delete files) go through the same `open_parquet_file` wrapped with `CountingFileRead`, so `bytes_read` reflects total scan I/O.
- `build_parquet_reader()`: shared internals for reader construction and metadata loading.

**`FileScanTaskReader` struct (refactor)**

- Extracted `process_file_scan_task`'s parameters into a `Clone` struct with a `process(self, task)` method, resolving a `clippy::too_many_arguments` violation. Struct and impl are co-located. The sketch below shows the general shape.
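A minimal sketch of that parameter-struct refactor; every field name below is a hypothetical stand-in, not the actual layout of `FileScanTaskReader`:

```rust
// Hypothetical stand-in for the real iceberg::scan::FileScanTask.
struct FileScanTask;

// Former function parameters become fields of a Clone-able struct,
// resolving clippy::too_many_arguments on process_file_scan_task.
#[derive(Clone)]
struct FileScanTaskReader {
    batch_size: Option<usize>,         // hypothetical field
    row_group_filtering_enabled: bool, // hypothetical field
    row_selection_enabled: bool,       // hypothetical field
}

impl FileScanTaskReader {
    // process(self, task): each spawned task consumes its own cheap clone
    // of the reader state instead of threading many arguments through.
    fn process(self, _task: FileScanTask) {
        // ...open the Parquet file, apply filters, decode batches...
    }
}
```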
**Re-exports**

- `ScanMetrics` and `ScanResult` re-exported from `iceberg::arrow` and `iceberg::scan`.

**Are these changes tested?**
- `test_scan_metrics_bytes_read` in `reader.rs`: asserts `bytes_read() == 0` before stream consumption (the stream is lazy) and `bytes_read() > 0` after.
- `test_scan_metrics_includes_delete_file_bytes`: reads the same data file with and without a positional delete file and asserts `bytes_read` is strictly greater when deletes are present.

All existing reader and scan tests pass (updated to use `ScanResult::stream()`).

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: blackmwk <liurenjie1024@outlook.com>

(cherry picked from commit 1ad4bfd)