
Charlesantoine.leger/feat/partitioned scan hash bucketing contribution#12

Closed
toutane wants to merge 2 commits into branch-0.9 from
charlesantoine.leger/feat/partitioned-scan-hash-bucketing-contribution

Conversation


@toutane toutane commented Apr 28, 2026

Which issue does this PR close?

  • Closes #.

What changes are included in this PR?

Are these changes tested?

toutane and others added 2 commits April 28, 2026 14:29
…itionedScan

Introduce two new types enabling DataFusion to schedule each Iceberg
data file as a separate partition, so the runtime executes file reads
in parallel.

TableScan::to_arrow_with_tasks(tasks) is extracted from to_arrow() as
a companion method that accepts a pre-computed FileScanTaskStream.
to_arrow() now delegates to it. This decouples planning from reading:
IcebergPartitionedScan can replay a single pre-planned task per
execute() call without re-invoking plan_files() on every partition.
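The extraction described above can be sketched with stand-in types; the method names follow the PR, but the bodies and the `Vec<u32>` task stand-in are hypothetical, not the real iceberg-rust signatures:

```rust
/// Hypothetical stand-in for iceberg-rust's TableScan; the real type
/// returns an Arrow record-batch stream rather than a count.
struct TableScan;

impl TableScan {
    /// Stand-in for file planning; the real method yields a FileScanTaskStream.
    fn plan_files(&self) -> Vec<u32> {
        vec![1, 2, 3]
    }

    /// Companion method: reads only, from pre-computed tasks.
    fn to_arrow_with_tasks(&self, tasks: Vec<u32>) -> usize {
        tasks.len() // stand-in for streaming the batches of each task
    }

    /// to_arrow() now just plans, then delegates; planning is decoupled
    /// from reading, so a caller holding pre-planned tasks can skip it.
    fn to_arrow(&self) -> usize {
        let tasks = self.plan_files();
        self.to_arrow_with_tasks(tasks)
    }
}
```

This shape lets IcebergPartitionedScan call `to_arrow_with_tasks` directly with one saved task per partition, never re-running `plan_files()`.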

IcebergPartitionedScan (ExecutionPlan leaf):
- Holds a Vec<FileScanTask> collected at planning time.
- Advertises UnknownPartitioning(n) with n = number of tasks.
- execute(partition) rebuilds a TableScan (schema resolution only,
  no I/O) and calls to_arrow_with_tasks with the single task for
  that partition, inheriting the same Arrow reader settings as
  IcebergTableScan (row-group filtering, row selection, batch size,
  concurrency limit).

IcebergPartitionedTableProvider (TableProvider):
- Reloads the table from the catalog on every scan() call to guarantee
  freshness against concurrent writers.
- Calls plan_files() eagerly at scan time, collects all FileScanTasks,
  and hands them to IcebergPartitionedScan.
- Supports Inexact filter pushdown and column projection.
- Rejects write operations with FeatureUnsupported.

Tests verify the two invariants the hash-bucketing layer (next commit)
relies on: an empty table yields a zero-partition scan, and n data
files yield exactly n DataFusion partitions.
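The two invariants above can be illustrated with a minimal sketch of the one-task-per-partition layout; `PartitionedScan` and its methods are hypothetical simplifications (the real type implements DataFusion's ExecutionPlan trait and holds FileScanTasks, not strings):

```rust
/// Hypothetical sketch: n planned file-scan tasks become n partitions.
struct PartitionedScan {
    tasks: Vec<String>, // stand-in for Vec<FileScanTask>
}

impl PartitionedScan {
    /// Mirrors advertising UnknownPartitioning(n) with n = number of tasks.
    fn output_partitioning(&self) -> usize {
        self.tasks.len()
    }

    /// execute(i) replays exactly the i-th pre-planned task; no re-planning.
    fn execute(&self, partition: usize) -> &str {
        &self.tasks[partition]
    }
}
```

An empty table yields zero partitions, so `execute` is never called out of bounds; three data files yield exactly three partitions.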
…identity-hash partitioning

Replace the one-task-per-partition layout in IcebergPartitionedScan with
N buckets sized from the session's target_partitions. When the table's
default spec exposes identity-transform columns and every task carries
the corresponding partition values, tasks are bucketed by hashing those
values via DataFusion's REPARTITION_RANDOM_STATE so the resulting
partitioning matches what RepartitionExec would produce. The scan then
declares Partitioning::Hash(exprs, N), letting downstream joins and
aggregates skip an extra repartition.

Hash declaration is conservative and holds only when:
  - the table has a single partition spec (no spec evolution)
  - every identity source column is present in the output projection
  - every column type is supported by literal_to_array
  - every task supplied a full identity key
Any miss collapses to UnknownPartitioning(N) while bucketing falls
back to a hash of data_file_path so partitions still distribute.

IcebergPartitionedScan now stores Vec<Vec<FileScanTask>> and execute(i)
streams every task in buckets[i] through to_arrow_with_tasks. Bucket
count is capped at min(target_partitions, num_files), and an empty
table still yields zero partitions to avoid out-of-bounds execute calls.
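The bucketing policy above can be sketched in plain Rust. The struct fields are hypothetical stand-ins for the real FileScanTask, and stdlib's DefaultHasher stands in for DataFusion's REPARTITION_RANDOM_STATE; only the control flow (identity-key hash, file-path fallback, capped bucket count, empty-table case) follows the commit:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a planned Iceberg file-scan task.
struct FileScanTask {
    data_file_path: String,
    /// Identity-transform partition values, when the task carries a full key.
    identity_key: Option<Vec<i64>>,
}

fn hash_of<T: Hash>(value: &T) -> u64 {
    // Stand-in hasher; the real code hashes with REPARTITION_RANDOM_STATE
    // so the layout matches RepartitionExec's.
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    h.finish()
}

/// Bucket tasks into N = min(target_partitions, num_files) buckets.
/// Tasks with a full identity key hash on that key; any task without one
/// falls back to hashing data_file_path so partitions still distribute.
fn bucket_tasks(tasks: Vec<FileScanTask>, target_partitions: usize) -> Vec<Vec<FileScanTask>> {
    let n = target_partitions.min(tasks.len());
    if n == 0 {
        return Vec::new(); // empty table => zero partitions
    }
    let mut buckets: Vec<Vec<FileScanTask>> = (0..n).map(|_| Vec::new()).collect();
    for task in tasks {
        let h = match &task.identity_key {
            Some(key) => hash_of(key),
            None => hash_of(&task.data_file_path),
        };
        buckets[(h % n as u64) as usize].push(task);
    }
    buckets
}
```

In the real scan, the Hash(exprs, N) declaration is additionally dropped to UnknownPartitioning(N) whenever any of the four conservative conditions fails; the bucketing itself still runs.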

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
(cherry picked from commit 67d05e0)
@toutane toutane closed this Apr 28, 2026
@toutane toutane deleted the charlesantoine.leger/feat/partitioned-scan-hash-bucketing-contribution branch April 28, 2026 14:34
