
True parallelism#1871

Closed
vustef wants to merge 23 commits into apache:main from RelationalAI:vs-parallelism

Conversation


@vustef vustef commented Nov 18, 2025

Which issue does this PR close?

https://relationalai.atlassian.net/browse/RAI-44217

What changes are included in this PR?

This PR adds true parallelism. By default, a full scan (in `reader.rs`) has only concurrency, not parallelism. For the negative impact of that, see the description of the closed draft #1684. Upstream, the design intends for parallelism to be baked into the upper layers, but since we diverge at the moment, we can use this hack to enable parallelism for us right away.
What are the issues?
When the `FileScanTaskStream` is processed, we process it concurrently, but without spawning there is no parallelism. The impact of not spawning for each stream item here is minimal, though, since these operations are IO-heavy and concurrency is nearly enough. However, the output of processing each file in the file stream is a record batch stream, and processing a record batch stream is a CPU-heavy operation. Right now we process that concurrently as well (with `try_flatten_unordered(N)`), but that is not enough: for CPU-heavy work we definitely need parallelism.

So what can we do?
First, we can create a channel, spawn a task, and return the receiver side of the channel. In the spawned task, we populate the transmitter side. Here we have two options:

  1. Spawn for each file.
  2. Don't spawn, since these are IO-bound operations.

I chose to spawn, with the idea of squeezing out parallelism. In some cases this will add latency, though, and we may make it an option (or decide on a different default in this PR).

Then, for each file, we need to process the batches in the `record_batch_stream`. Since this already happens in the spawned task (if we chose option 2 above, we should at least spawn around processing the `record_batch_stream`), the CPU-heavy work will be parallelized. But if we only have one file, processing its batches won't be parallelized. So again we have two strategies:

  1. Iterate through the batches without spawning per batch.
  2. Spawn per batch.

I chose to spawn per batch in this PR.

For incremental streams, we already spawn for each file. However, we don't spawn per batch. To align these two, I'm modifying that code to spawn per batch too.
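The spawn-per-file plus spawn-per-batch scheme can be sketched with std threads and channels. This is a minimal illustration only: the actual PR uses tokio tasks and futures streams, and `process_batch` / `parallel_scan` are hypothetical stand-ins for the real reader code.

```rust
use std::sync::mpsc;
use std::thread;

// Stand-in for the CPU-heavy per-batch work (the real code processes
// Arrow record batches); `i32` stands in for a batch here.
fn process_batch(batch: i32) -> i32 {
    batch * 2
}

// Create a channel, spawn, and return the receiver side. Inside, spawn
// per file and per batch so CPU-heavy work is parallelized even when
// the scan yields a single file.
fn parallel_scan(files: Vec<Vec<i32>>) -> mpsc::Receiver<i32> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut file_tasks = Vec::new();
        for batches in files {
            let tx = tx.clone();
            // Spawn per file (option 1 above).
            file_tasks.push(thread::spawn(move || {
                let mut batch_tasks = Vec::new();
                for batch in batches {
                    let tx = tx.clone();
                    // Spawn per batch (strategy 2 above).
                    batch_tasks.push(thread::spawn(move || {
                        tx.send(process_batch(batch)).unwrap();
                    }));
                }
                for t in batch_tasks {
                    t.join().unwrap();
                }
            }));
        }
        for t in file_tasks {
            t.join().unwrap();
        }
    });
    rx
}
```

Once every sender clone is dropped, the receiver-side stream ends naturally, which is how the consumer knows the scan is complete.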

Are these changes tested?

Existing tests exercise these code paths. Performance hasn't been tested yet, as that is a manual process on EC2 instances.

gbrgr and others added 23 commits October 14, 2025 12:30
* WIP, initial draft of incremental scan

* .

* .

* cargo fmt

* Implement unzipped stream

* Remove printlns

* Add API method for unzipped stream

* .

* Remove comment

* Rename var

* Add import

* Measure time

* Fix typo

* Undo some changes

* Change type name

* Add comment header

* Fail when encountering equality deletes

* Add comments

* Add some preliminary tests

* Format

* Remove playground

* Add more tests

* Clippy

* .

* .

* Adapt tests

* .

* Add test

* Add tests

* Add tests

* Format

* Add test

* Format

* .

* Rm newline

* Rename trait function

* Reuse schema

* .

* remove clone

* Add test for adding file_path column

* Make `from_snapshot` mandatory

* Error out if incremental scan encounters neither Append nor Delete

* .

* Add materialized variant of add_file_path_column

* .

* Allow dead code

* Some PR comments

* .

* More PR comments

* .

* Add comments

* Avoid cloning

* Add reference to PR

* Some PR comments

* .

* format

* Allow overwrite operation for now

* Fix file_path column

* Add overwrite test

* Unwrap delete vector

* .

* Add assertion

* Avoid cloning the mutex guard

* Abort when encountering a deleted delete file

* Adjust comment

* Update crates/iceberg/src/arrow/reader.rs

Co-authored-by: Vukasin Stefanovic <vukasin.stefanovic92@gmail.com>

* Add check

* Update crates/iceberg/src/scan/incremental/mod.rs

---------

Co-authored-by: Vukasin Stefanovic <vukasin.stefanovic92@gmail.com>
* fix(reader): fix position delete bugs with row group skipping (apache#1806)

## Which issue does this PR close?

Partially address apache#1749.

## What changes are included in this PR?

This PR fixes two related correctness bugs in
`ArrowReader::build_deletes_row_selection()` where position deletes
targeting rows in skipped or skipped-to row groups were not being
applied correctly.

### Background: How These Bugs Were Discovered

While running Apache Spark + Apache Iceberg integration tests through
DataFusion Comet, we discovered that the following tests were failing or
hanging:
- org.apache.iceberg.spark.extensions.TestMergeOnReadMerge
- org.apache.iceberg.spark.extensions.TestMergeOnReadDelete
- org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate

Investigation revealed that recent work to support Iceberg's file
splitting feature (via `filter_row_groups_by_byte_range()`) exposed
latent bugs in the position delete logic. While the byte range filtering
code itself is correct, it exercises code paths that were previously
untested, revealing these pre-existing issues.

#### Bug 1: Missing base index increment when skipping row groups

**The Issue:**

When processing a Parquet file with multiple row groups, if a position
delete targets a row in a later row group, the function would skip row
groups without deletes but fail to increment
`current_row_group_base_idx`. This caused the row index tracking to
become desynchronized.

**Example scenario:**
- File with 2 row groups: rows 0-99 (group 0) and rows 100-199 (group 1)
- Position delete targets row 199 (last row in group 1)
- When processing group 0: delete (199) is beyond the group's range, so
code hits `continue` at lines 469-471
- BUG: `current_row_group_base_idx` is NOT incremented, stays at 0
- When processing group 1: code thinks rows start at 0 instead of 100
- Delete at position 199 is never applied (thinks file only has rows
0-99)

**The Fix:**

Add `current_row_group_base_idx += row_group_num_rows` before the two
`continue` statements at lines ~470 and ~481. This ensures row index
tracking stays synchronized when skipping row groups.
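The fix can be illustrated with a simplified model of the row-group walk (hypothetical helper names; the real logic lives in `ArrowReader::build_deletes_row_selection`). Row groups are given by size, and deletes are absolute row indices within the file.

```rust
// Count how many position deletes fall inside some row group.
fn count_applied_deletes(row_group_sizes: &[u64], deletes: &[u64]) -> usize {
    let mut current_row_group_base_idx: u64 = 0;
    let mut applied = 0;
    for &row_group_num_rows in row_group_sizes {
        let next_base = current_row_group_base_idx + row_group_num_rows;
        let in_group = deletes
            .iter()
            .filter(|&&d| d >= current_row_group_base_idx && d < next_base)
            .count();
        if in_group == 0 {
            // The fix: advance the base index before `continue`. Without
            // this line, group 1 would be treated as rows 0-99 again and
            // a delete at row 199 would never be applied.
            current_row_group_base_idx = next_base;
            continue;
        }
        applied += in_group;
        current_row_group_base_idx = next_base;
    }
    applied
}
```

With two 100-row groups and a delete at row 199, the fixed walk applies the delete; dropping the increment inside the `if` reproduces bug 1.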

#### Bug 2: Stale cached delete index when skipping unselected row
groups

**The Issue:**

When row group selection is active (e.g., via byte range filtering for
file splits) and an unselected row group is skipped, the cached
`next_deleted_row_idx_opt` variable can become stale, leading to either
lost deletes or infinite loops depending on the scenario.

The function maintains a cached value (`next_deleted_row_idx_opt`)
containing the next delete to apply. When skipping unselected row
groups, it calls
`delete_vector_iter.advance_to(next_row_group_base_idx)` to position the
iterator, but this doesn't automatically update the cached variable.

**Two problematic scenarios:**

1. Stale cache causes infinite loop (the bug we hit):
- File with 2 row groups: rows 0-99 (group 0) and rows 100-199 (group 1)
  - Position delete at row 0 (in group 0)
  - Row group selection: read ONLY group 1
  - Initial state: `next_deleted_row_idx_opt = Some(0)` (cached)
  - Skip group 0: `advance_to(100)` positions iterator past delete at 0
  - BUG: cached value still `Some(0)` - STALE!
- Process group 1: the loop condition `0 < 200` is `true`, but `current_idx
(100) != next_deleted_row_idx (0)`, so neither branch executes, which can
result in an infinite loop
2. Unconditionally calling `next()` loses deletes:
- File with 2 row groups: rows 0-99 (group 0) and rows 100-199 (group 1)
  - Position delete at row 199 (in group 1)
  - Row group selection: read ONLY group 1
- Initial state: `next_deleted_row_idx_opt = Some(199)` (cached, already
correct!)
- Skip group 0: `advance_to(100)` - iterator already positioned
correctly
  - If we call `next()`: BUG - consumes delete at 199, advancing past it
  - Process group 1: iterator exhausted, delete is lost

**The Fix:**

- If `cached value < next_row_group_base_idx` (stale), update it, thus
avoiding the infinite loop
- If `cached value >= next_row_group_base_idx` (still valid), keep it,
thus preserving the delete
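A sketch of that cache-refresh rule (a hypothetical simplification; in the real code, `delete_vector_iter` is the delete-vector iterator that was just `advance_to`-ed to `next_row_group_base_idx`):

```rust
// Refresh the cached next-delete index after skipping unselected
// row groups.
fn refresh_cached_delete<I: Iterator<Item = u64>>(
    cached: Option<u64>,
    next_row_group_base_idx: u64,
    delete_vector_iter: &mut I,
) -> Option<u64> {
    match cached {
        // Stale: the cached delete is in a skipped group, so pull the
        // next value from the (already advanced) iterator. This avoids
        // the infinite loop in scenario 1.
        Some(idx) if idx < next_row_group_base_idx => delete_vector_iter.next(),
        // Still valid: keep it. Calling `next()` here would consume and
        // lose the delete, as in scenario 2.
        other => other,
    }
}
```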

## Are these changes tested?

Yes. This PR adds three comprehensive unit tests in reader.rs:

1. `test_position_delete_across_multiple_row_groups` - Tests bug 1
(missing base index increment)
2. `test_position_delete_with_row_group_selection` - Tests bug 2
scenario where delete is in selected group
3. `test_position_delete_in_skipped_row_group` - Tests bug 2 scenario
where delete is in skipped group (would hang without fix)

Additionally, these fixes resolve failures in Iceberg Java's
spark-extension tests when running with DataFusion Comet’s PR
apache/datafusion-comet#2528:
- org.apache.iceberg.spark.extensions.TestMergeOnReadMerge
- org.apache.iceberg.spark.extensions.TestMergeOnReadDelete
- org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate

* feat(datafusion): implement the partitioning node for DataFusion to define the partitioning (apache#1620)

## Which issue does this PR close?
- Closes apache#1543

## What changes are included in this PR?
Implement a physical execution repartition node that determines the
relevant DataFusion partitioning strategy based on the Iceberg table
schema and metadata.
 1. Unpartitioned tables: Uses round-robin partitioning 
 2. Partitioned tables: It depends on the transform type:
- Identity or Bucket transforms: Uses hash partitioning on the
_partition column
- Temporal transforms (Year, Month, Day, Hour): Uses round-robin
partitioning

_Minor change: I created a new `schema_ref()` helper method._
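The strategy choice described above can be sketched as a simple mapping. All types and the all-hashable rule for mixed transforms are assumptions for illustration; the actual node inspects the table's partition spec and schema.

```rust
#[derive(Debug, PartialEq)]
enum Strategy {
    HashOnPartitionColumn,
    RoundRobin,
}

#[derive(Clone, Copy)]
enum Transform {
    Identity,
    Bucket,
    Year,
    Month,
    Day,
    Hour,
}

// An empty slice models an unpartitioned table.
fn choose_partitioning(transforms: &[Transform]) -> Strategy {
    if transforms.is_empty() {
        // Unpartitioned: spread work evenly with round-robin.
        return Strategy::RoundRobin;
    }
    let hashable = transforms
        .iter()
        .all(|t| matches!(t, Transform::Identity | Transform::Bucket));
    if hashable {
        // Identity/Bucket transforms: hash on the _partition column.
        Strategy::HashOnPartitionColumn
    } else {
        // Temporal transforms (Year/Month/Day/Hour): round-robin.
        Strategy::RoundRobin
    }
}
```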

## Are these changes tested?
Yes, with unit tests

---------

Signed-off-by: Florian Valeye <florian.valeye@gmail.com>

* feat(reader): Date32 from days since epoch for Literal:try_from_json (apache#1803)

* chore(deps): Bump aws-sdk-glue from 1.125.0 to 1.126.0 (apache#1812)

Bumps [aws-sdk-glue](https://github.com/awslabs/aws-sdk-rust) from
1.125.0 to 1.126.0.
<details>
<summary>Commits</summary>
<ul>
<li>See full diff in <a
href="https://github.com/awslabs/aws-sdk-rust/commits">compare
view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=aws-sdk-glue&package-manager=cargo&previous-version=1.125.0&new-version=1.126.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)


Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): Bump astral-sh/setup-uv from 6 to 7 (apache#1811)

Bumps [astral-sh/setup-uv](https://github.com/astral-sh/setup-uv) from 6
to 7.
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a
href="https://github.com/astral-sh/setup-uv/releases">astral-sh/setup-uv's
releases</a>.</em></p>
<blockquote>
<h2>v7.0.0 🌈 node24 and a lot of bugfixes</h2>
<h2>Changes</h2>
<p>This release comes with a load of bug fixes and a speed up. Because
of switching from node20 to node24 it is also a breaking change. If you
are running on GitHub hosted runners this will just work, if you are
using self-hosted runners make sure, that your runners are up to date.
If you followed the normal installation instructions your self-hosted
runner will keep itself updated.</p>
<p>This release also removes the deprecated input
<code>server-url</code> which was used to download uv releases from a
different server.
The <a
href="https://github.com/astral-sh/setup-uv?tab=readme-ov-file#manifest-file">manifest-file</a>
input supersedes that functionality by adding a flexible way to define
available versions and where they should be downloaded from.</p>
<h3>Fixes</h3>
<ul>
<li>The action now respects when the environment variable
<code>UV_CACHE_DIR</code> is already set and does not overwrite it. It
now also finds <a
href="https://docs.astral.sh/uv/reference/settings/#cache-dir">cache-dir</a>
settings in config files if you set them.</li>
<li>Some users encountered problems that <a
href="https://github.com/astral-sh/setup-uv?tab=readme-ov-file#disable-cache-pruning">cache
pruning</a> took forever because they had some <code>uv</code> processes
running in the background. Starting with uv version <code>0.8.24</code>
this action uses <code>uv cache prune --ci --force</code> to ignore the
running processes</li>
<li>If you just want to install uv but not have it available in path,
this action now respects <code>UV_NO_MODIFY_PATH</code></li>
<li>Some other actions also set the env var <code>UV_CACHE_DIR</code>.
This action can now deal with that but as this could lead to unwanted
behavior in some edgecases a warning is now displayed.</li>
</ul>
<h3>Improvements</h3>
<p>If you are using minimum version specifiers for the version of uv to
install for example</p>
<pre lang="toml"><code>[tool.uv]
required-version = &quot;&gt;=0.8.17&quot;
</code></pre>
<p>This action now detects that and directly uses the latest version.
Previously it would download all available releases from the uv repo
to determine the highest matching candidate for the version specifier,
which took much more time.</p>
<p>If you are using other specifiers like <code>0.8.x</code> this action
still needs to download all available releases because the specifier
defines an upper bound (not 0.9.0 or later) and &quot;latest&quot; would
possibly not satisfy that.</p>
<h2>🚨 Breaking changes</h2>
<ul>
<li>Use node24 instead of node20 <a
href="https://github.com/eifinger"><code>@​eifinger</code></a> (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/608">#608</a>)</li>
<li>Remove deprecated input server-url <a
href="https://github.com/eifinger"><code>@​eifinger</code></a> (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/607">#607</a>)</li>
</ul>
<h2>🐛 Bug fixes</h2>
<ul>
<li>Respect UV_CACHE_DIR and cache-dir <a
href="https://github.com/eifinger"><code>@​eifinger</code></a> (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/612">#612</a>)</li>
<li>Use --force when pruning cache <a
href="https://github.com/eifinger"><code>@​eifinger</code></a> (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/611">#611</a>)</li>
<li>Respect UV_NO_MODIFY_PATH <a
href="https://github.com/eifinger"><code>@​eifinger</code></a> (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/603">#603</a>)</li>
<li>Warn when <code>UV_CACHE_DIR</code> has changed <a
href="https://github.com/jamesbraza"><code>@​jamesbraza</code></a> (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/601">#601</a>)</li>
</ul>
<h2>🚀 Enhancements</h2>
<ul>
<li>Shortcut to latest version for minimum version specifier <a
href="https://github.com/eifinger"><code>@​eifinger</code></a> (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/598">#598</a>)</li>
</ul>
<h2>🧰 Maintenance</h2>
<ul>
<li>Bump dependencies <a
href="https://github.com/eifinger"><code>@​eifinger</code></a> (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/613">#613</a>)</li>
<li>Fix test-uv-no-modify-path <a
href="https://github.com/eifinger"><code>@​eifinger</code></a> (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/604">#604</a>)</li>
</ul>
<!-- raw HTML omitted -->
</blockquote>
<p>... (truncated)</p>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a
href="https://github.com/astral-sh/setup-uv/commit/85856786d1ce8acfbcc2f13a5f3fbd6b938f9f41"><code>8585678</code></a>
Bump dependencies (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/664">#664</a>)</li>
<li><a
href="https://github.com/astral-sh/setup-uv/commit/22d500a65c34c827ebc95094040adcee254e92fa"><code>22d500a</code></a>
Bump github/codeql-action from 4.30.8 to 4.30.9 (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/652">#652</a>)</li>
<li><a
href="https://github.com/astral-sh/setup-uv/commit/14d557131df7147286f7ce93a5b6b0189f8e1bc3"><code>14d5571</code></a>
chore: update known checksums for 0.9.5 (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/663">#663</a>)</li>
<li><a
href="https://github.com/astral-sh/setup-uv/commit/29cd2350cd44d155ed44d0eba0e1b63d07fb3b69"><code>29cd235</code></a>
Use tar for extracting the uv zip file on Windows too (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/660">#660</a>)</li>
<li><a
href="https://github.com/astral-sh/setup-uv/commit/2ddd2b9cb38ad8efd50337e8ab201519a34c9f24"><code>2ddd2b9</code></a>
chore: update known checksums for 0.9.4 (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/651">#651</a>)</li>
<li><a
href="https://github.com/astral-sh/setup-uv/commit/b7bf78939d77607a9ccb489e4ec4651ba1092d5c"><code>b7bf789</code></a>
Fix &quot;lowest&quot; resolution strategy with lower-bound only (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/649">#649</a>)</li>
<li><a
href="https://github.com/astral-sh/setup-uv/commit/cb6c0a53d9c61608defba05145184489d20183b2"><code>cb6c0a5</code></a>
Change version in docs to v7 (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/647">#647</a>)</li>
<li><a
href="https://github.com/astral-sh/setup-uv/commit/dffc6292f2060d80116faf1baee66598a67f042c"><code>dffc629</code></a>
Use working-directory to detect empty workdir (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/645">#645</a>)</li>
<li><a
href="https://github.com/astral-sh/setup-uv/commit/6e346e1653b720be5aaa194026b82bdef65869c7"><code>6e346e1</code></a>
chore: update known checksums for 0.9.3 (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/644">#644</a>)</li>
<li><a
href="https://github.com/astral-sh/setup-uv/commit/3ccd0fd498ef6303a98d4125859aae05eedf6294"><code>3ccd0fd</code></a>
Bump github/codeql-action from 4.30.7 to 4.30.8 (<a
href="https://redirect.github.com/astral-sh/setup-uv/issues/639">#639</a>)</li>
<li>Additional commits viewable in <a
href="https://github.com/astral-sh/setup-uv/compare/v6...v7">compare
view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=astral-sh/setup-uv&package-manager=github_actions&previous-version=6&new-version=7)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)


Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Renjie Liu <liurenjie2008@gmail.com>

* chore(deps): Bump crate-ci/typos from 1.38.1 to 1.39.0 (apache#1810)

---------

Signed-off-by: Florian Valeye <florian.valeye@gmail.com>
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Matt Butrovich <mbutrovich@users.noreply.github.com>
Co-authored-by: Florian Valeye <florian.valeye@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Renjie Liu <liurenjie2008@gmail.com>
* Remove TODO

* Implement delete files

* PR comments

* Fix test comments

* Comment fix

* Fix upstream changes

* Modify test

* format
* Add support for replace

* Add positional delete test, refactor match arm
* Make from and to in incremental scan optional

* Make from inclusive

* Fix merge conflicts

* .

* Format
@vustef vustef closed this Nov 18, 2025