Add nested file-scan pipeline (stream of per-file streams)#91

Open
robertbuessow wants to merge 14 commits into main from rb-split-scan-2

Conversation

@robertbuessow robertbuessow commented Apr 29, 2026

Summary

  • Nested pipeline API: scan_nested! / iceberg_file_scan_stream returns an outer FileScanStream that yields one IcebergFileScan per file. Each scan carries the filename, record count, and a prefetched inner IcebergArrowStream of Arrow IPC batches. The existing flat scan! is re-implemented as a thin flattening wrapper over the same nested pipeline so all ordering, backpressure, and stats logic lives in one place.
  • Backwards compatible: all existing scan! call sites and tests are unchanged.
  • IcebergScan.file_io made non-optional: was always set at construction; removing Option<> enforces this statically and eliminates dead .take().ok_or_else(...) guards in both stream ops.
  • unexpected() crate-wide helper: replaces all inline iceberg::Error::new(iceberg::ErrorKind::Unexpected, ...) in catalog.rs, lib.rs, and ordered_file_pipeline.rs.
  • timed! macro: replaces the let start / .await / add_elapsed pattern for the three async phases (fetch+decode, serialize, semaphore). Block form timed!(field, { expr }) keeps the important computation visually prominent.
  • PipelineStats improvements: add_elapsed / store_elapsed helpers; file_dispatch_wait_ns metric tracks outer-channel backpressure (time run_nested is blocked waiting for the consumer to call next_file_scan); pipeline_wall_ns is now also set for the nested path (previously always 0, breaking the throughput display).
  • 12 Rust unit tests for all PipelineStats methods.
  • Julia tests: 6 new @testset blocks covering basic iteration, nested_arrow_stream called directly, row-count consistency vs flat scan, correct data, builder-method compatibility, and safe early drop.

New Julia API

# Returns a FileScanStream — one FileScan per file
stream = scan_nested!(scan)                  # or nested_arrow_stream(scan) after build!

fs_ptr = next_file_scan(stream)              # Ptr{Cvoid}, C_NULL when done
while fs_ptr != C_NULL
    fname  = file_scan_filename(fs_ptr)      # String
    nrec   = file_scan_record_count(fs_ptr)  # Int64 (before deletes)
    inner  = file_scan_arrow_stream(fs_ptr)  # borrowed ArrowStream — do NOT free_stream
    batch  = next_batch(inner)
    while batch != C_NULL
        # ... process Arrow IPC batch ...
        free_batch(batch)
        batch = next_batch(inner)
    end
    free_file_scan!(fs_ptr)                  # frees filename + inner stream
    fs_ptr = next_file_scan(stream)
end
free_file_scan_stream!(stream)

Ownership rules

  • file_scan_arrow_stream returns a borrowed pointer into the IcebergFileScan struct. Callers must not call free_stream on it — free_file_scan! handles cleanup.
  • Dropping a FileScan without reading all inner batches is safe: the dropped receiver causes the background file task to exit cleanly.
  • Dropping the FileScanStream without consuming all files is equally safe.
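The early-drop guarantee can be sketched with a std-only producer/consumer, where `std::sync::mpsc` stands in for the tokio channel used by the real file tasks (all names here are illustrative, not the actual implementation):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Background "file task": keeps sending batches until the consumer
/// drops its receiver, then exits cleanly. Returns how many batches
/// it managed to send before noticing the cancellation.
fn run_early_drop() -> usize {
    let (tx, rx) = sync_channel::<Vec<u8>>(2); // bounded: backpressure

    let producer = thread::spawn(move || {
        let mut sent = 0;
        for i in 0..1_000u32 {
            // send() returns Err once the receiver is gone; treat it
            // as clean cancellation, not a failure.
            if tx.send(vec![(i % 256) as u8]).is_err() {
                break;
            }
            sent += 1;
        }
        sent
    });

    // Consumer reads two batches, then abandons the stream early.
    let _ = rx.recv();
    let _ = rx.recv();
    drop(rx);

    producer.join().unwrap()
}

fn main() {
    let sent = run_early_drop();
    println!("producer exited after {sent} batches");
    assert!(sent < 1_000); // bounded channel stopped it early
}
```

The bounded channel caps how far the producer runs ahead, so an abandoned scan wastes at most the buffered batches.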

Test plan

  • make test passes (all existing tests unchanged)
  • New nested-API tests pass (6 testsets)
  • cargo test passes (12 PipelineStats unit tests)

Based on @hall-alex's sorted iterator change.
Part of the overarching Iceberg-load-optimizations task: RAI-49519.

🤖 Generated with Claude Code

hall-alex and others added 9 commits April 6, 2026 16:33
Replace the to_arrow() code path with a custom file-parallel pipeline
that processes N parquet files concurrently while preserving strict
file-then-row ordering for Julia consumption.

Architecture:
- plan_files() provides an ordered list of FileScanTasks
- Each file task runs as a background tokio task with its own mpsc
  channel and per-file Semaphore(100MB) for backpressure
- FuturesOrdered yields per-file receivers in file order; the consumer
  drains file 0 batch-by-batch, then file 1, etc.
- Each file uses the same iceberg-rs ArrowReader code path as before

Changes:
- New: ordered_file_pipeline.rs (~420 lines incl. profiling stats)
- full.rs: IcebergScan gains file_io, batch_size, file_concurrency
  fields; iceberg_arrow_stream uses plan_files() + pipeline
- scan_common.rs / incremental.rs: pass through new fields in macros
- Temporary PipelineStats with FFI print_summary for benchmarking

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
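The ordering scheme above (per-file channels, drained file-by-file) can be sketched with std threads; the real pipeline uses tokio tasks, tokio mpsc channels, and FuturesOrdered, so every name below is illustrative:

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical stand-in for one decoded Arrow batch.
type Batch = String;

/// Spawn one background task per "file", each with its own bounded
/// channel for backpressure, then drain the receivers strictly in
/// file order: all of file 0's batches, then file 1's, and so on.
fn ordered_scan(files: &[&str], batches_per_file: usize) -> Vec<Batch> {
    let receivers: Vec<Receiver<Batch>> = files
        .iter()
        .map(|file| {
            let file = file.to_string();
            let (tx, rx) = sync_channel::<Batch>(4); // per-file backpressure
            thread::spawn(move || {
                for i in 0..batches_per_file {
                    // A real task would read and decode parquet here.
                    if tx.send(format!("{file}/batch{i}")).is_err() {
                        return; // consumer gone: exit cleanly
                    }
                }
            });
            rx
        })
        .collect();

    // Files are processed concurrently, but the consumer always sees
    // strict file-then-row order.
    let mut out = Vec::new();
    for rx in receivers {
        while let Ok(batch) = rx.recv() {
            out.push(batch);
        }
    }
    out
}

fn main() {
    println!("{:?}", ordered_scan(&["a.parquet", "b.parquet"], 2));
}
```

Concurrency comes from the background tasks filling their channels in parallel; ordering comes purely from the drain loop.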
Exposes a new iceberg_file_scan_stream / scan_nested! API that yields one
IcebergFileScan per file, each carrying the filename, record count, and a
prefetched inner IcebergArrowStream of Arrow IPC batches. The existing flat
scan! path is kept by re-implementing it as a thin flattening wrapper over
the same nested pipeline, so all ordering, backpressure, and stats logic
lives in one place.

Key changes:
- ordered_file_pipeline: add FileScan (internal type), make_file_stream,
  spawn_file_task_with_meta, run_nested / create_nested_pipeline; rewrite
  create_pipeline to flatten the nested pipeline. Switch run_nested from
  FuturesOrdered to FuturesUnordered (spawn futures resolve immediately so
  ordering is irrelevant here).
- table.rs: add IcebergFileScan (#[repr(C)]), IcebergFileScanStream,
  IcebergFileScanResponse; add iceberg_file_scan_free,
  iceberg_file_scan_stream_free, iceberg_file_scan_record_count,
  iceberg_file_scan_filename (sync getters), iceberg_next_file_scan (async).
- full.rs: add iceberg_file_scan_stream export op; make IcebergScan.file_io
  non-optional (FileIO instead of Option<FileIO>) — always set at
  construction, use .clone() in stream ops instead of .take().
- Julia: scan_nested!, nested_arrow_stream, next_file_scan,
  file_scan_record_count, file_scan_filename, file_scan_arrow_stream,
  free_file_scan!, free_file_scan_stream! exported from RustyIceberg.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Replace all inline iceberg::Error::new(iceberg::ErrorKind::Unexpected, ...)
callsites with a crate-wide pub(crate) helper in lib.rs, removing the
boilerplate from catalog.rs, lib.rs, and ordered_file_pipeline.rs.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
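The shape of the helper, sketched with stand-in types so it compiles without the iceberg crate (the real function constructs `iceberg::Error::new(iceberg::ErrorKind::Unexpected, msg)`):

```rust
use std::fmt::Display;

// Stand-ins for the iceberg crate's error types, for illustration only.
#[derive(Debug, PartialEq)]
enum ErrorKind {
    Unexpected,
}

#[derive(Debug)]
struct Error {
    kind: ErrorKind,
    message: String,
}

/// Crate-wide helper: every "unexpected" error site becomes one call.
fn unexpected(msg: impl Display) -> Error {
    Error {
        kind: ErrorKind::Unexpected,
        message: msg.to_string(),
    }
}

fn main() {
    // Call sites shrink from a two-argument constructor to one call:
    let err = unexpected(format!("Serialization task panicked: {}", "boom"));
    println!("{:?} -> {}", err.kind, err.message);
}
```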
Add add_elapsed, store_elapsed, and async timed methods to PipelineStats
to eliminate the repeated start/record/elapsed boilerplate. The three async
phases (fetch+decode, serialize, semaphore) now read as single timed()
expressions; the sync reader-setup phase uses add_elapsed directly.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
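A minimal sync-only sketch of the `timed!` idea; the actual macro (including its async form that awaits inside the timing window) may differ in detail:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

// Global stats field, mirroring one PipelineStats counter.
static SERIALIZE_NS: AtomicU64 = AtomicU64::new(0);

/// Evaluate the block, add its elapsed nanoseconds to the named
/// atomic counter, and return the block's value. The block form keeps
/// the important computation visually prominent at the call site.
macro_rules! timed {
    ($field:ident, $body:block) => {{
        let start = Instant::now();
        let value = $body;
        $field.fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
        value
    }};
}

fn main() {
    let n = timed!(SERIALIZE_NS, {
        (0..1000u64).sum::<u64>() // the "important computation"
    });
    assert_eq!(n, 499_500);
    println!("sum = {n}, recorded = {}ns", SERIALIZE_NS.load(Ordering::Relaxed));
}
```

Because the macro returns the block's value, a phase like serialization can stay a single expression: `let bytes = timed!(serialize_ns, { serialize(batch) });`.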
Tests (test/scan_tests.jl):
- Basic iteration: verify each FileScan has a non-empty .parquet filename,
  record_count > 0, and the nations table totals exactly 25 records.
- Row count consistency: nested and flat pipelines over the customer table
  must produce identical total row counts.
- Correct data: nested scan over nations produces the same rows as the
  existing flat-scan reference test.
- Builder methods: select_columns! and with_batch_size! are respected
  per-batch in the nested path.
- Safe early drop: abandon a FileScan mid-stream and drop the outer stream
  early; background file tasks see the dropped receiver and exit cleanly.

Also fixes the timed! macro call for the serialize phase: the block form
{ expr.await.map_err(...)? } was incorrectly evaluating to () before the
macro's own .await ran. The .map_err chains now appear outside the macro
invocation where they belong.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…ll time

PipelineStats unit tests (12 tests):
- reset_clears_every_field, task_start/end concurrency tracking, peak
  high-water mark, buffer add/release/peak, add_elapsed accumulation,
  store_elapsed overwrite, field isolation.

Dispatch backpressure metric:
- Rename dead consumer_wait_ns → file_dispatch_wait_ns. Track time
  run_nested spends blocked on the outer channel (slow consumer) using
  timed!(file_dispatch_wait_ns, tx.send(...)). Shown as "dispatch stall"
  in print_summary.

Nested pipeline wall time:
- run_nested now records pipeline_wall_ns when it finishes dispatching all
  FileScan handles. For the flat path run_flat overwrites it with the full
  end-to-end time; for the nested path this gives a non-zero throughput
  in print_summary instead of always showing 0.

Labels: dismiss-release-notes build:benchmark

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@robertbuessow robertbuessow added enhancement New feature or request rust Pull requests that update rust code labels Apr 29, 2026
robertbuessow and others added 2 commits April 29, 2026 14:57
Co-authored-by: Copilot <copilot@github.com>
@robertbuessow robertbuessow requested a review from hall-alex April 29, 2026 17:39
Adds a new scan configuration parameter that controls how many completed
FileScan items are queued in the outer channel of the nested pipeline,
decoupling file-I/O concurrency from how far ahead Rust prefetches for
the Julia consumer.

Previously the outer channel capacity was hardcoded to `file_concurrency`,
coupling the two. Now they are independent: `file_concurrency` sets how
many files are processed in parallel, while `file_prefetch_depth` controls
how many ready FileScan items can buffer up before run_nested blocks.
When `file_prefetch_depth` is 0 (the default), it falls back to `file_concurrency`, preserving the existing behaviour.

Changes:
- IcebergScan/IcebergIncrementalScan: add file_prefetch_depth field
- scan_common.rs: propagate field through all struct-reconstructing macros;
  add impl_with_file_prefetch_depth! macro
- full.rs: wire iceberg_scan_with_file_prefetch_depth FFI function; resolve
  prefetch_depth in both iceberg_arrow_stream and iceberg_file_scan_stream
- ordered_file_pipeline: add prefetch_depth parameter to create_nested_pipeline
  and create_pipeline; use it for the outer FileScan channel capacity
- Julia: with_file_prefetch_depth! was already defined; move export to correct line

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Collaborator

@hall-alex hall-alex left a comment


Cool 💪🚀

Comment thread iceberg_rust_ffi/src/full.rs Outdated
Comment on lines +213 to +234
let scan_ptr = unsafe { &mut *scan };
let scan_ref = &scan_ptr.scan;
if scan_ref.is_none() {
    return Err(anyhow::anyhow!("Scan not initialized"));
}

let concurrency = scan_ptr.file_concurrency;
let concurrency = if concurrency == 0 {
    std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
} else {
    concurrency
};

let file_io = scan_ptr.file_io.clone();
let batch_size = scan_ptr.batch_size;
let prefetch_depth = if scan_ptr.file_prefetch_depth == 0 {
    concurrency
} else {
    scan_ptr.file_prefetch_depth
};
Collaborator


Optional: would it make sense to extract a helper for this to use here and above (lines 151-174)?

Author


Done — extracted into resolve_pipeline_params in the eaa30d4 commit.
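A sketch of what such a helper could look like, with field names taken from the quoted snippet; the real `resolve_pipeline_params` presumably also hands back `file_io`, and its exact signature is not shown in this thread:

```rust
/// Hypothetical mirror of the scan fields both stream ops read.
struct ScanParams {
    file_concurrency: usize,
    file_prefetch_depth: usize,
    batch_size: usize,
}

/// Resolve the effective pipeline parameters in one place:
/// concurrency 0 means "use available parallelism", and a prefetch
/// depth of 0 falls back to the resolved concurrency.
fn resolve_pipeline_params(p: &ScanParams) -> (usize, usize, usize) {
    let concurrency = if p.file_concurrency == 0 {
        std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1)
    } else {
        p.file_concurrency
    };
    let prefetch_depth = if p.file_prefetch_depth == 0 {
        concurrency
    } else {
        p.file_prefetch_depth
    };
    (concurrency, prefetch_depth, p.batch_size)
}

fn main() {
    let params = ScanParams { file_concurrency: 0, file_prefetch_depth: 0, batch_size: 8192 };
    let (c, p, b) = resolve_pipeline_params(&params);
    println!("concurrency={c} prefetch_depth={p} batch_size={b}");
}
```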

format!("Serialization task panicked: {}", e),
)),
Ok(Err(e)) => Err(unexpected(e)),
Err(e) => Err(unexpected(format!("Serialization task panicked: {e}"))),
Collaborator


So format!("Serialization task panicked: {e}") is the same as format!("Serialization task panicked: {}", e)?

Author


Yes, they're identical at runtime. {e} is Rust 1.58 "captured identifier" syntax — it implicitly captures the local variable e as the format argument, so you don't need to pass it explicitly after the format string. Pure style preference; I tend to use it for single-variable cases to keep the string self-contained.
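A self-contained example of the equivalence:

```rust
fn main() {
    let e = "task join error";
    // Rust 1.58+ "captured identifier": `{e}` pulls the local variable
    // `e` from the surrounding scope, so no trailing argument is needed.
    let captured = format!("Serialization task panicked: {e}");
    let explicit = format!("Serialization task panicked: {}", e);
    assert_eq!(captured, explicit);
    println!("{captured}");
}
```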

Comment thread iceberg_rust_ffi/src/pipeline_stats.rs Outdated
@@ -0,0 +1,425 @@
// ===========================================================================
// Temporary profiling — will be removed before merging to production.
Collaborator


Perhaps change this comment. Would be nice to keep this for now at least.

Author


Updated in eaa30d4 — the comment now describes the module as permanent pipeline statistics rather than temporary profiling.

- Extract resolve_pipeline_params() in full.rs to deduplicate the
  concurrency/prefetch_depth/file_io/batch_size resolution logic shared
  by iceberg_arrow_stream and iceberg_file_scan_stream sync blocks
- Update pipeline_stats.rs header comment to remove "temporary / will be
  removed" framing — the module is intended to stay

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
// starving the tokio executor.
let serialized = timed!(
serialize_ns,
tokio::task::spawn_blocking(move || crate::serialize_record_batch(batch))
Contributor


Wondering here whether, as done in the export work, it would make sense to have a thread pool of N workers that do the serialization work idle in the background?

Author

@robertbuessow robertbuessow Apr 30, 2026


Done. Added a global rayon::ThreadPool

// ===========================================================================
// Pipeline statistics — timing and throughput counters for the file-parallel
// scan pipeline. Global atomics accumulate data across all file tasks and are
// readable from Julia via `@ccall iceberg_print_pipeline_stats()`.
Contributor


Shall we expose that as a Julia function as well, or do you prefer users to use the ccall macro?

Author


Added print_pipeline_stats() and reset_pipeline_stats() Julia wrappers in full.jl, both exported from the module.

Replace `tokio::task::spawn_blocking` with a process-global `rayon::ThreadPool`
(sized to available CPU cores) for Arrow IPC serialization. All concurrent file
tasks share the same pool via `SERIALIZE_POOL`, keeping N worker threads warm
for the lifetime of the process rather than borrowing from tokio's unbounded
blocking pool on each batch.

Also add `print_pipeline_stats()` and `reset_pipeline_stats()` Julia wrappers
in `full.jl`, both exported from the module.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
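A std-only sketch of the "create once, share process-wide" shape. The real pool is a `rayon::ThreadPool` (built via `ThreadPoolBuilder` and used with `install`); this stand-in spawns a fresh thread per job, so it only models the global lazy initialization, not the warm-worker reuse:

```rust
use std::sync::OnceLock;
use std::thread;

/// Stand-in for the global serialization pool.
struct SerializePool {
    workers: usize,
}

impl SerializePool {
    /// Run a CPU-bound serialization job off the async executor's
    /// threads. (rayon's equivalent is `pool.install(job)`, which
    /// reuses warm workers instead of spawning.)
    fn run<T: Send + 'static>(&self, job: impl FnOnce() -> T + Send + 'static) -> T {
        thread::spawn(job).join().unwrap()
    }
}

/// Process-global, lazily initialized exactly once, shared by all
/// concurrent file tasks for the lifetime of the process.
fn serialize_pool() -> &'static SerializePool {
    static POOL: OnceLock<SerializePool> = OnceLock::new();
    POOL.get_or_init(|| SerializePool {
        workers: thread::available_parallelism().map(|n| n.get()).unwrap_or(1),
    })
}

fn main() {
    let bytes = serialize_pool().run(|| vec![0u8; 16]); // "serialize" a batch
    println!("workers = {}, serialized {} bytes", serialize_pool().workers, bytes.len());
}
```

The point of the fixed-size pool is bounding serialization parallelism to the core count, where tokio's blocking pool would grow unboundedly under load.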
@codecov-commenter


Codecov Report

❌ Patch coverage is 75.67568% with 9 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.89%. Comparing base (aaca529) to head (b41d539).
⚠️ Report is 10 commits behind head on main.

Files with missing lines Patch % Lines
src/full.jl 75.67% 9 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #91      +/-   ##
==========================================
+ Coverage   83.86%   83.89%   +0.03%     
==========================================
  Files           9        9              
  Lines         849      913      +64     
==========================================
+ Hits          712      766      +54     
- Misses        137      147      +10     


Labels

build:benchmark dismiss-release-notes enhancement New feature or request rust Pull requests that update rust code
