
True parallelism across full and incremental scan #11

Merged: vustef merged 13 commits into main from vs-parallelism, Nov 19, 2025


Conversation


@vustef (Collaborator) commented Nov 18, 2025

Which issue does this PR close?

https://relationalai.atlassian.net/browse/RAI-44217

What changes are included in this PR?

This PR adds true parallelism. By default, the full scan (in reader.rs) has only concurrency, not parallelism. For the negative impact of that, see the description of this closed draft: apache#1684. Upstream, the design intent is that parallelism is baked into the upper layers, but since we diverge at the moment, we can use this hack to enable parallelism for ourselves right away.
What are the issues?

When the FileScanTaskStream is processed, we process it concurrently, but without spawning there is no parallelism. The impact of not spawning per stream item is minimal, though, because those operations are IO-heavy and concurrency is nearly enough. However, the output of processing each file in the file stream is a record batch stream, and processing a record batch stream is a CPU-heavy operation. Right now we process that concurrently as well (with try_flatten_unordered(N)), but that is not enough: for CPU-heavy work we definitely need parallelism.
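The concurrency-vs-parallelism distinction can be sketched with plain threads. This is a toy example, not the iceberg-rust code, and `decode` is a hypothetical stand-in for CPU-heavy batch decoding: processing everything on the calling thread interleaves work on one core, whereas spawning lets each unit of work run on its own core.

```rust
use std::thread;

// Hypothetical stand-in for CPU-heavy record-batch decoding.
fn decode(x: u64) -> u64 {
    (0..1_000).fold(x, |acc, _| acc.wrapping_mul(31).wrapping_add(1))
}

// Concurrency only: everything runs on the calling thread, one core.
fn decode_sequential(inputs: &[u64]) -> Vec<u64> {
    inputs.iter().map(|&x| decode(x)).collect()
}

// Parallelism: one thread per input, so decodes can run on multiple cores.
fn decode_parallel(inputs: &[u64]) -> Vec<u64> {
    let handles: Vec<_> = inputs
        .iter()
        .map(|&x| thread::spawn(move || decode(x)))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let inputs = [1u64, 2, 3, 4];
    // Same results either way; only the CPU utilization differs.
    assert_eq!(decode_sequential(&inputs), decode_parallel(&inputs));
    println!("ok");
}
```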

So what can we do?
First, we can create a channel, spawn a task, and return the receiver side of the channel. The spawned task populates the transmitter side. Here we have two options:

  1. Spawn for each file.
  2. Don't spawn, since these are IO-bound operations.

I chose to spawn, to squeeze out parallelism. In some cases it will add latency, though, and we may make this an option (or decide on a different default in this PR).

Then, for each file, we need to process the batches in the record_batch_stream. Since this already happens in the spawned task (if we choose option 2 above, we should at least spawn around processing the record_batch_stream), the CPU-heavy work is parallelized across files. But if we only have one file, processing its batches won't be parallelized. For that we'd need to poll the record batch stream in parallel, which is not possible at this level (we would have to use a lower-level parquet API, which is out of scope for now).
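The channel-and-spawn shape described above can be sketched with std::thread and std::sync::mpsc instead of tokio. This is a minimal analogue of the pattern, not the actual implementation; process_file is a hypothetical stand-in for opening a file and decoding its record batches.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for decoding one file's record batches (CPU-heavy).
fn process_file(file_id: usize) -> Vec<u64> {
    (0..4).map(|batch| (file_id * 10 + batch) as u64).collect()
}

// Returns the receiver side of a channel. A coordinator thread (the "outer
// spawn") spawns one worker per file and each worker sends its decoded
// batches into the shared channel.
fn scan_files(files: Vec<usize>) -> mpsc::Receiver<u64> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let workers: Vec<_> = files
            .into_iter()
            .map(|f| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for batch in process_file(f) {
                        // Ignore send errors if the consumer hung up.
                        let _ = tx.send(batch);
                    }
                })
            })
            .collect();
        for w in workers {
            let _ = w.join();
        }
        // All senders drop here, which terminates the receiver's iterator.
    });
    rx
}

fn main() {
    let mut batches: Vec<u64> = scan_files(vec![1, 2, 3]).into_iter().collect();
    batches.sort();
    println!("{}", batches.len()); // 3 files x 4 batches = 12
}
```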

For incremental streams, we already spawn for each file, so I'm just refactoring a bit to reuse code.

In addition, the per-batch work is spawned with spawn_blocking.
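tokio::task::spawn_blocking runs a closure on a dedicated blocking-thread pool, so CPU-heavy per-batch work does not stall the async worker threads. A rough std-threads analogue of that shape (decode_batch is a hypothetical stand-in, not the real per-batch code):

```rust
use std::thread;

// Hypothetical stand-in for CPU-heavy per-batch work (e.g. decoding).
fn decode_batch(batch: Vec<u8>) -> usize {
    batch.iter().map(|&b| b as usize).sum()
}

// std analogue of spawn_blocking: run the closure on its own thread and
// hand back a join handle the caller can wait on later (tokio returns a
// future instead).
fn spawn_blocking_like<T: Send + 'static>(
    f: impl FnOnce() -> T + Send + 'static,
) -> thread::JoinHandle<T> {
    thread::spawn(f)
}

fn main() {
    let batch = vec![1u8, 2, 3];
    // The caller is free to keep doing other work while the decode runs.
    let handle = spawn_blocking_like(move || decode_batch(batch));
    let sum = handle.join().unwrap();
    println!("{sum}"); // 6
}
```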

Both implementations now provide two levels of parallelism:

  1. Outer spawn: background coordination
  2. File-level parallelism: N files processed in parallel

Batch-level parallelism (within a single file) is not implemented.

Are these changes tested?

Existing tests go through these code paths. I haven't tested performance yet, as that is a manual process on EC2 instances.

@vustef vustef requested a review from gbrgr November 18, 2025 14:51
@vustef vustef marked this pull request as ready for review November 18, 2025 19:20
@vustef vustef enabled auto-merge (squash) November 19, 2025 10:36

@vustef (Collaborator, Author) commented Nov 19, 2025

In the future, we may parallelize this per row group (at least the decoding, not the IO) using the next_row_group API on ParquetRecordBatchStream.

@vustef vustef merged commit ae83309 into main Nov 19, 2025
18 checks passed
@vustef vustef deleted the vs-parallelism branch November 19, 2025 10:41