@EnyMan EnyMan commented Jan 22, 2026

Summary

This PR improves the performance of the upsert() operation, particularly for large upserts with 10,000+ rows. The changes address three main bottlenecks in the current implementation.

Problem

The current upsert implementation has several performance issues that become significant with large datasets:

  1. Expensive match filter generation: For composite keys, create_match_filter() generates Or(And(EqualTo, EqualTo), ...) expressions - one And clause per unique key combination. With 10k+ rows this produces a very large expression tree that is slow to build and evaluate: up to n×m EqualTo leaves, where n is the number of unique key combinations and m the number of key columns (see the sketch after this list).

  2. Per-batch insert filtering: The insert logic filters rows using expression evaluation (expression_to_pyarrow) on each batch, which is inefficient and doesn't leverage PyArrow's join capabilities.

  3. Row-by-row comparison: get_rows_to_update() uses Python loops to compare rows one at a time (source_table.slice(source_idx, 1)), missing the opportunity for vectorized operations.
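
For illustration, this is the shape of the exact filter create_match_filter() builds for a two-column key (a minimal sketch with made-up literal values):

from pyiceberg.expressions import And, EqualTo, Or

# One And(EqualTo, EqualTo) per unique key combination, all joined under a single Or().
expr = Or(
    And(EqualTo("id", 1), EqualTo("region", "eu")),
    And(EqualTo("id", 2), EqualTo("region", "eu")),
    And(EqualTo("id", 3), EqualTo("region", "us")),
)
# With n unique key combinations and m key columns, the tree has n*m EqualTo leaves,
# so 10k+ rows over a composite key already means tens of thousands of nodes.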

Solution

1. Coarse Match Filter for Initial Scan (Biggest Performance Win)

Added create_coarse_match_filter() that generates a less precise but much faster filter for the initial table scan. This is where the majority of the performance improvement comes from.

  • For small datasets (< 10,000 unique keys): Uses And(In(col1, values), In(col2, values)) instead of exact key-tuple matching
  • For large datasets with dense numeric keys (>10% density): Uses range filters (col >= min AND col <= max)
  • For large datasets with sparse keys: Returns AlwaysTrue() to allow full scan (exact matching happens downstream anyway)

This is safe because exact key matching occurs in get_rows_to_update() via the join operation.
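
A minimal sketch of that strategy, assuming the 10,000-key and 10%-density thresholds described above and checking density only on the first key column (the actual helper may differ in details):

import pyarrow as pa
import pyarrow.compute as pc
from pyiceberg.expressions import AlwaysTrue, And, GreaterThanOrEqual, In, LessThanOrEqual

UNIQUE_KEY_THRESHOLD = 10_000  # assumed threshold from the description above
DENSITY_THRESHOLD = 0.10       # assumed: unique keys / spanned numeric range

def coarse_match_filter_sketch(df: pa.Table, join_cols: list[str]):
    unique_keys = df.select(join_cols).group_by(join_cols).aggregate([])
    if unique_keys.num_rows < UNIQUE_KEY_THRESHOLD:
        # Small key sets: one In() per key column, combined with And()
        expr = None
        for col in join_cols:
            pred = In(col, pc.unique(df[col]).to_pylist())
            expr = pred if expr is None else And(expr, pred)
        return expr
    first = join_cols[0]
    if pa.types.is_integer(df.schema.field(first).type):
        lo, hi = pc.min(df[first]).as_py(), pc.max(df[first]).as_py()
        if unique_keys.num_rows / (hi - lo + 1) > DENSITY_THRESHOLD:
            # Dense numeric keys: a cheap range filter (simplified to the first key column)
            return And(GreaterThanOrEqual(first, lo), LessThanOrEqual(first, hi))
    # Large, sparse key sets: scan everything; exact matching happens downstream in the join
    return AlwaysTrue()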

Key insight - AlwaysTrue() is where the biggest win happens:

The benchmark data was sparse, triggering the AlwaysTrue() path. Counter-intuitively, this is actually the best case for performance improvement. The speedup doesn't come from reading fewer files - it comes from avoiding the massive expression tree construction and evaluation:

  • Original: Build Or(And(...), And(...), ...) with millions of nodes (8s), then evaluate during scan (382s)
  • Optimized: Return AlwaysTrue() instantly (0.07s), scan without filter overhead (1.4s)

With sparse data, you'd read most/all files anyway, so the "full scan" isn't a penalty - but avoiding an expression tree with n×m nodes (n keys × m columns) and evaluating it across f files is a huge win.

When this optimization helps less:

  • Small datasets (< 10k keys): Already fast with In() predicates, minimal improvement
  • Dense numeric keys: Range filter helps, but less dramatic than the sparse case
  • Workloads where the original filter already performed well

2. Anti-Join for Insert Filtering

Replaced per-batch expression filtering with a single anti-join operation after processing all batches:

# Before: Per-batch expression filtering (slow)
for batch in matched_iceberg_record_batches:
    rows = pa.Table.from_batches([batch])
    expr_match = create_match_filter(rows, join_cols)
    expr_match_arrow = expression_to_pyarrow(bind(...))
    rows_to_insert = rows_to_insert.filter(~expr_match_arrow)

# After: Single anti-join (fast)
combined_matched_keys = pa.concat_tables(matched_target_keys).group_by(join_cols).aggregate([])
rows_to_insert = df_keys.join(combined_matched_keys, keys=join_cols, join_type="left anti")

Note on memory usage: The new approach accumulates matched keys in memory during batch processing. We only store key columns (not full rows) to minimize memory footprint, and deduplicate after the loop. For tables with millions of matching rows, this could increase peak memory usage compared to the previous approach. A potential future improvement would be incremental deduplication during the loop.
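
As a self-contained illustration of the anti-join step (toy data and made-up column names):

import pyarrow as pa

# Source keys and the keys already matched against the target, accumulated per batch.
df_keys = pa.table({"id": [1, 2, 3, 4], "region": ["a", "a", "b", "b"]})
matched_target_keys = [
    pa.table({"id": [1], "region": ["a"]}),
    pa.table({"id": [3, 3], "region": ["b", "b"]}),  # duplicates from overlapping batches
]
join_cols = ["id", "region"]

# Deduplicate the accumulated keys, then anti-join to keep only keys that still need an insert.
combined = pa.concat_tables(matched_target_keys).group_by(join_cols).aggregate([])
to_insert = df_keys.join(combined, keys=join_cols, join_type="left anti")
# to_insert now contains the rows (2, "a") and (4, "b"), in no guaranteed order.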

3. Vectorized Row Comparison

Replaced row-by-row Python comparison with vectorized PyArrow operations:

# Before: Python loop with slice()
for source_idx, target_idx in zip(...):
    source_row = source_table.slice(source_idx, 1)
    target_row = target_table.slice(target_idx, 1)
    for key in non_key_cols:
        if source_row.column(key)[0].as_py() != target_row.column(key)[0].as_py():
            to_update_indices.append(source_idx)

# After: Vectorized with take() and compute
matched_source = source_table.take(source_indices)
matched_target = target_table.take(target_indices)
diff_masks = []
for col in non_key_cols:
    col_diff = _compare_columns_vectorized(matched_source.column(col), matched_target.column(col))
    diff_masks.append(col_diff)
combined_mask = functools.reduce(pc.or_, diff_masks)

The _compare_columns_vectorized() function handles:

  • Primitive types: Uses pc.not_equal() with proper null handling
  • Struct types: Recursively compares each nested field
  • List/Map types: Falls back to Python comparison (still batched)
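
A sketch of that dispatch logic on ChunkedArray inputs - it returns a boolean mask that is True where the columns differ, and treats 'null on one side only' as a difference (the actual helper may handle more cases):

import functools

import pyarrow as pa
import pyarrow.compute as pc

def compare_columns_vectorized_sketch(source, target):
    typ = source.type
    if pa.types.is_struct(typ):
        # Structs: recursively compare each nested field and OR the per-field masks.
        masks = [
            compare_columns_vectorized_sketch(pc.struct_field(source, i), pc.struct_field(target, i))
            for i in range(typ.num_fields)
        ]
        return functools.reduce(pc.or_, masks)
    if pa.types.is_list(typ) or pa.types.is_large_list(typ) or pa.types.is_map(typ):
        # Lists/maps: fall back to a Python comparison, but over the whole column at once.
        return pa.array([s != t for s, t in zip(source.to_pylist(), target.to_pylist())])
    # Primitives: vectorized not_equal; a null on exactly one side also counts as a difference.
    diff = pc.fill_null(pc.not_equal(source, target), False)
    null_mismatch = pc.xor(pc.is_null(source), pc.is_null(target))
    return pc.or_(diff, null_mismatch)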

Benchmark Results

Ran benchmarks on a table with ~2M rows, doing incremental upserts:

Run | Table Size | Batches | Original | Optimized | Speedup
2   | 2M rows    | 32      | 11.9 min | 2.3 min   | 5.1x
3   | 2M rows    | 96      | 31.5 min | 3.9 min   | 8.0x
4   | 2M rows    | 160+    | 51.2 min | 5.5 min   | 9.3x

Why times increase with each run: The table uses bucketing, and each upsert modifies files independently, causing the file count to grow over time. The original implementation's huge filter expression (Or(And(...), ...)) had to be evaluated against every file, so scan cost grew roughly with filter size × file count - more files meant dramatically more time. The optimized version avoids this by using AlwaysTrue(), so scan time grows roughly linearly with data size instead.

This file increase could be mitigated with table maintenance (compaction), which is not yet implemented in PyIceberg.

Where the Time Went (Run 2: 2M rows, 32 batches)

Step             | Original         | Optimized
Filter creation  | 7.9s             | 0.07s (114x faster)
Table scan       | 382.2s           | 1.4s (273x faster)
Batch processing | 212.0s           | 24.5s
Insert filtering | (included above) | 0.2s

The coarse filter approach shows the biggest improvement:

  • Original filter complexity: Or(And(...), And(...), ...) with millions of nodes
  • Optimized filter: AlwaysTrue() or simple And(In(), In())

Incremental Adoption

If the anti-join change is concerning due to memory implications, the coarse match filter optimization can be contributed separately as it provides the majority of the performance benefit and doesn't change the memory characteristics.

Suggested PR split:

  1. Coarse match filter for initial scan (biggest win, minimal risk)
  2. Vectorized row comparison in get_rows_to_update()
  3. Anti-join for insert filtering

Future Considerations

Why Rust bindings weren't explored for this PR:

In ticket #2159, it was suggested to side-step these performance issues by using the Python bindings of the Rust implementation. However, we would like to stick with a Python-centric implementation, because our use case requires mocking datetime with time-machine for:

  • Creating historical backfills with accurate snapshot timestamps
  • Deterministically rerunning failed pipeline runs with the same timestamps

This is why I kept the implementation in pure Python rather than exploring Rust bindings.
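
For context, a hypothetical backfill snippet (timestamp chosen arbitrarily) showing what that mocking looks like - time_machine.travel() freezes the clock that snapshot timestamps are derived from:

import datetime

import time_machine

run_ts = datetime.datetime(2026, 1, 15, 3, 0, tzinfo=datetime.timezone.utc)
with time_machine.travel(run_ts, tick=False):
    # Inside this block datetime.now() returns run_ts, so any snapshot committed here
    # (e.g. table.upsert(df, join_cols=["id"])) is stamped with the backfill's timestamp,
    # which keeps reruns of a failed pipeline run deterministic.
    print(datetime.datetime.now(tz=datetime.timezone.utc))  # 2026-01-15 03:00:00+00:00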

Potential hybrid approach:

The data processing (filtering, joins, comparisons) is where most of the time is spent and could benefit from Rust bindings. However - and I'll be selfish here - snapshot creation and metadata operations should remain in Python to preserve the ability to mock time. Without this, our backfill and replay workflows would break.

A future optimization could:

  • Move scan filtering and row comparison to Rust for performance
  • Keep snapshot/commit operations in Python for datetime mocking flexibility

I'd happily trade some performance for keeping our time-mocking capability intact.

Testing

Added comprehensive tests for:

  • create_coarse_match_filter() behavior across different dataset sizes and types
  • Threshold boundary conditions (< 10k, = 10k, > 10k unique keys)
  • Density calculations for range filter decisions
  • _compare_columns_vectorized() with primitives, nulls, structs, nested structs, and lists
  • Edge cases: empty datasets, single values, negative numbers, composite keys
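
For example, a boundary test for the sparse fallback could look roughly like this (it assumes create_coarse_match_filter(df, join_cols) mirrors create_match_filter's signature):

import pyarrow as pa
from pyiceberg.expressions import AlwaysTrue
from pyiceberg.table.upsert_util import create_coarse_match_filter  # added by this PR

def test_large_sparse_key_set_returns_always_true():
    # 20,000 unique integer keys (above the 10k threshold) spread over a ~20M range (~0.1% density).
    df = pa.table({"id": list(range(0, 20_000 * 1_000, 1_000))})
    assert isinstance(create_coarse_match_filter(df, ["id"]), AlwaysTrue)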

Breaking Changes

None. The API remains unchanged; this is purely an internal optimization.

Files Changed

  • pyiceberg/table/__init__.py - upsert method optimizations
  • pyiceberg/table/upsert_util.py - new create_coarse_match_filter(), vectorized comparison functions
  • tests/table/test_upsert.py - new tests for optimization functions

Note: This code was co-written with the help of an AI agent (Claude), primarily to speed up exploration and understanding of the PyIceberg codebase. All the speed-up ideas are mine. The benchmark results are from real-world production data that we actively use and store. I have reviewed all the generated code, and all related tests pass.
