From a947f0ae9e0df21c15cf0f142e74b9ef5339ddcd Mon Sep 17 00:00:00 2001 From: EnyMan Date: Mon, 19 Jan 2026 14:22:16 +0100 Subject: [PATCH 1/8] feat: Optimize upsert process with coarse match filter and vectorized comparisons --- pyiceberg/table/__init__.py | 3 +- pyiceberg/table/upsert_util.py | 148 +++++++++++++++++++++----- tests/table/test_upsert.py | 186 +++++++++++++++++++++++++++++++++ 3 files changed, 311 insertions(+), 26 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b30a1426e7..34a383a22b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -836,7 +836,8 @@ def upsert( ) # get list of rows that exist so we don't have to load the entire target table - matched_predicate = upsert_util.create_match_filter(df, join_cols) + # Use coarse filter for initial scan - exact matching happens in get_rows_to_update() + matched_predicate = upsert_util.create_coarse_match_filter(df, join_cols) # We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes. diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py index 6f32826eb0..66ada9e567 100644 --- a/pyiceberg/table/upsert_util.py +++ b/pyiceberg/table/upsert_util.py @@ -16,6 +16,7 @@ # under the License. import functools import operator +from typing import Union import pyarrow as pa from pyarrow import Table as pyarrow_table @@ -31,8 +32,18 @@ def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpression: + """ + Create an Iceberg BooleanExpression filter that exactly matches rows based on join columns. + + For single-column keys, uses an efficient In() predicate. + For composite keys, creates Or(And(...), And(...), ...) for exact row matching. + This function should be used when exact matching is required (e.g., overwrite, insert filtering). + """ unique_keys = df.select(join_cols).group_by(join_cols).aggregate([]) + if len(unique_keys) == 0: + return AlwaysFalse() + if len(join_cols) == 1: return In(join_cols[0], unique_keys[0].to_pylist()) else: @@ -48,17 +59,97 @@ def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpre return Or(*filters) +def create_coarse_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpression: + """ + Create a coarse Iceberg BooleanExpression filter for initial row scanning. + + For single-column keys, uses an efficient In() predicate (exact match). + For composite keys, uses In() per column as a coarse filter (AND of In() predicates), + which may return false positives but is much more efficient than exact matching. + + This function should only be used for initial scans where exact matching happens + downstream (e.g., in get_rows_to_update() via the join operation). + """ + unique_keys = df.select(join_cols).group_by(join_cols).aggregate([]) + + if len(unique_keys) == 0: + return AlwaysFalse() + + if len(join_cols) == 1: + return In(join_cols[0], unique_keys[0].to_pylist()) + else: + # For composite keys: use In() per column as a coarse filter + # This is more efficient than creating Or(And(...), And(...), ...) 
for each row + # May include false positives, but fine-grained matching happens downstream + column_filters = [] + for col in join_cols: + unique_values = pc.unique(unique_keys[col]).to_pylist() + column_filters.append(In(col, unique_values)) + return functools.reduce(operator.and_, column_filters) + + def has_duplicate_rows(df: pyarrow_table, join_cols: list[str]) -> bool: """Check for duplicate rows in a PyArrow table based on the join columns.""" return len(df.select(join_cols).group_by(join_cols).aggregate([([], "count_all")]).filter(pc.field("count_all") > 1)) > 0 +def _compare_columns_vectorized( + source_col: Union[pa.Array, pa.ChunkedArray], target_col: Union[pa.Array, pa.ChunkedArray] +) -> pa.Array: + """ + Vectorized comparison of two columns, returning a boolean array where True means values differ. + + Handles struct types recursively by comparing each nested field. + Handles null values correctly: null != non-null is True, null == null is True (no update needed). + """ + col_type = source_col.type + + if pa.types.is_struct(col_type): + # PyArrow cannot directly compare struct columns, so we recursively compare each field + diff_masks = [] + for i, field in enumerate(col_type): + src_field = pc.struct_field(source_col, [i]) + tgt_field = pc.struct_field(target_col, [i]) + field_diff = _compare_columns_vectorized(src_field, tgt_field) + diff_masks.append(field_diff) + + if not diff_masks: + # Empty struct - no fields to compare, so no differences + return pa.array([False] * len(source_col), type=pa.bool_()) + + return functools.reduce(pc.or_, diff_masks) + + elif pa.types.is_list(col_type) or pa.types.is_large_list(col_type) or pa.types.is_map(col_type): + # For list/map types, fall back to Python comparison as PyArrow doesn't support vectorized comparison + # This is still faster than the original row-by-row approach since we batch the conversion + source_py = source_col.to_pylist() + target_py = target_col.to_pylist() + return pa.array([s != t for s, t in zip(source_py, target_py, strict=True)], type=pa.bool_()) + + else: + # For primitive types, use vectorized not_equal + # Handle nulls: not_equal returns null when comparing with null + # We need: null vs non-null = different (True), null vs null = same (False) + diff = pc.not_equal(source_col, target_col) + source_null = pc.is_null(source_col) + target_null = pc.is_null(target_col) + + # XOR of null masks: True if exactly one is null (meaning they differ) + null_diff = pc.xor(source_null, target_null) + + # Combine: different if values differ OR exactly one is null + # Fill null comparison results with False (both non-null but comparison returned null shouldn't happen, + # but if it does, treat as no difference) + diff_filled = pc.fill_null(diff, False) + return pc.or_(diff_filled, null_diff) + + def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols: list[str]) -> pa.Table: """ Return a table with rows that need to be updated in the target table based on the join columns. + Uses vectorized PyArrow operations for efficient comparison, avoiding row-by-row Python loops. The table is joined on the identifier columns, and then checked if there are any updated rows. - Those are selected and everything is renamed correctly. 
""" all_columns = set(source_table.column_names) join_cols_set = set(join_cols) @@ -69,13 +160,13 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols raise ValueError("Target table has duplicate rows, aborting upsert") if len(target_table) == 0: - # When the target table is empty, there is nothing to update :) + # When the target table is empty, there is nothing to update + return source_table.schema.empty_table() + + if len(non_key_cols) == 0: + # No non-key columns to compare, all matched rows are "updates" but with no changes return source_table.schema.empty_table() - # We need to compare non_key_cols in Python as PyArrow - # 1. Cannot do a join when non-join columns have complex types - # 2. Cannot compare columns with complex types - # See: https://github.com/apache/arrow/issues/35785 SOURCE_INDEX_COLUMN_NAME = "__source_index" TARGET_INDEX_COLUMN_NAME = "__target_index" @@ -100,25 +191,32 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols # Step 3: Perform an inner join to find which rows from source exist in target matching_indices = source_index.join(target_index, keys=list(join_cols_set), join_type="inner") - # Step 4: Compare all rows using Python - to_update_indices = [] - for source_idx, target_idx in zip( - matching_indices[SOURCE_INDEX_COLUMN_NAME].to_pylist(), - matching_indices[TARGET_INDEX_COLUMN_NAME].to_pylist(), - strict=True, - ): - source_row = source_table.slice(source_idx, 1) - target_row = target_table.slice(target_idx, 1) - - for key in non_key_cols: - source_val = source_row.column(key)[0].as_py() - target_val = target_row.column(key)[0].as_py() - if source_val != target_val: - to_update_indices.append(source_idx) - break - - # Step 5: Take rows from source table using the indices and cast to target schema - if to_update_indices: + if len(matching_indices) == 0: + # No matching rows found + return source_table.schema.empty_table() + + # Step 4: Take matched rows in batch (vectorized - single operation) + source_indices = matching_indices[SOURCE_INDEX_COLUMN_NAME] + target_indices = matching_indices[TARGET_INDEX_COLUMN_NAME] + + matched_source = source_table.take(source_indices) + matched_target = target_table.take(target_indices) + + # Step 5: Vectorized comparison per column + diff_masks = [] + for col in non_key_cols: + source_col = matched_source.column(col) + target_col = matched_target.column(col) + col_diff = _compare_columns_vectorized(source_col, target_col) + diff_masks.append(col_diff) + + # Step 6: Combine masks with OR (any column different = needs update) + combined_mask = functools.reduce(pc.or_, diff_masks) + + # Step 7: Filter to get indices of rows that need updating + to_update_indices = pc.filter(source_indices, combined_mask) + + if len(to_update_indices) > 0: return source_table.take(to_update_indices) else: return source_table.schema.empty_table() diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 9bc61799e4..1682af3b06 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -885,3 +885,189 @@ def test_upsert_snapshot_properties(catalog: Catalog) -> None: for snapshot in snapshots[initial_snapshot_count:]: assert snapshot.summary is not None assert snapshot.summary.additional_properties.get("test_prop") == "test_value" + + +def test_coarse_match_filter_composite_key() -> None: + """ + Test that create_coarse_match_filter produces efficient In() predicates for composite keys. 
+ """ + from pyiceberg.table.upsert_util import create_coarse_match_filter, create_match_filter + + # Create a table with composite key that has overlapping values + # (1, 'x'), (2, 'y'), (1, 'z') - exact filter should have 3 conditions + # coarse filter should have In(a, [1,2]) AND In(b, ['x','y','z']) + data = [ + {"a": 1, "b": "x", "val": 1}, + {"a": 2, "b": "y", "val": 2}, + {"a": 1, "b": "z", "val": 3}, + ] + schema = pa.schema([pa.field("a", pa.int32()), pa.field("b", pa.string()), pa.field("val", pa.int32())]) + table = pa.Table.from_pylist(data, schema=schema) + + exact_filter = create_match_filter(table, ["a", "b"]) + coarse_filter = create_coarse_match_filter(table, ["a", "b"]) + + # Exact filter is an Or of And conditions + assert "Or" in str(exact_filter) + + # Coarse filter is an And of In conditions + assert "And" in str(coarse_filter) + assert "In" in str(coarse_filter) + + +def test_vectorized_comparison_primitives() -> None: + """Test vectorized comparison with primitive types.""" + from pyiceberg.table.upsert_util import _compare_columns_vectorized + + # Test integers + source = pa.array([1, 2, 3, 4]) + target = pa.array([1, 2, 5, 4]) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, False, True, False] + + # Test strings + source = pa.array(["a", "b", "c"]) + target = pa.array(["a", "x", "c"]) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, True, False] + + # Test floats + source = pa.array([1.0, 2.5, 3.0]) + target = pa.array([1.0, 2.5, 3.1]) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, False, True] + + +def test_vectorized_comparison_nulls() -> None: + """Test vectorized comparison handles nulls correctly.""" + from pyiceberg.table.upsert_util import _compare_columns_vectorized + + # null vs non-null = different + source = pa.array([1, None, 3]) + target = pa.array([1, 2, 3]) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, True, False] + + # non-null vs null = different + source = pa.array([1, 2, 3]) + target = pa.array([1, None, 3]) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, True, False] + + # null vs null = same (no update needed) + source = pa.array([1, None, 3]) + target = pa.array([1, None, 3]) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, False, False] + + +def test_vectorized_comparison_structs() -> None: + """Test vectorized comparison with nested struct types.""" + from pyiceberg.table.upsert_util import _compare_columns_vectorized + + struct_type = pa.struct([("x", pa.int32()), ("y", pa.string())]) + + # Same structs + source = pa.array([{"x": 1, "y": "a"}, {"x": 2, "y": "b"}], type=struct_type) + target = pa.array([{"x": 1, "y": "a"}, {"x": 2, "y": "b"}], type=struct_type) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, False] + + # Different struct values + source = pa.array([{"x": 1, "y": "a"}, {"x": 2, "y": "b"}], type=struct_type) + target = pa.array([{"x": 1, "y": "a"}, {"x": 2, "y": "c"}], type=struct_type) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, True] + + +def test_vectorized_comparison_nested_structs() -> None: + """Test vectorized comparison with deeply nested struct types.""" + from pyiceberg.table.upsert_util import _compare_columns_vectorized + + inner_struct = pa.struct([("val", pa.int32())]) + 
outer_struct = pa.struct([("inner", inner_struct), ("name", pa.string())]) + + source = pa.array( + [{"inner": {"val": 1}, "name": "a"}, {"inner": {"val": 2}, "name": "b"}], + type=outer_struct, + ) + target = pa.array( + [{"inner": {"val": 1}, "name": "a"}, {"inner": {"val": 3}, "name": "b"}], + type=outer_struct, + ) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, True] + + +def test_vectorized_comparison_lists() -> None: + """Test vectorized comparison with list types (falls back to Python comparison).""" + from pyiceberg.table.upsert_util import _compare_columns_vectorized + + list_type = pa.list_(pa.int32()) + + source = pa.array([[1, 2], [3, 4]], type=list_type) + target = pa.array([[1, 2], [3, 5]], type=list_type) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, True] + + +def test_get_rows_to_update_no_non_key_cols() -> None: + """Test get_rows_to_update when all columns are key columns.""" + from pyiceberg.table.upsert_util import get_rows_to_update + + # All columns are key columns, so no non-key columns to compare + source = pa.Table.from_pydict({"id": [1, 2, 3]}) + target = pa.Table.from_pydict({"id": [1, 2, 3]}) + rows = get_rows_to_update(source, target, ["id"]) + assert len(rows) == 0 + + +def test_upsert_with_list_field(catalog: Catalog) -> None: + """Test upsert with list type as non-key column.""" + from pyiceberg.types import ListType + + identifier = "default.test_upsert_with_list_field" + _drop_table(catalog, identifier) + + schema = Schema( + NestedField(1, "id", IntegerType(), required=True), + NestedField( + 2, + "tags", + ListType(element_id=3, element_type=StringType(), element_required=False), + required=False, + ), + identifier_field_ids=[1], + ) + + tbl = catalog.create_table(identifier, schema=schema) + + arrow_schema = pa.schema( + [ + pa.field("id", pa.int32(), nullable=False), + pa.field("tags", pa.list_(pa.large_string()), nullable=True), + ] + ) + + initial_data = pa.Table.from_pylist( + [ + {"id": 1, "tags": ["a", "b"]}, + {"id": 2, "tags": ["c"]}, + ], + schema=arrow_schema, + ) + tbl.append(initial_data) + + # Update with changed list + update_data = pa.Table.from_pylist( + [ + {"id": 1, "tags": ["a", "b"]}, # Same - no update + {"id": 2, "tags": ["c", "d"]}, # Changed - should update + {"id": 3, "tags": ["e"]}, # New - should insert + ], + schema=arrow_schema, + ) + + res = tbl.upsert(update_data, join_cols=["id"]) + assert res.rows_updated == 1 + assert res.rows_inserted == 1 From 4ab5c936f9ee13d64bb64ce1c6ddbe7baad4da83 Mon Sep 17 00:00:00 2001 From: EnyMan Date: Mon, 19 Jan 2026 16:02:21 +0100 Subject: [PATCH 2/8] feat: Enhance vectorized comparison to handle struct-level nulls and empty structs --- pyiceberg/table/upsert_util.py | 18 ++++++++---- tests/table/test_upsert.py | 50 ++++++++++++++++++++++++++++++++-- 2 files changed, 60 insertions(+), 8 deletions(-) diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py index 66ada9e567..ef2661e49f 100644 --- a/pyiceberg/table/upsert_util.py +++ b/pyiceberg/table/upsert_util.py @@ -16,7 +16,6 @@ # under the License. 
import functools import operator -from typing import Union import pyarrow as pa from pyarrow import Table as pyarrow_table @@ -94,7 +93,7 @@ def has_duplicate_rows(df: pyarrow_table, join_cols: list[str]) -> bool: def _compare_columns_vectorized( - source_col: Union[pa.Array, pa.ChunkedArray], target_col: Union[pa.Array, pa.ChunkedArray] + source_col: pa.Array | pa.ChunkedArray, target_col: pa.Array | pa.ChunkedArray ) -> pa.Array: """ Vectorized comparison of two columns, returning a boolean array where True means values differ. @@ -105,6 +104,11 @@ def _compare_columns_vectorized( col_type = source_col.type if pa.types.is_struct(col_type): + # Handle struct-level nulls first + source_null = pc.is_null(source_col) + target_null = pc.is_null(target_col) + struct_null_diff = pc.xor(source_null, target_null) # Different if exactly one is null + # PyArrow cannot directly compare struct columns, so we recursively compare each field diff_masks = [] for i, field in enumerate(col_type): @@ -114,12 +118,14 @@ def _compare_columns_vectorized( diff_masks.append(field_diff) if not diff_masks: - # Empty struct - no fields to compare, so no differences - return pa.array([False] * len(source_col), type=pa.bool_()) + # Empty struct - only null differences matter + return struct_null_diff - return functools.reduce(pc.or_, diff_masks) + # Combine field differences with struct-level null differences + field_diff = functools.reduce(pc.or_, diff_masks) + return pc.or_(field_diff, struct_null_diff) - elif pa.types.is_list(col_type) or pa.types.is_large_list(col_type) or pa.types.is_map(col_type): + elif pa.types.is_list(col_type) or pa.types.is_large_list(col_type) or pa.types.is_fixed_size_list(col_type) or pa.types.is_map(col_type): # For list/map types, fall back to Python comparison as PyArrow doesn't support vectorized comparison # This is still faster than the original row-by-row approach since we batch the conversion source_py = source_col.to_pylist() diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 1682af3b06..e4b2fd4377 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -892,6 +892,7 @@ def test_coarse_match_filter_composite_key() -> None: Test that create_coarse_match_filter produces efficient In() predicates for composite keys. 
""" from pyiceberg.table.upsert_util import create_coarse_match_filter, create_match_filter + from pyiceberg.expressions import Or, And, In # Create a table with composite key that has overlapping values # (1, 'x'), (2, 'y'), (1, 'z') - exact filter should have 3 conditions @@ -908,10 +909,10 @@ def test_coarse_match_filter_composite_key() -> None: coarse_filter = create_coarse_match_filter(table, ["a", "b"]) # Exact filter is an Or of And conditions - assert "Or" in str(exact_filter) + assert isinstance(exact_filter, Or) # Coarse filter is an And of In conditions - assert "And" in str(coarse_filter) + assert isinstance(coarse_filter, And) assert "In" in str(coarse_filter) @@ -1071,3 +1072,48 @@ def test_upsert_with_list_field(catalog: Catalog) -> None: res = tbl.upsert(update_data, join_cols=["id"]) assert res.rows_updated == 1 assert res.rows_inserted == 1 + + +def test_vectorized_comparison_struct_level_nulls() -> None: + """Test vectorized comparison handles struct-level nulls correctly (not just field-level nulls).""" + from pyiceberg.table.upsert_util import _compare_columns_vectorized + + struct_type = pa.struct([("x", pa.int32()), ("y", pa.string())]) + + # null struct vs non-null struct = different + source = pa.array([{"x": 1, "y": "a"}, None, {"x": 3, "y": "c"}], type=struct_type) + target = pa.array([{"x": 1, "y": "a"}, {"x": 2, "y": "b"}, {"x": 3, "y": "c"}], type=struct_type) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, True, False] + + # non-null struct vs null struct = different + source = pa.array([{"x": 1, "y": "a"}, {"x": 2, "y": "b"}, {"x": 3, "y": "c"}], type=struct_type) + target = pa.array([{"x": 1, "y": "a"}, None, {"x": 3, "y": "c"}], type=struct_type) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, True, False] + + # null struct vs null struct = same (no update needed) + source = pa.array([{"x": 1, "y": "a"}, None, {"x": 3, "y": "c"}], type=struct_type) + target = pa.array([{"x": 1, "y": "a"}, None, {"x": 3, "y": "c"}], type=struct_type) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, False, False] + + +def test_vectorized_comparison_empty_struct_with_nulls() -> None: + """Test that empty structs with null values are compared correctly.""" + from pyiceberg.table.upsert_util import _compare_columns_vectorized + + # Empty struct type - edge case where only struct-level null handling matters + empty_struct_type = pa.struct([]) + + # null vs non-null empty struct = different + source = pa.array([{}, None, {}], type=empty_struct_type) + target = pa.array([{}, {}, {}], type=empty_struct_type) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, True, False] + + # null vs null empty struct = same + source = pa.array([None, None], type=empty_struct_type) + target = pa.array([None, None], type=empty_struct_type) + diff = _compare_columns_vectorized(source, target) + assert diff.to_pylist() == [False, False] From db39b67b734b1e83de8753fa4beccd9560500bed Mon Sep 17 00:00:00 2001 From: EnyMan Date: Mon, 19 Jan 2026 20:52:46 +0100 Subject: [PATCH 3/8] feat: Optimize insert filtering in upsert process using anti-join for matched keys --- pyiceberg/table/__init__.py | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 34a383a22b..0869dbf569 100644 --- a/pyiceberg/table/__init__.py +++ 
b/pyiceberg/table/__init__.py @@ -804,7 +804,6 @@ def upsert( except ModuleNotFoundError as e: raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e - from pyiceberg.io.pyarrow import expression_to_pyarrow from pyiceberg.table import upsert_util if join_cols is None: @@ -855,7 +854,7 @@ def upsert( batches_to_overwrite = [] overwrite_predicates = [] - rows_to_insert = df + matched_target_keys: list[pa.Table] = [] # Accumulate matched keys for insert filtering for batch in matched_iceberg_record_batches: rows = pa.Table.from_batches([batch]) @@ -873,13 +872,34 @@ def upsert( batches_to_overwrite.append(rows_to_update) overwrite_predicates.append(overwrite_mask_predicate) + # Collect matched keys for insert filtering (will use anti-join after loop) if when_not_matched_insert_all: - expr_match = upsert_util.create_match_filter(rows, join_cols) - expr_match_bound = bind(self.table_metadata.schema(), expr_match, case_sensitive=case_sensitive) - expr_match_arrow = expression_to_pyarrow(expr_match_bound) + matched_target_keys.append(rows.select(join_cols)) + + batch_loop_end = time.perf_counter() + logger.info( + f"Batch processing: {batch_loop_end - batch_loop_start:.3f}s " + f"({batch_count} batches, get_rows_to_update total: {total_rows_to_update_time:.3f}s)" + ) - # Filter rows per batch. - rows_to_insert = rows_to_insert.filter(~expr_match_arrow) + # Use anti-join to find rows to insert (replaces per-batch expression filtering) + rows_to_insert = df + if when_not_matched_insert_all and matched_target_keys: + filter_start = time.perf_counter() + # Combine all matched keys and deduplicate + combined_matched_keys = pa.concat_tables(matched_target_keys).group_by(join_cols).aggregate([]) + # Cast matched keys to source schema types for join compatibility + source_key_schema = df.select(join_cols).schema + combined_matched_keys = combined_matched_keys.cast(source_key_schema) + # Use anti-join on key columns only (with row indices) to avoid issues with + # struct/list types in non-key columns that PyArrow join doesn't support + row_indices = pa.chunked_array([pa.array(range(len(df)), type=pa.int64())]) + source_keys_with_idx = df.select(join_cols).append_column("__row_idx__", row_indices) + not_matched_keys = source_keys_with_idx.join(combined_matched_keys, keys=join_cols, join_type="left anti") + indices_to_keep = not_matched_keys.column("__row_idx__").combine_chunks() + rows_to_insert = df.take(indices_to_keep) + filter_end = time.perf_counter() + logger.info(f"Insert filtering (anti-join): {filter_end - filter_start:.3f}s ({len(combined_matched_keys)} matched keys)") update_row_cnt = 0 insert_row_cnt = 0 From e36d99460453ee0b124f04a4719cd24673669206 Mon Sep 17 00:00:00 2001 From: EnyMan Date: Tue, 20 Jan 2026 15:04:55 +0100 Subject: [PATCH 4/8] feat: Further optimize the filter for big datasets Remove logging reset pyarrow squash --- pyiceberg/table/__init__.py | 31 +-- pyiceberg/table/upsert_util.py | 171 +++++++++++-- tests/table/test_upsert.py | 448 +++++++++++++++++++++++++++++++++ 3 files changed, 614 insertions(+), 36 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 0869dbf569..2688bd5dda 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -834,8 +834,9 @@ def upsert( format_version=self.table_metadata.format_version, ) - # get list of rows that exist so we don't have to load the entire target table - # Use coarse filter for initial scan - exact matching happens in get_rows_to_update() + # Create a 
coarse filter for the initial scan to reduce the number of rows read. + # This filter is intentionally less precise but faster to evaluate than exact matching. + # Exact key matching happens downstream in get_rows_to_update() via PyArrow joins. matched_predicate = upsert_util.create_coarse_match_filter(df, join_cols) # We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes. @@ -854,38 +855,32 @@ def upsert( batches_to_overwrite = [] overwrite_predicates = [] - matched_target_keys: list[pa.Table] = [] # Accumulate matched keys for insert filtering + # Accumulate matched keys for anti-join insert filtering after the batch loop. + # We only store key columns (not full rows) to minimize memory usage. + matched_target_keys: list[pa.Table] = [] for batch in matched_iceberg_record_batches: rows = pa.Table.from_batches([batch]) if when_matched_update_all: - # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed - # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed - # this extra step avoids unnecessary IO and writes + # Check non-key columns to see if values have actually changed. + # We don't want to do a blanket overwrite for matched rows if the + # actual non-key column data hasn't changed - this avoids unnecessary IO and writes. rows_to_update = upsert_util.get_rows_to_update(df, rows, join_cols) if len(rows_to_update) > 0: - # build the match predicate filter overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols) - batches_to_overwrite.append(rows_to_update) overwrite_predicates.append(overwrite_mask_predicate) - # Collect matched keys for insert filtering (will use anti-join after loop) if when_not_matched_insert_all: matched_target_keys.append(rows.select(join_cols)) - batch_loop_end = time.perf_counter() - logger.info( - f"Batch processing: {batch_loop_end - batch_loop_start:.3f}s " - f"({batch_count} batches, get_rows_to_update total: {total_rows_to_update_time:.3f}s)" - ) - - # Use anti-join to find rows to insert (replaces per-batch expression filtering) + # Use anti-join to find rows to insert. This is more efficient than per-batch + # expression filtering because: (1) we build expressions once, not per batch, + # and (2) PyArrow joins are faster than evaluating large Or(...) expressions. 
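# ------------------------------------------------------------------
# Illustrative aside: a minimal standalone sketch of the left anti-join
# pattern described in the comment above, using hypothetical toy tables
# (not code from this patch). It assumes a single join column "id"; the
# patch applies the same idea to arbitrary join_cols, with matched keys
# accumulated per batch and deduplicated before the join.
import pyarrow as pa

source = pa.table({"id": [1, 2, 3, 4], "val": ["a", "b", "c", "d"]})
matched_keys = pa.table({"id": [2, 4]})  # keys already found in the target scan

# Tag each source row with its position so full rows can be recovered afterwards.
row_idx = pa.array(range(len(source)), type=pa.int64())
keys_with_idx = source.select(["id"]).append_column("__row_idx__", row_idx)

# "left anti" keeps only left-side rows that have no match on the right side.
unmatched = keys_with_idx.join(matched_keys, keys=["id"], join_type="left anti")
rows_to_insert = source.take(unmatched.column("__row_idx__").combine_chunks())
# rows_to_insert now holds the rows with ids 1 and 3 (row order not guaranteed).
# ------------------------------------------------------------------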
rows_to_insert = df if when_not_matched_insert_all and matched_target_keys: - filter_start = time.perf_counter() # Combine all matched keys and deduplicate combined_matched_keys = pa.concat_tables(matched_target_keys).group_by(join_cols).aggregate([]) # Cast matched keys to source schema types for join compatibility @@ -898,8 +893,6 @@ def upsert( not_matched_keys = source_keys_with_idx.join(combined_matched_keys, keys=join_cols, join_type="left anti") indices_to_keep = not_matched_keys.column("__row_idx__").combine_chunks() rows_to_insert = df.take(indices_to_keep) - filter_end = time.perf_counter() - logger.info(f"Insert filtering (anti-join): {filter_end - filter_start:.3f}s ({len(combined_matched_keys)} matched keys)") update_row_cnt = 0 insert_row_cnt = 0 diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py index ef2661e49f..e5184163fc 100644 --- a/pyiceberg/table/upsert_util.py +++ b/pyiceberg/table/upsert_util.py @@ -23,12 +23,24 @@ from pyiceberg.expressions import ( AlwaysFalse, + AlwaysTrue, + And, BooleanExpression, EqualTo, + GreaterThanOrEqual, In, + LessThanOrEqual, Or, ) +# Threshold for switching from In() predicate to range-based or no filter. +# When unique keys exceed this, the In() predicate becomes too expensive to process. +LARGE_FILTER_THRESHOLD = 10_000 + +# Minimum density (ratio of unique values to range size) for range filter to be effective. +# Below this threshold, range filters read too much irrelevant data. +DENSITY_THRESHOLD = 0.1 + def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpression: """ @@ -58,32 +70,119 @@ def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpre return Or(*filters) +def _is_numeric_type(arrow_type: pa.DataType) -> bool: + """Check if a PyArrow type is numeric (suitable for range filtering).""" + return pa.types.is_integer(arrow_type) or pa.types.is_floating(arrow_type) + + +def _create_range_filter(col_name: str, values: pa.Array) -> BooleanExpression: + """Create a min/max range filter for a numeric column.""" + min_val = pc.min(values).as_py() + max_val = pc.max(values).as_py() + return And(GreaterThanOrEqual(col_name, min_val), LessThanOrEqual(col_name, max_val)) + + def create_coarse_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpression: """ Create a coarse Iceberg BooleanExpression filter for initial row scanning. - For single-column keys, uses an efficient In() predicate (exact match). - For composite keys, uses In() per column as a coarse filter (AND of In() predicates), - which may return false positives but is much more efficient than exact matching. + This is an optimization for reducing the scan size before exact matching happens + downstream (e.g., in get_rows_to_update() via the join operation). It trades filter + precision for filter evaluation speed. + + IMPORTANT: This is not a silver bullet optimization. 
It only helps specific use cases: + - Datasets with < 10,000 unique keys benefit from In() predicates + - Large datasets with dense numeric keys (>10% density) benefit from range filters + - Large datasets with sparse keys or non-numeric columns fall back to full scan + + For small datasets (< LARGE_FILTER_THRESHOLD unique keys, currently 10,000): + - Single-column keys: uses In() predicate + - Composite keys: uses AND of In() predicates per column + + For large datasets (>= LARGE_FILTER_THRESHOLD unique keys): + - Single numeric column with dense IDs (>10% coverage): uses min/max range filter + - Otherwise: returns AlwaysTrue() to skip filtering (full scan) - This function should only be used for initial scans where exact matching happens - downstream (e.g., in get_rows_to_update() via the join operation). + The density threshold (DENSITY_THRESHOLD = 0.1 or 10%) determines whether a range + filter is efficient. Below this threshold, the range would include too many + non-matching rows, making a full scan more practical. + + Args: + df: PyArrow table containing the source data with join columns + join_cols: List of column names to use for matching + + Returns: + BooleanExpression filter for Iceberg table scan """ unique_keys = df.select(join_cols).group_by(join_cols).aggregate([]) + num_unique_keys = len(unique_keys) - if len(unique_keys) == 0: + if num_unique_keys == 0: return AlwaysFalse() + # For small datasets, use the standard In() approach + if num_unique_keys < LARGE_FILTER_THRESHOLD: + if len(join_cols) == 1: + return In(join_cols[0], unique_keys[0].to_pylist()) + else: + column_filters = [] + for col in join_cols: + unique_values = pc.unique(unique_keys[col]).to_pylist() + column_filters.append(In(col, unique_values)) + if len(column_filters) == 0: + return AlwaysFalse() + if len(column_filters) == 1: + return column_filters[0] + return functools.reduce(operator.and_, column_filters) + + # For large datasets, use optimized strategies if len(join_cols) == 1: - return In(join_cols[0], unique_keys[0].to_pylist()) + col_name = join_cols[0] + col_data = unique_keys[col_name] + col_type = col_data.type + + # For numeric columns, check if range filter is efficient (dense IDs) + if _is_numeric_type(col_type): + min_val = pc.min(col_data).as_py() + max_val = pc.max(col_data).as_py() + value_range = max_val - min_val + 1 + density = num_unique_keys / value_range if value_range > 0 else 0 + + # If IDs are dense (>10% coverage of the range), use range filter + # Otherwise, range filter would read too much irrelevant data + if density > DENSITY_THRESHOLD: + return _create_range_filter(col_name, col_data) + else: + return AlwaysTrue() + else: + # Non-numeric single column with many values - skip filter + return AlwaysTrue() else: - # For composite keys: use In() per column as a coarse filter - # This is more efficient than creating Or(And(...), And(...), ...) 
for each row - # May include false positives, but fine-grained matching happens downstream + # Composite keys with many values - use range filters for numeric columns where possible column_filters = [] for col in join_cols: - unique_values = pc.unique(unique_keys[col]).to_pylist() - column_filters.append(In(col, unique_values)) + col_data = unique_keys[col] + col_type = col_data.type + unique_values = pc.unique(col_data) + + if _is_numeric_type(col_type) and len(unique_values) >= LARGE_FILTER_THRESHOLD: + # Use range filter for large numeric columns + min_val = pc.min(unique_values).as_py() + max_val = pc.max(unique_values).as_py() + value_range = max_val - min_val + 1 + density = len(unique_values) / value_range if value_range > 0 else 0 + + if density > DENSITY_THRESHOLD: + column_filters.append(_create_range_filter(col, unique_values)) + else: + # Sparse numeric column - still use In() as it's part of composite key + column_filters.append(In(col, unique_values.to_pylist())) + else: + # Small column or non-numeric - use In() + column_filters.append(In(col, unique_values.to_pylist())) + + if len(column_filters) == 0: + return AlwaysTrue() return functools.reduce(operator.and_, column_filters) @@ -98,8 +197,21 @@ def _compare_columns_vectorized( """ Vectorized comparison of two columns, returning a boolean array where True means values differ. - Handles struct types recursively by comparing each nested field. - Handles null values correctly: null != non-null is True, null == null is True (no update needed). + Handles different PyArrow types: + - Primitive types: Uses pc.not_equal() with proper null handling + - Struct types: Recursively compares each nested field + - List/Map types: Falls back to Python comparison (still batched, not row-by-row) + + Null handling semantics: + - null != non-null -> True (values differ, needs update) + - null == null -> False (values same, no update needed) + + Args: + source_col: Column from the source table + target_col: Column from the target table (must have same length) + + Returns: + Boolean PyArrow array where True indicates the values at that index differ """ col_type = source_col.type @@ -155,7 +267,32 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols Return a table with rows that need to be updated in the target table based on the join columns. Uses vectorized PyArrow operations for efficient comparison, avoiding row-by-row Python loops. - The table is joined on the identifier columns, and then checked if there are any updated rows. + The function performs an inner join on the identifier columns, then compares non-key columns + to find rows where values have actually changed. + + Algorithm: + 1. Prepare source and target index tables with row indices + 2. Inner join on join columns to find matching rows + 3. Use take() to extract matched rows in batch + 4. Compare non-key columns using vectorized operations + 5. Filter to rows where at least one non-key column differs + + Note: The column names '__source_index' and '__target_index' are reserved for internal use + and cannot be used as join column names. + + Args: + source_table: PyArrow table with new/updated data + target_table: PyArrow table with existing data + join_cols: List of column names that form the unique key + + Returns: + PyArrow table containing only the rows from source_table that exist in target_table + and have at least one non-key column with a different value. Returns an empty table + if no updates are needed. 
+ + Raises: + ValueError: If target_table has duplicate rows based on join_cols + ValueError: If join_cols contains reserved column names """ all_columns = set(source_table.column_names) join_cols_set = set(join_cols) @@ -183,8 +320,8 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols ) from None # Step 1: Prepare source index with join keys and a marker index - # Cast to target table schema, so we can do the join - # See: https://github.com/apache/arrow/issues/37542 + # Cast source to target schema to ensure type compatibility for the join + # (e.g., source int32 vs target int64 would cause join issues) source_index = ( source_table.cast(target_table.schema) .select(join_cols_set) diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index e4b2fd4377..33ea57c3d6 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -1117,3 +1117,451 @@ def test_vectorized_comparison_empty_struct_with_nulls() -> None: target = pa.array([None, None], type=empty_struct_type) diff = _compare_columns_vectorized(source, target) assert diff.to_pylist() == [False, False] + + +# ============================================================================ +# Tests for create_coarse_match_filter and _is_numeric_type +# ============================================================================ + + +@pytest.mark.parametrize( + "dtype,expected_numeric", + [ + (pa.int8(), True), + (pa.int16(), True), + (pa.int32(), True), + (pa.int64(), True), + (pa.uint8(), True), + (pa.uint16(), True), + (pa.uint32(), True), + (pa.uint64(), True), + (pa.float16(), True), + (pa.float32(), True), + (pa.float64(), True), + (pa.string(), False), + (pa.binary(), False), + (pa.date32(), False), + (pa.date64(), False), + (pa.timestamp("us"), False), + (pa.timestamp("ns"), False), + (pa.decimal128(10, 2), False), + (pa.decimal256(20, 4), False), + (pa.bool_(), False), + (pa.large_string(), False), + (pa.large_binary(), False), + ], +) +def test_is_numeric_type(dtype: pa.DataType, expected_numeric: bool) -> None: + """Test that _is_numeric_type correctly identifies all numeric types.""" + from pyiceberg.table.upsert_util import _is_numeric_type + + assert _is_numeric_type(dtype) == expected_numeric + + +# ============================================================================ +# Thresholding Tests (Small vs Large Datasets) +# ============================================================================ + + +def test_coarse_match_filter_small_dataset_uses_in_filter() -> None: + """Test that small datasets (< 10,000 unique keys) use In() filter.""" + from pyiceberg.expressions import In + + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create a dataset with 100 unique keys (well below threshold) + num_keys = 100 + data = {"id": list(range(num_keys)), "value": list(range(num_keys))} + schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + assert num_keys < LARGE_FILTER_THRESHOLD + assert isinstance(result, In) + assert result.term.name == "id" + assert len(result.literals) == num_keys + + +def test_coarse_match_filter_threshold_boundary_uses_in_filter() -> None: + """Test that datasets at threshold - 1 (9,999 unique keys) still use In() filter.""" + from pyiceberg.expressions import In + + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # 
Create a dataset with exactly threshold - 1 unique keys + num_keys = LARGE_FILTER_THRESHOLD - 1 + data = {"id": list(range(num_keys)), "value": list(range(num_keys))} + schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + assert isinstance(result, In) + assert result.term.name == "id" + assert len(result.literals) == num_keys + + +def test_coarse_match_filter_above_threshold_uses_optimized_filter() -> None: + """Test that datasets >= 10,000 unique keys use optimized filter strategy.""" + from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual + + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create a dense dataset (consecutive IDs) with exactly threshold unique keys + num_keys = LARGE_FILTER_THRESHOLD + data = {"id": list(range(num_keys)), "value": list(range(num_keys))} + schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + # Dense IDs should use range filter (And of GreaterThanOrEqual and LessThanOrEqual) + assert isinstance(result, And) + assert isinstance(result.left, GreaterThanOrEqual) + assert isinstance(result.right, LessThanOrEqual) + assert result.left.literal.value == 0 + assert result.right.literal.value == num_keys - 1 + + +def test_coarse_match_filter_large_dataset() -> None: + """Test that large datasets (100,000 unique keys) use optimized filter.""" + from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual + + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create a dense dataset with 100,000 unique keys + num_keys = 100_000 + data = {"id": list(range(num_keys)), "value": list(range(num_keys))} + schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + assert num_keys >= LARGE_FILTER_THRESHOLD + # Dense IDs should use range filter + assert isinstance(result, And) + assert isinstance(result.left, GreaterThanOrEqual) + assert isinstance(result.right, LessThanOrEqual) + + +# ============================================================================ +# Density Calculation Tests +# ============================================================================ + + +def test_coarse_match_filter_dense_ids_use_range_filter() -> None: + """Test that dense IDs (density > 10%) use range filter.""" + from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual + + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create dense IDs: all values from 0 to N-1 (100% density) + num_keys = LARGE_FILTER_THRESHOLD + data = {"id": list(range(num_keys)), "value": list(range(num_keys))} + schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + # Density = 10000 / (9999 - 0 + 1) = 100% + # Should use range filter + assert isinstance(result, And) + assert isinstance(result.left, GreaterThanOrEqual) + assert isinstance(result.right, LessThanOrEqual) + + +def test_coarse_match_filter_moderately_dense_ids_use_range_filter() -> None: + """Test that moderately dense IDs (50% density) use 
range filter.""" + from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual + + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create IDs: 0, 2, 4, 6, ... (every other number) - 50% density + num_keys = LARGE_FILTER_THRESHOLD + data = {"id": list(range(0, num_keys * 2, 2)), "value": list(range(num_keys))} + schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + # Density = 10000 / (19998 - 0 + 1) ~= 50% + # Should use range filter since density > 10% + assert isinstance(result, And) + assert isinstance(result.left, GreaterThanOrEqual) + assert isinstance(result.right, LessThanOrEqual) + + +def test_coarse_match_filter_sparse_ids_use_always_true() -> None: + """Test that sparse IDs (density <= 10%) use AlwaysTrue (full scan).""" + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create sparse IDs: values spread across a large range + # 10,000 values in range of ~110,000 = ~9% density + num_keys = LARGE_FILTER_THRESHOLD + ids = list(range(0, num_keys * 11, 11)) # 0, 11, 22, 33, ... + data = {"id": ids, "value": list(range(num_keys))} + schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + # Density ~= 10000 / ((10000-1)*11 + 1) = 9.09% < 10% + # Should use AlwaysTrue (full scan) + assert isinstance(result, AlwaysTrue) + + +def test_coarse_match_filter_density_boundary_at_10_percent() -> None: + """Test exact 10% boundary density behavior.""" + from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual + + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create IDs at exactly ~10% density + # 10,000 values in range of 100,000 = exactly 10% + num_keys = LARGE_FILTER_THRESHOLD + # Generate 10,000 values in range [0, 99999] -> density = 10000/100000 = 10% + # Using every 10th value: 0, 10, 20, ... 
99990 + ids = list(range(0, num_keys * 10, 10)) + data = {"id": ids, "value": list(range(num_keys))} + schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + # Density = 10000 / ((num_keys-1)*10 + 1) = 10000 / 99991 ~= 10.001% + # Should use range filter since density > 10% (just barely) + assert isinstance(result, And) + assert isinstance(result.left, GreaterThanOrEqual) + assert isinstance(result.right, LessThanOrEqual) + + +def test_coarse_match_filter_very_sparse_ids() -> None: + """Test that very sparse IDs (e.g., 1, 1M, 2M) use AlwaysTrue.""" + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create extremely sparse IDs + num_keys = LARGE_FILTER_THRESHOLD + # Values from 0 to (num_keys-1) * 1000, stepping by 1000 + ids = list(range(0, num_keys * 1000, 1000)) + data = {"id": ids, "value": list(range(num_keys))} + schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + # Density = 10000 / ((10000-1)*1000 + 1) ~= 0.1% + # Should use AlwaysTrue + assert isinstance(result, AlwaysTrue) + + +# ============================================================================ +# Edge Cases +# ============================================================================ + + +def test_coarse_match_filter_empty_dataset_returns_always_false() -> None: + """Test that empty dataset returns AlwaysFalse.""" + from pyiceberg.expressions import AlwaysFalse + + from pyiceberg.table.upsert_util import create_coarse_match_filter + + schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict({"id": [], "value": []}, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + assert isinstance(result, AlwaysFalse) + + +def test_coarse_match_filter_single_value_dataset() -> None: + """Test that single value dataset uses In() or EqualTo() with single value.""" + from pyiceberg.expressions import In + + from pyiceberg.table.upsert_util import create_coarse_match_filter + + schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict({"id": [42], "value": [1]}, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + # PyIceberg may optimize In() with a single value to EqualTo() + if isinstance(result, In): + assert result.term.name == "id" + assert len(result.literals) == 1 + assert result.literals[0].value == 42 + elif isinstance(result, EqualTo): + assert result.term.name == "id" + assert result.literal.value == 42 + else: + pytest.fail(f"Expected In or EqualTo, got {type(result)}") + + +def test_coarse_match_filter_negative_numbers_range() -> None: + """Test that negative number IDs produce correct min/max range.""" + from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual + + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create dense negative IDs: -10000 to -1 + num_keys = LARGE_FILTER_THRESHOLD + ids = list(range(-num_keys, 0)) + data = {"id": ids, "value": list(range(num_keys))} + schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + # Should use range filter with 
negative values + assert isinstance(result, And) + assert isinstance(result.left, GreaterThanOrEqual) + assert isinstance(result.right, LessThanOrEqual) + assert result.left.literal.value == -num_keys # min + assert result.right.literal.value == -1 # max + + +def test_coarse_match_filter_mixed_sign_numbers_range() -> None: + """Test that mixed sign IDs (-500 to 500) produce correct range spanning zero.""" + from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual + + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create IDs spanning zero: -5000 to 4999 + num_keys = LARGE_FILTER_THRESHOLD + ids = list(range(-num_keys // 2, num_keys // 2)) + data = {"id": ids, "value": list(range(num_keys))} + schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + # Should use range filter spanning zero + assert isinstance(result, And) + assert isinstance(result.left, GreaterThanOrEqual) + assert isinstance(result.right, LessThanOrEqual) + assert result.left.literal.value == -num_keys // 2 # min + assert result.right.literal.value == num_keys // 2 - 1 # max + + +def test_coarse_match_filter_float_range_filter() -> None: + """Test that float IDs use range filter correctly.""" + from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual + + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create dense float IDs + num_keys = LARGE_FILTER_THRESHOLD + ids = [float(i) for i in range(num_keys)] + data = {"id": ids, "value": list(range(num_keys))} + schema = pa.schema([pa.field("id", pa.float64()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + # Should use range filter for float column + assert isinstance(result, And) + assert isinstance(result.left, GreaterThanOrEqual) + assert isinstance(result.right, LessThanOrEqual) + assert result.left.literal.value == 0.0 + assert result.right.literal.value == float(num_keys - 1) + + +def test_coarse_match_filter_non_numeric_column_skips_range_filter() -> None: + """Test that non-numeric column with >10k values returns AlwaysTrue.""" + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create string IDs (non-numeric) with many unique values + num_keys = LARGE_FILTER_THRESHOLD + ids = [f"id_{i:05d}" for i in range(num_keys)] + data = {"id": ids, "value": list(range(num_keys))} + schema = pa.schema([pa.field("id", pa.string()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["id"]) + + # Non-numeric column with large dataset should use AlwaysTrue + assert isinstance(result, AlwaysTrue) + + +# ============================================================================ +# Composite Key Tests +# ============================================================================ + + +def test_coarse_match_filter_composite_key_small_dataset() -> None: + """Test that composite key with small dataset uses And(In(), In()).""" + from pyiceberg.expressions import In + + from pyiceberg.table.upsert_util import create_coarse_match_filter + + # Create a small dataset with composite key + data = { + "a": [1, 2, 3, 1, 2, 3], + "b": ["x", "x", "x", "y", "y", "y"], + "value": [1, 2, 3, 4, 5, 6], + } + schema = 
pa.schema([pa.field("a", pa.int64()), pa.field("b", pa.string()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["a", "b"]) + + # Should be And(In(a), In(b)) + assert isinstance(result, And) + # Check that both children are In() filters + assert "In" in str(result) + + +def test_coarse_match_filter_composite_key_large_numeric_column() -> None: + """Test composite key where one column has >10k unique numeric values.""" + from pyiceberg.expressions import GreaterThanOrEqual, In, LessThanOrEqual + + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create dataset with one large dense numeric column and one small column + num_keys = LARGE_FILTER_THRESHOLD + data = { + "a": list(range(num_keys)), # 10k unique dense values + "b": ["category_1"] * (num_keys // 2) + ["category_2"] * (num_keys // 2), # 2 unique values + "value": list(range(num_keys)), + } + schema = pa.schema([pa.field("a", pa.int64()), pa.field("b", pa.string()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["a", "b"]) + + # Should be And of filters for both columns + assert isinstance(result, And) + # Column 'a' (large, dense, numeric) should use range filter + # Column 'b' (small) should use In() + result_str = str(result) + assert "GreaterThanOrEqual" in result_str or "In" in result_str + + +def test_coarse_match_filter_composite_key_mixed_types() -> None: + """Test composite key with mixed numeric and string columns with large dataset.""" + from pyiceberg.expressions import In + + from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter + + # Create dataset with large sparse numeric column and large string column + num_keys = LARGE_FILTER_THRESHOLD + # Sparse numeric IDs + ids = list(range(0, num_keys * 100, 100)) + # Many unique strings + strings = [f"str_{i}" for i in range(num_keys)] + data = { + "numeric_id": ids, + "string_id": strings, + "value": list(range(num_keys)), + } + schema = pa.schema([pa.field("numeric_id", pa.int64()), pa.field("string_id", pa.string()), pa.field("value", pa.int64())]) + table = pa.Table.from_pydict(data, schema=schema) + + result = create_coarse_match_filter(table, ["numeric_id", "string_id"]) + + # Both columns have large unique values + # numeric_id is sparse (density < 10%), so should use In() + # string_id is non-numeric, so should use In() + assert isinstance(result, And) + + From 902fdc3e82b4ea1d8ab55e386c022364f71deb64 Mon Sep 17 00:00:00 2001 From: EnyMan Date: Thu, 22 Jan 2026 23:23:22 +0100 Subject: [PATCH 5/8] refactor: Improve code readability with consistent formatting --- pyiceberg/table/__init__.py | 15 ++++++++---- pyiceberg/table/upsert_util.py | 13 ++++++---- tests/table/test_upsert.py | 44 +++++++++++++++++++++------------- 3 files changed, 46 insertions(+), 26 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 2688bd5dda..1499c8960e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1799,16 +1799,20 @@ def projection(self) -> Schema: return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) @abstractmethod - def plan_files(self) -> Iterable[ScanTask]: ... + def plan_files(self) -> Iterable[ScanTask]: + ... @abstractmethod - def to_arrow(self) -> pa.Table: ... + def to_arrow(self) -> pa.Table: + ... 
@abstractmethod - def to_pandas(self, **kwargs: Any) -> pd.DataFrame: ... + def to_pandas(self, **kwargs: Any) -> pd.DataFrame: + ... @abstractmethod - def to_polars(self) -> pl.DataFrame: ... + def to_polars(self) -> pl.DataFrame: + ... def update(self: S, **overrides: Any) -> S: """Create a copy of this table scan with updated fields.""" @@ -1841,7 +1845,8 @@ def with_case_sensitive(self: S, case_sensitive: bool = True) -> S: return self.update(case_sensitive=case_sensitive) @abstractmethod - def count(self) -> int: ... + def count(self) -> int: + ... class ScanTask: diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py index e5184163fc..06e635d64d 100644 --- a/pyiceberg/table/upsert_util.py +++ b/pyiceberg/table/upsert_util.py @@ -191,9 +191,7 @@ def has_duplicate_rows(df: pyarrow_table, join_cols: list[str]) -> bool: return len(df.select(join_cols).group_by(join_cols).aggregate([([], "count_all")]).filter(pc.field("count_all") > 1)) > 0 -def _compare_columns_vectorized( - source_col: pa.Array | pa.ChunkedArray, target_col: pa.Array | pa.ChunkedArray -) -> pa.Array: +def _compare_columns_vectorized(source_col: pa.Array | pa.ChunkedArray, target_col: pa.Array | pa.ChunkedArray) -> pa.Array: """ Vectorized comparison of two columns, returning a boolean array where True means values differ. @@ -223,7 +221,7 @@ def _compare_columns_vectorized( # PyArrow cannot directly compare struct columns, so we recursively compare each field diff_masks = [] - for i, field in enumerate(col_type): + for i, _field in enumerate(col_type): src_field = pc.struct_field(source_col, [i]) tgt_field = pc.struct_field(target_col, [i]) field_diff = _compare_columns_vectorized(src_field, tgt_field) @@ -237,7 +235,12 @@ def _compare_columns_vectorized( field_diff = functools.reduce(pc.or_, diff_masks) return pc.or_(field_diff, struct_null_diff) - elif pa.types.is_list(col_type) or pa.types.is_large_list(col_type) or pa.types.is_fixed_size_list(col_type) or pa.types.is_map(col_type): + elif ( + pa.types.is_list(col_type) + or pa.types.is_large_list(col_type) + or pa.types.is_fixed_size_list(col_type) + or pa.types.is_map(col_type) + ): # For list/map types, fall back to Python comparison as PyArrow doesn't support vectorized comparison # This is still faster than the original row-by-row approach since we batch the conversion source_py = source_col.to_pylist() diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 33ea57c3d6..2211c6e329 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -100,12 +100,14 @@ def gen_target_iceberg_table( ) -> Table: additional_columns = ", t.order_id + 1000 as order_line_id" if composite_key else "" - df = ctx.sql(f""" + df = ctx.sql( + f""" with t as (SELECT unnest(range({start_row},{end_row + 1})) as order_id) SELECT t.order_id {additional_columns} , date '2021-01-01' as order_date, 'A' as order_type from t - """).to_arrow_table() + """ + ).to_arrow_table() table = catalog.create_table(identifier, df.schema) @@ -166,23 +168,27 @@ def test_merge_scenario_skip_upd_row(catalog: Catalog) -> None: ctx = SessionContext() - df = ctx.sql(""" + df = ctx.sql( + """ select 1 as order_id, date '2021-01-01' as order_date, 'A' as order_type union all select 2 as order_id, date '2021-01-01' as order_date, 'A' as order_type - """).to_arrow_table() + """ + ).to_arrow_table() table = catalog.create_table(identifier, df.schema) table.append(df) - source_df = ctx.sql(""" + source_df = ctx.sql( + """ select 1 as order_id, date '2021-01-01' 
as order_date, 'A' as order_type union all select 2 as order_id, date '2021-01-01' as order_date, 'B' as order_type union all select 3 as order_id, date '2021-01-01' as order_date, 'A' as order_type - """).to_arrow_table() + """ + ).to_arrow_table() res = table.upsert(df=source_df, join_cols=["order_id"]) @@ -202,23 +208,27 @@ def test_merge_scenario_date_as_key(catalog: Catalog) -> None: identifier = "default.test_merge_scenario_date_as_key" _drop_table(catalog, identifier) - df = ctx.sql(""" + df = ctx.sql( + """ select date '2021-01-01' as order_date, 'A' as order_type union all select date '2021-01-02' as order_date, 'A' as order_type - """).to_arrow_table() + """ + ).to_arrow_table() table = catalog.create_table(identifier, df.schema) table.append(df) - source_df = ctx.sql(""" + source_df = ctx.sql( + """ select date '2021-01-01' as order_date, 'A' as order_type union all select date '2021-01-02' as order_date, 'B' as order_type union all select date '2021-01-03' as order_date, 'A' as order_type - """).to_arrow_table() + """ + ).to_arrow_table() res = table.upsert(df=source_df, join_cols=["order_date"]) @@ -238,23 +248,27 @@ def test_merge_scenario_string_as_key(catalog: Catalog) -> None: ctx = SessionContext() - df = ctx.sql(""" + df = ctx.sql( + """ select 'abc' as order_id, 'A' as order_type union all select 'def' as order_id, 'A' as order_type - """).to_arrow_table() + """ + ).to_arrow_table() table = catalog.create_table(identifier, df.schema) table.append(df) - source_df = ctx.sql(""" + source_df = ctx.sql( + """ select 'abc' as order_id, 'A' as order_type union all select 'def' as order_id, 'B' as order_type union all select 'ghi' as order_id, 'A' as order_type - """).to_arrow_table() + """ + ).to_arrow_table() res = table.upsert(df=source_df, join_cols=["order_id"]) @@ -1563,5 +1577,3 @@ def test_coarse_match_filter_composite_key_mixed_types() -> None: # numeric_id is sparse (density < 10%), so should use In() # string_id is non-numeric, so should use In() assert isinstance(result, And) - - From 81238264cff5729be9f163984c88216acfd5e9f6 Mon Sep 17 00:00:00 2001 From: EnyMan Date: Thu, 22 Jan 2026 23:29:54 +0100 Subject: [PATCH 6/8] test: Add type assertions for `Reference` in upsert filter tests --- tests/table/test_upsert.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 2211c6e329..8f93912178 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -1193,6 +1193,7 @@ def test_coarse_match_filter_small_dataset_uses_in_filter() -> None: assert num_keys < LARGE_FILTER_THRESHOLD assert isinstance(result, In) + assert isinstance(result.term, Reference) assert result.term.name == "id" assert len(result.literals) == num_keys @@ -1212,6 +1213,7 @@ def test_coarse_match_filter_threshold_boundary_uses_in_filter() -> None: result = create_coarse_match_filter(table, ["id"]) assert isinstance(result, In) + assert isinstance(result.term, Reference) assert result.term.name == "id" assert len(result.literals) == num_keys @@ -1401,10 +1403,12 @@ def test_coarse_match_filter_single_value_dataset() -> None: # PyIceberg may optimize In() with a single value to EqualTo() if isinstance(result, In): + assert isinstance(result.term, Reference) assert result.term.name == "id" assert len(result.literals) == 1 - assert result.literals[0].value == 42 + assert next(iter(result.literals)).value == 42 elif isinstance(result, EqualTo): + assert isinstance(result.term, Reference) assert result.term.name 
== "id" assert result.literal.value == 42 else: From e605971d72ea8bd27acd01ab012a8e05330a6bd0 Mon Sep 17 00:00:00 2001 From: EnyMan Date: Thu, 22 Jan 2026 23:31:21 +0100 Subject: [PATCH 7/8] refactor: Remove redundant imports in upsert filter tests --- tests/table/test_upsert.py | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 8f93912178..999badff95 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -905,8 +905,8 @@ def test_coarse_match_filter_composite_key() -> None: """ Test that create_coarse_match_filter produces efficient In() predicates for composite keys. """ + from pyiceberg.expressions import And, Or from pyiceberg.table.upsert_util import create_coarse_match_filter, create_match_filter - from pyiceberg.expressions import Or, And, In # Create a table with composite key that has overlapping values # (1, 'x'), (2, 'y'), (1, 'z') - exact filter should have 3 conditions @@ -1180,7 +1180,6 @@ def test_is_numeric_type(dtype: pa.DataType, expected_numeric: bool) -> None: def test_coarse_match_filter_small_dataset_uses_in_filter() -> None: """Test that small datasets (< 10,000 unique keys) use In() filter.""" from pyiceberg.expressions import In - from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter # Create a dataset with 100 unique keys (well below threshold) @@ -1201,7 +1200,6 @@ def test_coarse_match_filter_small_dataset_uses_in_filter() -> None: def test_coarse_match_filter_threshold_boundary_uses_in_filter() -> None: """Test that datasets at threshold - 1 (9,999 unique keys) still use In() filter.""" from pyiceberg.expressions import In - from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter # Create a dataset with exactly threshold - 1 unique keys @@ -1221,7 +1219,6 @@ def test_coarse_match_filter_threshold_boundary_uses_in_filter() -> None: def test_coarse_match_filter_above_threshold_uses_optimized_filter() -> None: """Test that datasets >= 10,000 unique keys use optimized filter strategy.""" from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual - from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter # Create a dense dataset (consecutive IDs) with exactly threshold unique keys @@ -1243,7 +1240,6 @@ def test_coarse_match_filter_above_threshold_uses_optimized_filter() -> None: def test_coarse_match_filter_large_dataset() -> None: """Test that large datasets (100,000 unique keys) use optimized filter.""" from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual - from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter # Create a dense dataset with 100,000 unique keys @@ -1269,7 +1265,6 @@ def test_coarse_match_filter_large_dataset() -> None: def test_coarse_match_filter_dense_ids_use_range_filter() -> None: """Test that dense IDs (density > 10%) use range filter.""" from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual - from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter # Create dense IDs: all values from 0 to N-1 (100% density) @@ -1290,7 +1285,6 @@ def test_coarse_match_filter_dense_ids_use_range_filter() -> None: def test_coarse_match_filter_moderately_dense_ids_use_range_filter() -> None: """Test that moderately dense IDs (50% density) use range filter.""" from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual - 
from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter # Create IDs: 0, 2, 4, 6, ... (every other number) - 50% density @@ -1330,7 +1324,6 @@ def test_coarse_match_filter_sparse_ids_use_always_true() -> None: def test_coarse_match_filter_density_boundary_at_10_percent() -> None: """Test exact 10% boundary density behavior.""" from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual - from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter # Create IDs at exactly ~10% density @@ -1379,7 +1372,6 @@ def test_coarse_match_filter_very_sparse_ids() -> None: def test_coarse_match_filter_empty_dataset_returns_always_false() -> None: """Test that empty dataset returns AlwaysFalse.""" from pyiceberg.expressions import AlwaysFalse - from pyiceberg.table.upsert_util import create_coarse_match_filter schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) @@ -1393,7 +1385,6 @@ def test_coarse_match_filter_empty_dataset_returns_always_false() -> None: def test_coarse_match_filter_single_value_dataset() -> None: """Test that single value dataset uses In() or EqualTo() with single value.""" from pyiceberg.expressions import In - from pyiceberg.table.upsert_util import create_coarse_match_filter schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int64())]) @@ -1418,7 +1409,6 @@ def test_coarse_match_filter_single_value_dataset() -> None: def test_coarse_match_filter_negative_numbers_range() -> None: """Test that negative number IDs produce correct min/max range.""" from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual - from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter # Create dense negative IDs: -10000 to -1 @@ -1441,7 +1431,6 @@ def test_coarse_match_filter_negative_numbers_range() -> None: def test_coarse_match_filter_mixed_sign_numbers_range() -> None: """Test that mixed sign IDs (-500 to 500) produce correct range spanning zero.""" from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual - from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter # Create IDs spanning zero: -5000 to 4999 @@ -1464,7 +1453,6 @@ def test_coarse_match_filter_mixed_sign_numbers_range() -> None: def test_coarse_match_filter_float_range_filter() -> None: """Test that float IDs use range filter correctly.""" from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual - from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter # Create dense float IDs @@ -1508,7 +1496,6 @@ def test_coarse_match_filter_non_numeric_column_skips_range_filter() -> None: def test_coarse_match_filter_composite_key_small_dataset() -> None: """Test that composite key with small dataset uses And(In(), In()).""" - from pyiceberg.expressions import In from pyiceberg.table.upsert_util import create_coarse_match_filter @@ -1531,7 +1518,6 @@ def test_coarse_match_filter_composite_key_small_dataset() -> None: def test_coarse_match_filter_composite_key_large_numeric_column() -> None: """Test composite key where one column has >10k unique numeric values.""" - from pyiceberg.expressions import GreaterThanOrEqual, In, LessThanOrEqual from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter @@ -1557,7 +1543,6 @@ def test_coarse_match_filter_composite_key_large_numeric_column() -> None: def test_coarse_match_filter_composite_key_mixed_types() -> None: """Test 
composite key with mixed numeric and string columns with large dataset."""
-    from pyiceberg.expressions import In
 
     from pyiceberg.table.upsert_util import LARGE_FILTER_THRESHOLD, create_coarse_match_filter
 

From 1d6dad1724eb9b62b9541723852e067aa06dfbbf Mon Sep 17 00:00:00 2001
From: EnyMan
Date: Thu, 22 Jan 2026 23:46:57 +0100
Subject: [PATCH 8/8] refactor: reformatted abstract method stubs

---
 pyiceberg/table/__init__.py | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 1499c8960e..2688bd5dda 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1799,20 +1799,16 @@ def projection(self) -> Schema:
         return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
 
     @abstractmethod
-    def plan_files(self) -> Iterable[ScanTask]:
-        ...
+    def plan_files(self) -> Iterable[ScanTask]: ...
 
     @abstractmethod
-    def to_arrow(self) -> pa.Table:
-        ...
+    def to_arrow(self) -> pa.Table: ...
 
     @abstractmethod
-    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
-        ...
+    def to_pandas(self, **kwargs: Any) -> pd.DataFrame: ...
 
     @abstractmethod
-    def to_polars(self) -> pl.DataFrame:
-        ...
+    def to_polars(self) -> pl.DataFrame: ...
 
     def update(self: S, **overrides: Any) -> S:
         """Create a copy of this table scan with updated fields."""
@@ -1845,8 +1841,7 @@ def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
         return self.update(case_sensitive=case_sensitive)
 
     @abstractmethod
-    def count(self) -> int:
-        ...
+    def count(self) -> int: ...
 
 
 class ScanTask:
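The coarse-filter tests in this series exercise a per-column strategy: an exact In() predicate below the unique-key threshold, a min/max range for large but dense numeric keys, and AlwaysTrue (scan broadly and rely on the exact join in get_rows_to_update()) for sparse or non-numeric keys. A minimal sketch of that decision logic follows, reconstructed from the test assertions rather than copied from the patched upsert_util.py; the 10,000-key threshold, the 10% density cut-off, and the helper name coarse_column_filter are assumptions for illustration only.

import pyarrow as pa
import pyarrow.compute as pc

from pyiceberg.expressions import (
    AlwaysFalse,
    AlwaysTrue,
    And,
    BooleanExpression,
    GreaterThanOrEqual,
    In,
    LessThanOrEqual,
)

# Assumed constants, chosen to match the test expectations above.
LARGE_FILTER_THRESHOLD = 10_000  # unique keys at or above this take the optimized path
DENSITY_CUTOFF = 0.10            # dense numeric keys switch to a min/max range filter


def coarse_column_filter(col: str, values: pa.ChunkedArray) -> BooleanExpression:
    """Illustrative per-column coarse predicate choice (sketch, not the shipped code)."""
    unique = pc.unique(values)
    if len(unique) == 0:
        return AlwaysFalse()
    if len(unique) < LARGE_FILTER_THRESHOLD:
        # Small key sets: an exact In() stays cheap to build and to evaluate.
        return In(col, unique.to_pylist())
    if pa.types.is_integer(unique.type) or pa.types.is_floating(unique.type):
        lo = pc.min(unique).as_py()
        hi = pc.max(unique).as_py()
        density = len(unique) / (hi - lo + 1) if hi > lo else 1.0
        if density >= DENSITY_CUTOFF:
            # Dense numeric keys: a two-sided range prunes files without carrying
            # tens of thousands of literals in the scan filter.
            return And(GreaterThanOrEqual(col, lo), LessThanOrEqual(col, hi))
    # Sparse numeric or non-numeric large key sets: no useful coarse predicate,
    # so scan broadly and let the exact join downstream discard false positives.
    return AlwaysTrue()

A composite key would combine one such predicate per join column with And, which is why the composite-key tests only assert the top-level And shape.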