File-parallel pipeline with nested scan API and with_file_prefetch_depth! #92
Open
robertbuessow wants to merge 14 commits into main from
Conversation
Replace the to_arrow() code path with a custom file-parallel pipeline that processes N parquet files concurrently while preserving strict file-then-row ordering for Julia consumption.

Architecture:
- plan_files() provides an ordered list of FileScanTasks
- Each file task runs as a background tokio task with its own mpsc channel and a per-file Semaphore (100 MB) for backpressure
- FuturesOrdered yields per-file receivers in file order; the consumer drains file 0 batch by batch, then file 1, etc.
- Each file uses the same iceberg-rs ArrowReader code path as before

Changes:
- New: ordered_file_pipeline.rs (~420 lines incl. profiling stats)
- full.rs: IcebergScan gains file_io, batch_size, and file_concurrency fields; iceberg_arrow_stream uses plan_files() + the pipeline
- scan_common.rs / incremental.rs: pass through the new fields in macros
- Temporary PipelineStats with FFI print_summary for benchmarking

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Exposes a new iceberg_file_scan_stream / scan_nested! API that yields one IcebergFileScan per file, each carrying the filename, record count, and a prefetched inner IcebergArrowStream of Arrow IPC batches. The existing flat scan! path is kept by re-implementing it as a thin flattening wrapper over the same nested pipeline, so all ordering, backpressure, and stats logic lives in one place.

Key changes:
- ordered_file_pipeline: add FileScan (internal type), make_file_stream, spawn_file_task_with_meta, run_nested / create_nested_pipeline; rewrite create_pipeline to flatten the nested pipeline. Switch run_nested from FuturesOrdered to FuturesUnordered (spawn futures resolve immediately, so ordering is irrelevant here).
- table.rs: add IcebergFileScan (#[repr(C)]), IcebergFileScanStream, IcebergFileScanResponse; add iceberg_file_scan_free, iceberg_file_scan_stream_free, iceberg_file_scan_record_count, iceberg_file_scan_filename (sync getters), and iceberg_next_file_scan (async).
- full.rs: add the iceberg_file_scan_stream export op; make IcebergScan.file_io non-optional (FileIO instead of Option<FileIO>), always set at construction; use .clone() in stream ops instead of .take().
- Julia: scan_nested!, nested_arrow_stream, next_file_scan, file_scan_record_count, file_scan_filename, file_scan_arrow_stream, free_file_scan!, free_file_scan_stream! exported from RustyIceberg.

Labels: dismiss-release-notes build:benchmark
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Replace all inline iceberg::Error::new(iceberg::ErrorKind::Unexpected, ...) call sites with a crate-wide pub(crate) helper in lib.rs, removing the boilerplate from catalog.rs, lib.rs, and ordered_file_pipeline.rs.

Labels: dismiss-release-notes build:benchmark
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Add add_elapsed, store_elapsed, and async timed methods to PipelineStats to eliminate the repeated start/record/elapsed boilerplate. The three async phases (fetch+decode, serialize, semaphore) now read as single timed() expressions; the sync reader-setup phase uses add_elapsed directly.

Labels: dismiss-release-notes build:benchmark
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Tests (test/scan_tests.jl):
- Basic iteration: verify each FileScan has a non-empty .parquet filename,
record_count > 0, and the nations table totals exactly 25 records.
- Row count consistency: nested and flat pipelines over the customer table
must produce identical total row counts.
- Correct data: nested scan over nations produces the same rows as the
existing flat-scan reference test.
- Builder methods: select_columns! and with_batch_size! are respected
per-batch in the nested path.
- Safe early drop: abandon a FileScan mid-stream and drop the outer stream
early; background file tasks see the dropped receiver and exit cleanly.
Also fixes the timed! macro call for the serialize phase: the block form
{ expr.await.map_err(...)? } was incorrectly evaluating to () before the
macro's own .await ran. The .map_err chains now appear outside the macro
invocation where they belong.
Labels: dismiss-release-notes build:benchmark
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…ll time

PipelineStats unit tests (12 tests):
- reset_clears_every_field, task_start/end concurrency tracking, peak high-water mark, buffer add/release/peak, add_elapsed accumulation, store_elapsed overwrite, field isolation.

Dispatch backpressure metric:
- Rename the dead consumer_wait_ns → file_dispatch_wait_ns. Track the time run_nested spends blocked on the outer channel (slow consumer) using timed!(file_dispatch_wait_ns, tx.send(...)). Shown as "dispatch stall" in print_summary.

Nested pipeline wall time:
- run_nested now records pipeline_wall_ns when it finishes dispatching all FileScan handles. For the flat path, run_flat overwrites it with the full end-to-end time; for the nested path this gives a non-zero throughput in print_summary instead of always showing 0.

Labels: dismiss-release-notes build:benchmark
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Adds a new scan configuration parameter that controls how many completed FileScan items are queued in the outer channel of the nested pipeline, decoupling file-I/O concurrency from how far ahead Rust prefetches for the Julia consumer.

Previously the outer channel capacity was hardcoded to `file_concurrency`, coupling the two. Now they are independent: `file_concurrency` sets how many files are processed in parallel, while `file_prefetch_depth` controls how many ready FileScan items can buffer up before run_nested blocks. When `file_prefetch_depth` is 0 (the default), it falls back to `file_concurrency`, preserving existing behaviour.

Changes:
- IcebergScan / IcebergIncrementalScan: add a file_prefetch_depth field
- scan_common.rs: propagate the field through all struct-reconstructing macros; add the impl_with_file_prefetch_depth! macro
- full.rs: wire the iceberg_scan_with_file_prefetch_depth FFI function; resolve prefetch_depth in both iceberg_arrow_stream and iceberg_file_scan_stream
- ordered_file_pipeline: add a prefetch_depth parameter to create_nested_pipeline and create_pipeline; use it for the outer FileScan channel capacity
- Julia: with_file_prefetch_depth! was already defined; move the export to the correct line

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
- Extract resolve_pipeline_params() in full.rs to deduplicate the concurrency/prefetch_depth/file_io/batch_size resolution logic shared by the iceberg_arrow_stream and iceberg_file_scan_stream sync blocks
- Update the pipeline_stats.rs header comment to remove the "temporary / will be removed" framing: the module is intended to stay

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Runs the Rust unit test suite on every PR and push to main, alongside the existing format check. Uses rust-cache to keep incremental build times low.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Summary
- Replace the to_arrow() call with a custom file-parallel pipeline (ordered_file_pipeline.rs) that processes N files concurrently while yielding batches in strict file-then-row order
- Nested scan API (iceberg_file_scan_stream / scan_nested!) that exposes one FileScan per file, each carrying the filename, record count, and a prefetched inner batch stream, so Julia callers can handle per-file logic without reimplementing the pipeline
- with_file_prefetch_depth! to independently control how many completed FileScan items queue in the outer channel, decoupling prefetch buffering from file concurrency
- pipeline_stats for timing and throughput observability across pipeline phases (reader setup, fetch/decode, serialize, backpressure)
- cargo test added to CI so the Rust unit test suite runs on every PR

Test plan
- make run-containers then make test: full Julia test suite passes on Julia 1.10 and 1.11
- cargo test in iceberg_rust_ffi/ passes
- cargo fmt --check passes
- with_file_prefetch_depth!(scan, UInt(4)) sets the outer channel depth independently of with_data_file_concurrency_limit!
- scan_nested! yields one FileScan per file in order, with correct filename and record count

🤖 Generated with Claude Code