feat: globally sort shared FileStream work queue on Inexact sort pushdown #60

Draft
adriangb wants to merge 1 commit into main from sort-pushdown-inexact-global


@adriangb

Which issue does this PR close?

  • Closes #.

Rationale for this change

After apache#21182 sorts files per-group and apache#21351 lets partitions share a single work queue via SharedWorkSource, the per-group order is lost at the queue level — each group's files land as a contiguous block regardless of cross-group stats. For Inexact pushdown (the common case for TopK DESC with a dynamic filter) we want workers pulling files in globally best-first order so the filter threshold tightens fastest across the whole scan.

Today's flow (Inexact, SharedWorkSource active):

G0 (sorted within): [lo_max, hi_max]      ┐
G1 (sorted within): [lo_max, hi_max]      ├─ flat_map → queue = [G0_lo, G0_hi, G1_lo, G1_hi]
...                                       ┘

Worker 0 gets G0_lo (whatever its max is — not necessarily the global best). TopK's dynamic filter tightens based on whatever max the first-completed scan happens to produce, not the global max.
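The ordering problem above can be reproduced with a small standalone sketch. `FileStats`, `flat_queue`, and `global_max` are hypothetical stand-ins invented for illustration, not DataFusion types; the sketch assumes a single integer sort column with a known max per file.

```rust
/// Hypothetical stand-in for a file plus the max statistic of its sort column.
#[derive(Debug, Clone, PartialEq)]
pub struct FileStats {
    pub name: &'static str,
    pub max: i64,
}

/// Mimics the current queue seeding: groups are concatenated in order,
/// so each group's files form one contiguous block in the queue.
pub fn flat_queue(groups: &[Vec<FileStats>]) -> Vec<FileStats> {
    groups.iter().flat_map(|group| group.iter()).cloned().collect()
}

/// The largest max across every file in every group.
pub fn global_max(groups: &[Vec<FileStats>]) -> Option<i64> {
    groups.iter().flatten().map(|file| file.max).max()
}
```

With two groups whose files have maxes `[10, 40]` and `[30, 90]`, the first file dequeued has max 10 while the global max is 90, so a threshold derived from the first completed scan can be far from the best achievable one.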

What changes are included in this PR?

Carry a work_order_hint: Option<LexOrdering> on FileScanConfig — a desired-but-not-required ordering populated by the Inexact sort-pushdown branches (in rebuild_with_source and try_sort_file_groups_by_statistics). SharedWorkSource::from_config reads the hint and seeds the queue via a new sort_files_globally_by_statistics helper that flattens all files, builds one MinMaxStatistics, and orders by min value. Falls back to today's flat_map(FileGroup::iter) order when no hint is set or stats are unusable.
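As a rough illustration of the seeding logic described above (not the code in this PR), the sketch below flattens all groups, orders files by min value, and falls back to the flat order when there is no hint or any file lacks stats. `FileEntry`, `sort_globally_by_min`, and `seed_queue` are hypothetical names; the real helper works on `MinMaxStatistics` and a `LexOrdering`, which are simplified here to one optional integer min per file and a boolean.

```rust
/// Hypothetical stand-in for a file with an optional min statistic.
#[derive(Debug, Clone, PartialEq)]
pub struct FileEntry {
    pub name: String,
    /// Minimum of the sort column, if statistics are available.
    pub min: Option<i64>,
}

/// Flattens all groups and orders files by ascending min.
/// Returns `None` if any file lacks the statistic.
pub fn sort_globally_by_min(groups: &[Vec<FileEntry>]) -> Option<Vec<FileEntry>> {
    let mut keyed: Vec<(i64, FileEntry)> = Vec::new();
    for file in groups.iter().flatten() {
        // `?` bails out with `None` on the first file without stats.
        keyed.push((file.min?, file.clone()));
    }
    // Stable sort: files with equal min keep their flat order.
    keyed.sort_by_key(|(min, _)| *min);
    Some(keyed.into_iter().map(|(_, file)| file).collect())
}

/// Seeds the queue: globally sorted when a hint is present and stats are
/// usable, otherwise the group-concatenation order.
pub fn seed_queue(groups: &[Vec<FileEntry>], has_hint: bool) -> Vec<FileEntry> {
    let flat = || groups.iter().flatten().cloned().collect::<Vec<_>>();
    if !has_hint {
        return flat();
    }
    sort_globally_by_min(groups).unwrap_or_else(flat)
}
```

The groups are only read, never regrouped, which mirrors the point that `file_groups` composition stays untouched and only the queue order changes.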

file_groups composition is untouched, so output_partitioning() stays stable — parallelism is preserved.

The hint is skipped on the reverse-scan path (reverse_file_groups = true): the per-group reversal must not be overridden by a globally ascending sort. A globally descending variant for that path is a possible follow-up.
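The reverse-scan rule can be summarized in a few lines. This is a simplified sketch under stated assumptions: `SortKeys` is a hypothetical stand-in for `LexOrdering`, and `inexact_work_order_hint` is an invented function modelling only the decision made on an Inexact branch, not the actual code in `rebuild_with_source` or `try_sort_file_groups_by_statistics`.

```rust
/// Hypothetical stand-in for `LexOrdering`: sort column names in priority order.
pub type SortKeys = Vec<String>;

/// Decides the hint on an Inexact pushdown branch.
/// When the per-group file order is being reversed, no hint is set, so a
/// globally ascending sort cannot override that reversal.
pub fn inexact_work_order_hint(
    requested: &SortKeys,
    reverse_file_groups: bool,
) -> Option<SortKeys> {
    if reverse_file_groups {
        None
    } else {
        Some(requested.clone())
    }
}
```

Returning `None` on the reverse path keeps the queue on the existing flat order, which is the behaviour the hint-less case already has.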

Files changed:

  • datafusion/datasource/src/file_scan_config/mod.rs — new work_order_hint field, threaded through builder, From<FileScanConfig> and build().
  • datafusion/datasource/src/file_scan_config/sort_pushdown.rs — new sort_files_globally_by_statistics helper; hint populated in both Inexact branches.
  • datafusion/datasource/src/file_stream/work_source.rs — SharedWorkSource::from_config consumes the hint; new unit tests for queue seeding.

Are these changes tested?

Yes:

  • sort_pushdown_inexact_populates_work_order_hint — interleaved-stats multi-group Inexact scan; asserts the hint matches the requested ordering and partition count is preserved.
  • sort_pushdown_rebuild_reverse_leaves_hint_none — reverse-scan path leaves work_order_hint = None.
  • from_config_without_hint_preserves_flat_group_order — no-hint case matches today's queue order exactly.
  • from_config_with_hint_seeds_globally_sorted_queue — queue is drained in global min-ascending order across groups, not group-concatenation order.
  • from_config_falls_back_when_stats_missing — any file without stats falls back to flat order without panicking.

All 142 datafusion-datasource lib tests pass. Clippy clean on datafusion-datasource --all-targets --all-features.

Broader tests (full workspace clippy, physical-optimizer integration, sort_pushdown.slt) deferred to CI — local disk was exhausted by prior worktree build caches and the worktree target couldn't finish compiling the SLT binary. No changes outside datafusion-datasource so the blast radius is narrow.

Are there any user-facing changes?

No. work_order_hint is pub(crate) and purely an internal scheduling hint; no API changes. Existing behaviour is preserved when the hint is None.

…down

After apache#21182 sorts files per-group and apache#21351 lets partitions share a
single work queue via `SharedWorkSource`, the per-group order is lost at
the queue level — each group's files land as a contiguous block regardless
of cross-group stats. For Inexact pushdown (e.g. DESC TopK with a dynamic
filter) we want workers pulling files in globally best-first order so the
filter threshold tightens fastest across the whole scan.

Approach: carry a `work_order_hint: Option<LexOrdering>` on
`FileScanConfig` — a "desired-but-not-required" ordering populated by the
Inexact sort-pushdown branches. `SharedWorkSource::from_config` reads it
and seeds the queue via a new `sort_files_globally_by_statistics` helper
that flattens all files, builds one `MinMaxStatistics`, and orders by
min value. Falls back to today's `flat_map(FileGroup::iter)` order
when no hint is set or stats are unusable. `file_groups` composition is
untouched, so `output_partitioning()` stays stable.

Skipped on the reverse-scan path (`reverse_file_groups = true`): the
per-group reversal must not be overridden by a globally ascending sort.
A globally descending variant is a follow-up.

Tests cover hint population on both Inexact branches, the reverse-scan
leave-none case, and `SharedWorkSource::from_config` honouring the hint,
falling back on missing stats, and preserving today's behaviour when no
hint is present.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>