From 7ceba57e09a648b561fbe99316bf642355e6db65 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Mon, 22 Jul 2024 20:01:31 +0000 Subject: [PATCH 01/12] fix --- pyiceberg/expressions/__init__.py | 4 + pyiceberg/io/pyarrow.py | 120 ++++++++++++++++++++++++++++-- pyiceberg/table/__init__.py | 5 +- tests/integration/test_deletes.py | 75 +++++++++++++++++++ tests/io/test_pyarrow.py | 25 ++++++- tests/io/test_pyarrow_visitor.py | 66 +++++++++++++++- 6 files changed, 284 insertions(+), 11 deletions(-) diff --git a/pyiceberg/expressions/__init__.py b/pyiceberg/expressions/__init__.py index 5adf3a8a48..068c822e20 100644 --- a/pyiceberg/expressions/__init__.py +++ b/pyiceberg/expressions/__init__.py @@ -135,6 +135,10 @@ def __repr__(self) -> str: def ref(self) -> BoundReference[L]: return self + def __hash__(self) -> int: + """Return hash value of the Record class.""" + return hash(str(self)) + class UnboundTerm(Term[Any], Unbound[BoundTerm[L]], ABC): """Represents an unbound term.""" diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 2451bf7df7..6925be56e6 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -73,11 +73,7 @@ from pyiceberg.conversions import to_bytes from pyiceberg.exceptions import ResolveError -from pyiceberg.expressions import ( - AlwaysTrue, - BooleanExpression, - BoundTerm, -) +from pyiceberg.expressions import AlwaysTrue, BooleanExpression, BoundIsNull, BoundReference, BoundTerm, Not, Or from pyiceberg.expressions.literals import Literal from pyiceberg.expressions.visitors import ( BoundBooleanExpressionVisitor, @@ -638,10 +634,124 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p return left_result | right_result +class _CollectIsValidPredicatesFromExpression(BoundBooleanExpressionVisitor[Any]): + def __init__(self) -> None: + # BoundTerms which have either is_null or is_not_null appearing at least once in the boolean expr. + self.is_valid_or_not_bound_terms: set[BoundTerm[Any]] = set() + # The remaining BoundTerms appearing in the boolean expr. + self.null_unmentioned_bound_terms: set[BoundTerm[Any]] = set() + super().__init__() + + def _handle_explicit_is_null_or_not(self, term: BoundTerm[Any]) -> None: + """Handle the predicate case where either is_null or is_not_null is included.""" + if term in self.null_unmentioned_bound_terms: + self.null_unmentioned_bound_terms.remove(term) + self.is_valid_or_not_bound_terms.add(term) + + def _handle_skipped(self, term: BoundTerm[Any]) -> None: + """Handle the predicate case where neither is_null or is_not_null is included.""" + if term not in self.is_valid_or_not_bound_terms: + self.null_unmentioned_bound_terms.add(term) + + def visit_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> None: + self._handle_skipped(term) + + def visit_not_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> None: + self._handle_skipped(term) + + # todo: do I have to modify this as well + def visit_is_nan(self, term: BoundTerm[Any]) -> None: + self._handle_skipped(term) + + # todo: do I have to modify this as well, might need 2 self.xx sets for mentioned_nan and none-mentioned-nan + def visit_not_nan(self, term: BoundTerm[Any]) -> None: + self._handle_skipped(term) + + def visit_is_null(self, term: BoundTerm[Any]) -> None: + self._handle_explicit_is_null_or_not(term) + + def visit_not_null(self, term: BoundTerm[Any]) -> None: + self._handle_explicit_is_null_or_not(term) + + def visit_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: + self._handle_skipped(term) + + def visit_not_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: + self._handle_skipped(term) + + def visit_greater_than_or_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: + self._handle_skipped(term) + + def visit_greater_than(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: + self._handle_skipped(term) + + def visit_less_than(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: + self._handle_skipped(term) + + def visit_less_than_or_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: + self._handle_skipped(term) + + def visit_starts_with(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: + self._handle_skipped(term) + + def visit_not_starts_with(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: + self._handle_skipped(term) + + def visit_true(self) -> None: + return + + def visit_false(self) -> None: + return + + def visit_not(self, child_result: pc.Expression) -> None: + return + + def visit_and(self, left_result: pc.Expression, right_result: pc.Expression) -> None: + return + + def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> None: + return + + +def _get_is_valid_or_not_bound_refs(expr: BooleanExpression) -> tuple[Set[BoundReference[Any]], Set[BoundReference[Any]]]: + """Collect the bound terms catogorized by having at least one is_null or is_not_null in the expr and the remaining.""" + collector = _CollectIsValidPredicatesFromExpression() + boolean_expression_visit(expr, collector) + null_unmentioned_bound_terms = collector.null_unmentioned_bound_terms + is_valid_or_not_bound_terms = collector.is_valid_or_not_bound_terms + + null_unmentioned_bound_refs: Set[BoundReference[Any]] = set() + is_valid_or_not_bound_refs: Set[BoundReference[Any]] = set() + for t in null_unmentioned_bound_terms: + if not isinstance(t, BoundReference): + raise ValueError("Collected Bound Term that is not reference.") + else: + null_unmentioned_bound_refs.add(t) + for t in is_valid_or_not_bound_terms: + if not isinstance(t, BoundReference): + raise ValueError("Collected Bound Term that is not reference.") + else: + is_valid_or_not_bound_refs.add(t) + return null_unmentioned_bound_refs, is_valid_or_not_bound_refs + + def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) +def expression_to_reverted_pyarrow(expr: BooleanExpression) -> pc.Expression: + """Complimentary filter convertion function of expression_to_pyarrow. + + Could not use expression_to_pyarrow(Not(expr)) to achieve this effect because ~ in pc.Expression does not handle null. + """ + null_unmentioned_bound_terms: set[BoundReference[Any]] = _get_is_valid_or_not_bound_refs(expr)[0] + preserver_expr: BooleanExpression = Not(expr) + + for term in null_unmentioned_bound_terms: + preserver_expr = Or(preserver_expr, BoundIsNull(term=term)) + return expression_to_pyarrow(preserver_expr) + + @lru_cache def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.FileFormat: if file_format == FileFormat.PARQUET: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 0cbe4630e4..c4a70e3098 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -58,7 +58,6 @@ And, BooleanExpression, EqualTo, - Not, Or, Reference, ) @@ -576,7 +575,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti delete_filter: A boolean expression to delete rows from a table snapshot_properties: Custom properties to be added to the snapshot summary """ - from pyiceberg.io.pyarrow import _dataframe_to_data_files, expression_to_pyarrow, project_table + from pyiceberg.io.pyarrow import _dataframe_to_data_files, expression_to_reverted_pyarrow, project_table if ( self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT) @@ -593,7 +592,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti # Check if there are any files that require an actual rewrite of a data file if delete_snapshot.rewrites_needed is True: bound_delete_filter = bind(self._table.schema(), delete_filter, case_sensitive=True) - preserve_row_filter = expression_to_pyarrow(Not(bound_delete_filter)) + preserve_row_filter = expression_to_reverted_pyarrow(bound_delete_filter) files = self._scan(row_filter=delete_filter).plan_files() diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index d8fb01c447..3a4d00d98f 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -105,6 +105,41 @@ def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCat assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11, 10], "number": [30, 30]} +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_partitioned_table_rewrite_with_null(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: + identifier = "default.table_partitioned_delete" + + run_spark_commands( + spark, + [ + f"DROP TABLE IF EXISTS {identifier}", + f""" + CREATE TABLE {identifier} ( + number_partitioned int, + number int + ) + USING iceberg + PARTITIONED BY (number_partitioned) + TBLPROPERTIES('format-version' = {format_version}) + """, + f""" + INSERT INTO {identifier} VALUES (10, 20), (10, 30) + """, + f""" + INSERT INTO {identifier} VALUES (11, 20), (11, NULL) + """, + ], + ) + + tbl = session_catalog.load_table(identifier) + tbl.delete(EqualTo("number", 20)) + + # We don't delete a whole partition, so there is only a overwrite + assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "append", "overwrite"] + assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11, 10], "number": [None, 30]} + + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_partitioned_table_no_match(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: @@ -417,3 +452,43 @@ def test_delete_truncate(session_catalog: RestCatalog) -> None: assert len(entries) == 1 assert entries[0].status == ManifestEntryStatus.DELETED + + +@pytest.mark.integration +def test_delete_overwrite_with_null(session_catalog: RestCatalog) -> None: + arrow_schema = pa.schema([pa.field("ints", pa.int32())]) + arrow_tbl = pa.Table.from_pylist( + [{"ints": 1}, {"ints": 2}, {"ints": None}], + schema=arrow_schema, + ) + + iceberg_schema = Schema(NestedField(1, "ints", IntegerType())) + + tbl_identifier = "default.test_delete_overwrite_with_null" + + try: + session_catalog.drop_table(tbl_identifier) + except NoSuchTableError: + pass + + tbl = session_catalog.create_table(tbl_identifier, iceberg_schema) + tbl.append(arrow_tbl) + + assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND] + + arrow_tbl_overwrite = pa.Table.from_pylist( + [ + {"ints": 3}, + {"ints": 4}, + ], + schema=arrow_schema, + ) + tbl.overwrite(arrow_tbl_overwrite, "ints == 2") # Should rewrite one file + + assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [ + Operation.APPEND, + Operation.OVERWRITE, + Operation.APPEND, + ] + + assert tbl.scan().to_arrow()["ints"].to_pylist() == [3, 4, 1, None] diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 82b35341b9..de994f181c 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -15,7 +15,6 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=protected-access,unused-argument,redefined-outer-name - import os import tempfile import uuid @@ -69,12 +68,13 @@ _to_requested_schema, bin_pack_arrow_table, expression_to_pyarrow, + expression_to_reverted_pyarrow, project_table, schema_to_pyarrow, ) from pyiceberg.manifest import DataFile, DataFileContent, FileFormat from pyiceberg.partitioning import PartitionField, PartitionSpec -from pyiceberg.schema import Schema, make_compatible_name, visit +from pyiceberg.schema import Accessor, Schema, make_compatible_name, visit from pyiceberg.table import FileScanTask, TableProperties from pyiceberg.table.metadata import TableMetadataV2 from pyiceberg.transforms import IdentityTransform @@ -725,6 +725,27 @@ def test_always_false_to_pyarrow(bound_reference: BoundReference[str]) -> None: assert repr(expression_to_pyarrow(AlwaysFalse())) == "" +def test_revert_expression_to_pyarrow() -> None: + bound_reference_str = BoundReference( + field=NestedField(1, "field_str", StringType(), required=False), accessor=Accessor(position=0, inner=None) + ) + bound_eq_str_field = BoundEqualTo(term=bound_reference_str, literal=literal("hello")) + + bound_reference_long = BoundReference( + field=NestedField(1, "field_long", LongType(), required=False), accessor=Accessor(position=1, inner=None) + ) + bound_larger_than_long_field = BoundGreaterThan(term=bound_reference_long, literal=literal(100)) # type: ignore + + bound_is_null_long_field = BoundIsNull(bound_reference_long) + + bound_expr = Or(And(bound_eq_str_field, bound_larger_than_long_field), bound_is_null_long_field) + result = expression_to_reverted_pyarrow(bound_expr) + assert ( + repr(result) + == """ 100)) or is_null(field_long, {nan_is_null=false}))) or is_null(field_str, {nan_is_null=false}))>""" + ) + + @pytest.fixture def schema_int() -> Schema: return Schema(NestedField(1, "id", IntegerType(), required=False)) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 897af1bbbd..b32f4e7da0 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -20,17 +20,27 @@ import pyarrow as pa import pytest +from pyiceberg.expressions import ( + And, + BoundEqualTo, + BoundGreaterThan, + BoundIsNull, + BoundReference, + Or, +) +from pyiceberg.expressions.literals import LongLiteral, literal from pyiceberg.io.pyarrow import ( _ConvertToArrowSchema, _ConvertToIceberg, _ConvertToIcebergWithoutIDs, + _get_is_valid_or_not_bound_refs, _HasIds, _pyarrow_schema_ensure_large_types, pyarrow_to_schema, schema_to_pyarrow, visit_pyarrow, ) -from pyiceberg.schema import Schema, visit +from pyiceberg.schema import Accessor, Schema, visit from pyiceberg.table.name_mapping import MappedField, NameMapping from pyiceberg.types import ( BinaryType, @@ -580,3 +590,57 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa ), ]) assert _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) == expected_schema + + +@pytest.fixture +def bound_reference_long() -> BoundReference[int]: + return BoundReference(field=NestedField(1, "field", LongType(), required=False), accessor=Accessor(position=0, inner=None)) + + +def test_collect_null_mentioned_terms() -> None: + bound_reference_str = BoundReference( + field=NestedField(1, "field_str", StringType(), required=False), accessor=Accessor(position=0, inner=None) + ) + bound_eq_str_field = BoundEqualTo(term=bound_reference_str, literal=literal("hello")) + + bound_reference_long = BoundReference( + field=NestedField(2, "field_long", LongType(), required=False), accessor=Accessor(position=1, inner=None) + ) + bound_larger_than_long_field = BoundGreaterThan(term=bound_reference_long, literal=literal(100)) # type: ignore + + bound_reference_bool = BoundReference( + field=NestedField(3, "field_bool", BooleanType(), required=False), accessor=Accessor(position=2, inner=None) + ) + bound_is_null_bool_field = BoundIsNull(bound_reference_bool) + + bound_expr = Or(And(bound_eq_str_field, bound_larger_than_long_field), bound_is_null_bool_field) + + categorized_terms = _get_is_valid_or_not_bound_refs(bound_expr) + assert {"field_long", "field_str"} == {f.field.name for f in categorized_terms[0]} + assert { + "field_bool", + } == {f.field.name for f in categorized_terms[1]} + + +def test_collect_null_mentioned_terms_with_multiple_predicates_on_the_same_term() -> None: + """Test a single term appears multiple places in the expression tree""" + bound_reference_str = BoundReference( + field=NestedField(1, "field_str", StringType(), required=False), accessor=Accessor(position=0, inner=None) + ) + bound_eq_str_field = BoundEqualTo(term=bound_reference_str, literal=literal("hello")) + + bound_reference_long = BoundReference( + field=NestedField(1, "field_long", LongType(), required=False), accessor=Accessor(position=1, inner=None) + ) + bound_larger_than_long_field = BoundGreaterThan(term=bound_reference_long, literal=literal(100)) # type: ignore + + bound_is_null_long_field = BoundIsNull(bound_reference_long) + + bound_expr = Or( + And(Or(And(bound_eq_str_field, bound_larger_than_long_field), bound_is_null_long_field), bound_larger_than_long_field), + bound_eq_str_field, + ) + + categorized_terms = _get_is_valid_or_not_bound_refs(bound_expr) + assert {"field_str"} == set({f.field.name for f in categorized_terms[0]}) + assert {"field_long"} == set({f.field.name for f in categorized_terms[1]}) From c7c6bb4ef27bd1182bfdaa09487bc2c9d67d0012 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Mon, 22 Jul 2024 21:07:52 +0000 Subject: [PATCH 02/12] naming --- pyiceberg/io/pyarrow.py | 4 ++-- tests/integration/test_deletes.py | 4 ++-- tests/io/test_pyarrow.py | 4 ++-- tests/io/test_pyarrow_visitor.py | 10 +++++----- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 6925be56e6..4cf48e2e2d 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -634,7 +634,7 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p return left_result | right_result -class _CollectIsValidPredicatesFromExpression(BoundBooleanExpressionVisitor[Any]): +class _CollectNullUnmentionedTermsFromExpression(BoundBooleanExpressionVisitor[Any]): def __init__(self) -> None: # BoundTerms which have either is_null or is_not_null appearing at least once in the boolean expr. self.is_valid_or_not_bound_terms: set[BoundTerm[Any]] = set() @@ -715,7 +715,7 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> N def _get_is_valid_or_not_bound_refs(expr: BooleanExpression) -> tuple[Set[BoundReference[Any]], Set[BoundReference[Any]]]: """Collect the bound terms catogorized by having at least one is_null or is_not_null in the expr and the remaining.""" - collector = _CollectIsValidPredicatesFromExpression() + collector = _CollectNullUnmentionedTermsFromExpression() boolean_expression_visit(expr, collector) null_unmentioned_bound_terms = collector.null_unmentioned_bound_terms is_valid_or_not_bound_terms = collector.is_valid_or_not_bound_terms diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 3a4d00d98f..309c0c6a4d 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -107,7 +107,7 @@ def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCat @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) -def test_partitioned_table_rewrite_with_null(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: +def test_rewrite_partitioned_table_with_null(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: identifier = "default.table_partitioned_delete" run_spark_commands( @@ -455,7 +455,7 @@ def test_delete_truncate(session_catalog: RestCatalog) -> None: @pytest.mark.integration -def test_delete_overwrite_with_null(session_catalog: RestCatalog) -> None: +def test_delete_overwrite_table_with_null(session_catalog: RestCatalog) -> None: arrow_schema = pa.schema([pa.field("ints", pa.int32())]) arrow_tbl = pa.Table.from_pylist( [{"ints": 1}, {"ints": 2}, {"ints": None}], diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index de994f181c..4145577f6a 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -725,7 +725,7 @@ def test_always_false_to_pyarrow(bound_reference: BoundReference[str]) -> None: assert repr(expression_to_pyarrow(AlwaysFalse())) == "" -def test_revert_expression_to_pyarrow() -> None: +def test_expression_to_reverted_pyarrow() -> None: bound_reference_str = BoundReference( field=NestedField(1, "field_str", StringType(), required=False), accessor=Accessor(position=0, inner=None) ) @@ -734,7 +734,7 @@ def test_revert_expression_to_pyarrow() -> None: bound_reference_long = BoundReference( field=NestedField(1, "field_long", LongType(), required=False), accessor=Accessor(position=1, inner=None) ) - bound_larger_than_long_field = BoundGreaterThan(term=bound_reference_long, literal=literal(100)) # type: ignore + bound_larger_than_long_field = BoundGreaterThan(term=bound_reference_long, literal=literal(100)) # type: ignore bound_is_null_long_field = BoundIsNull(bound_reference_long) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index b32f4e7da0..11aef2f61f 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -28,7 +28,7 @@ BoundReference, Or, ) -from pyiceberg.expressions.literals import LongLiteral, literal +from pyiceberg.expressions.literals import literal from pyiceberg.io.pyarrow import ( _ConvertToArrowSchema, _ConvertToIceberg, @@ -597,7 +597,7 @@ def bound_reference_long() -> BoundReference[int]: return BoundReference(field=NestedField(1, "field", LongType(), required=False), accessor=Accessor(position=0, inner=None)) -def test_collect_null_mentioned_terms() -> None: +def test_collect_null_unmentioned_terms() -> None: bound_reference_str = BoundReference( field=NestedField(1, "field_str", StringType(), required=False), accessor=Accessor(position=0, inner=None) ) @@ -606,7 +606,7 @@ def test_collect_null_mentioned_terms() -> None: bound_reference_long = BoundReference( field=NestedField(2, "field_long", LongType(), required=False), accessor=Accessor(position=1, inner=None) ) - bound_larger_than_long_field = BoundGreaterThan(term=bound_reference_long, literal=literal(100)) # type: ignore + bound_larger_than_long_field = BoundGreaterThan(term=bound_reference_long, literal=literal(100)) # type: ignore bound_reference_bool = BoundReference( field=NestedField(3, "field_bool", BooleanType(), required=False), accessor=Accessor(position=2, inner=None) @@ -622,7 +622,7 @@ def test_collect_null_mentioned_terms() -> None: } == {f.field.name for f in categorized_terms[1]} -def test_collect_null_mentioned_terms_with_multiple_predicates_on_the_same_term() -> None: +def test_collect_null_unmentioned_terms_with_multiple_predicates_on_the_same_term() -> None: """Test a single term appears multiple places in the expression tree""" bound_reference_str = BoundReference( field=NestedField(1, "field_str", StringType(), required=False), accessor=Accessor(position=0, inner=None) @@ -632,7 +632,7 @@ def test_collect_null_mentioned_terms_with_multiple_predicates_on_the_same_term( bound_reference_long = BoundReference( field=NestedField(1, "field_long", LongType(), required=False), accessor=Accessor(position=1, inner=None) ) - bound_larger_than_long_field = BoundGreaterThan(term=bound_reference_long, literal=literal(100)) # type: ignore + bound_larger_than_long_field = BoundGreaterThan(term=bound_reference_long, literal=literal(100)) # type: ignore bound_is_null_long_field = BoundIsNull(bound_reference_long) From fbbdbee0a2d999e9e12d0f5a2be0fbc2ffb6c9b6 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Tue, 23 Jul 2024 05:17:37 +0000 Subject: [PATCH 03/12] handle nan as well --- pyiceberg/expressions/__init__.py | 2 +- pyiceberg/io/pyarrow.py | 120 +++++++++++++++++++----------- pyiceberg/table/__init__.py | 11 ++- tests/integration/test_deletes.py | 71 +++++++++++++++++- tests/io/test_pyarrow_visitor.py | 6 +- 5 files changed, 162 insertions(+), 48 deletions(-) diff --git a/pyiceberg/expressions/__init__.py b/pyiceberg/expressions/__init__.py index 068c822e20..830637aa99 100644 --- a/pyiceberg/expressions/__init__.py +++ b/pyiceberg/expressions/__init__.py @@ -136,7 +136,7 @@ def ref(self) -> BoundReference[L]: return self def __hash__(self) -> int: - """Return hash value of the Record class.""" + """Return hash value of the BoundReference class.""" return hash(str(self)) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 4cf48e2e2d..25ca1a56ed 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -73,7 +73,7 @@ from pyiceberg.conversions import to_bytes from pyiceberg.exceptions import ResolveError -from pyiceberg.expressions import AlwaysTrue, BooleanExpression, BoundIsNull, BoundReference, BoundTerm, Not, Or +from pyiceberg.expressions import AlwaysTrue, BooleanExpression, BoundIsNaN, BoundIsNull, BoundReference, BoundTerm, Not, Or from pyiceberg.expressions.literals import Literal from pyiceberg.expressions.visitors import ( BoundBooleanExpressionVisitor, @@ -634,68 +634,97 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p return left_result | right_result -class _CollectNullUnmentionedTermsFromExpression(BoundBooleanExpressionVisitor[Any]): +class _CollectNullNaNUnmentionedTermsFromExpression(BoundBooleanExpressionVisitor[Any]): def __init__(self) -> None: # BoundTerms which have either is_null or is_not_null appearing at least once in the boolean expr. - self.is_valid_or_not_bound_terms: set[BoundTerm[Any]] = set() + self.is_null_or_not_bound_terms: set[BoundTerm[Any]] = set() # The remaining BoundTerms appearing in the boolean expr. self.null_unmentioned_bound_terms: set[BoundTerm[Any]] = set() + + # BoundTerms which have either is_nan or is_not_nan appearing at least once in the boolean expr. + self.is_nan_or_not_bound_terms: set[BoundTerm[Any]] = set() + # The remaining BoundTerms appearing in the boolean expr. + self.nan_unmentioned_bound_terms: set[BoundTerm[Any]] = set() + super().__init__() def _handle_explicit_is_null_or_not(self, term: BoundTerm[Any]) -> None: """Handle the predicate case where either is_null or is_not_null is included.""" if term in self.null_unmentioned_bound_terms: self.null_unmentioned_bound_terms.remove(term) - self.is_valid_or_not_bound_terms.add(term) + self.is_null_or_not_bound_terms.add(term) - def _handle_skipped(self, term: BoundTerm[Any]) -> None: + def _handle_null_unmentioned(self, term: BoundTerm[Any]) -> None: """Handle the predicate case where neither is_null or is_not_null is included.""" - if term not in self.is_valid_or_not_bound_terms: + if term not in self.is_null_or_not_bound_terms: self.null_unmentioned_bound_terms.add(term) + def _handle_explicit_is_nan_or_not(self, term: BoundTerm[Any]) -> None: + """Handle the predicate case where either is_nan or is_not_nan is included.""" + if term in self.nan_unmentioned_bound_terms: + self.nan_unmentioned_bound_terms.remove(term) + self.is_nan_or_not_bound_terms.add(term) + + def _handle_nan_unmentioned(self, term: BoundTerm[Any]) -> None: + """Handle the predicate case where neither is_nan or is_not_nan is included.""" + if term not in self.is_nan_or_not_bound_terms: + self.nan_unmentioned_bound_terms.add(term) + def visit_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> None: - self._handle_skipped(term) + self._handle_null_unmentioned(term) + self._handle_nan_unmentioned(term) def visit_not_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> None: - self._handle_skipped(term) + self._handle_null_unmentioned(term) + self._handle_nan_unmentioned(term) - # todo: do I have to modify this as well def visit_is_nan(self, term: BoundTerm[Any]) -> None: - self._handle_skipped(term) + self._handle_null_unmentioned(term) + self._handle_explicit_is_nan_or_not(term) - # todo: do I have to modify this as well, might need 2 self.xx sets for mentioned_nan and none-mentioned-nan def visit_not_nan(self, term: BoundTerm[Any]) -> None: - self._handle_skipped(term) + self._handle_null_unmentioned(term) + self._handle_explicit_is_nan_or_not(term) def visit_is_null(self, term: BoundTerm[Any]) -> None: self._handle_explicit_is_null_or_not(term) + self._handle_nan_unmentioned(term) def visit_not_null(self, term: BoundTerm[Any]) -> None: self._handle_explicit_is_null_or_not(term) + self._handle_nan_unmentioned(term) def visit_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: - self._handle_skipped(term) + self._handle_null_unmentioned(term) + self._handle_nan_unmentioned(term) def visit_not_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: - self._handle_skipped(term) + self._handle_null_unmentioned(term) + self._handle_nan_unmentioned(term) def visit_greater_than_or_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: - self._handle_skipped(term) + self._handle_null_unmentioned(term) + self._handle_nan_unmentioned(term) def visit_greater_than(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: - self._handle_skipped(term) + self._handle_null_unmentioned(term) + self._handle_nan_unmentioned(term) def visit_less_than(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: - self._handle_skipped(term) + self._handle_null_unmentioned(term) + self._handle_nan_unmentioned(term) def visit_less_than_or_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: - self._handle_skipped(term) + self._handle_null_unmentioned(term) + self._handle_nan_unmentioned(term) def visit_starts_with(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: - self._handle_skipped(term) + self._handle_null_unmentioned(term) + self._handle_nan_unmentioned(term) def visit_not_starts_with(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: - self._handle_skipped(term) + self._handle_null_unmentioned(term) + self._handle_nan_unmentioned(term) def visit_true(self) -> None: return @@ -713,26 +742,29 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> N return -def _get_is_valid_or_not_bound_refs(expr: BooleanExpression) -> tuple[Set[BoundReference[Any]], Set[BoundReference[Any]]]: - """Collect the bound terms catogorized by having at least one is_null or is_not_null in the expr and the remaining.""" - collector = _CollectNullUnmentionedTermsFromExpression() +def _get_null_nan_refs( + expr: BooleanExpression, +) -> tuple[Set[BoundReference[Any]], Set[BoundReference[Any]], Set[BoundReference[Any]], Set[BoundReference[Any]]]: + """Collect the bound terms categorized by having at least one is_null or is_not_null in the expr and the remaining.""" + collector = _CollectNullNaNUnmentionedTermsFromExpression() boolean_expression_visit(expr, collector) - null_unmentioned_bound_terms = collector.null_unmentioned_bound_terms - is_valid_or_not_bound_terms = collector.is_valid_or_not_bound_terms - - null_unmentioned_bound_refs: Set[BoundReference[Any]] = set() - is_valid_or_not_bound_refs: Set[BoundReference[Any]] = set() - for t in null_unmentioned_bound_terms: - if not isinstance(t, BoundReference): - raise ValueError("Collected Bound Term that is not reference.") - else: - null_unmentioned_bound_refs.add(t) - for t in is_valid_or_not_bound_terms: - if not isinstance(t, BoundReference): - raise ValueError("Collected Bound Term that is not reference.") - else: - is_valid_or_not_bound_refs.add(t) - return null_unmentioned_bound_refs, is_valid_or_not_bound_refs + + def _downcast_term_to_reference(bound_terms: Set[BoundTerm[Any]]) -> Set[BoundReference[Any]]: + """Handle mypy check for BoundTerm -> BoundReference.""" + bound_refs: Set[BoundReference[Any]] = set() + for t in bound_terms: + if not isinstance(t, BoundReference): + raise ValueError("Collected Bound Term that is not reference.") + else: + bound_refs.add(t) + return bound_refs + + null_unmentioned_bound_refs: Set[BoundReference[Any]] = _downcast_term_to_reference(collector.null_unmentioned_bound_terms) + is_null_or_not_bound_refs: Set[BoundReference[Any]] = _downcast_term_to_reference(collector.is_null_or_not_bound_terms) + nan_unmentioned_bound_refs: Set[BoundReference[Any]] = _downcast_term_to_reference(collector.nan_unmentioned_bound_terms) + is_nan_or_not_bound_refs: Set[BoundReference[Any]] = _downcast_term_to_reference(collector.is_nan_or_not_bound_terms) + + return null_unmentioned_bound_refs, nan_unmentioned_bound_refs, is_null_or_not_bound_refs, is_nan_or_not_bound_refs def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: @@ -740,15 +772,19 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: def expression_to_reverted_pyarrow(expr: BooleanExpression) -> pc.Expression: - """Complimentary filter convertion function of expression_to_pyarrow. + """Complimentary filter conversion function of expression_to_pyarrow. Could not use expression_to_pyarrow(Not(expr)) to achieve this effect because ~ in pc.Expression does not handle null. """ - null_unmentioned_bound_terms: set[BoundReference[Any]] = _get_is_valid_or_not_bound_refs(expr)[0] + null_unmentioned_bound_refs: set[BoundReference[Any]] = _get_null_nan_refs(expr)[0] + nan_unmentioned_bound_refs: set[BoundReference[Any]] = _get_null_nan_refs(expr)[1] preserver_expr: BooleanExpression = Not(expr) - for term in null_unmentioned_bound_terms: + for term in null_unmentioned_bound_refs: preserver_expr = Or(preserver_expr, BoundIsNull(term=term)) + for term in nan_unmentioned_bound_refs: + preserver_expr = Or(preserver_expr, BoundIsNaN(term=term)) + return expression_to_pyarrow(preserver_expr) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c4a70e3098..87e29cc42f 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -58,6 +58,7 @@ And, BooleanExpression, EqualTo, + Not, Or, Reference, ) @@ -575,7 +576,12 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti delete_filter: A boolean expression to delete rows from a table snapshot_properties: Custom properties to be added to the snapshot summary """ - from pyiceberg.io.pyarrow import _dataframe_to_data_files, expression_to_reverted_pyarrow, project_table + from pyiceberg.io.pyarrow import ( + _dataframe_to_data_files, + expression_to_pyarrow, + expression_to_reverted_pyarrow, + project_table, + ) if ( self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT) @@ -592,7 +598,10 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti # Check if there are any files that require an actual rewrite of a data file if delete_snapshot.rewrites_needed is True: bound_delete_filter = bind(self._table.schema(), delete_filter, case_sensitive=True) + print(f"{bound_delete_filter=}") + print(f"{expression_to_pyarrow(Not(bound_delete_filter))=}") preserve_row_filter = expression_to_reverted_pyarrow(bound_delete_filter) + print(f"{preserve_row_filter=}") files = self._scan(row_filter=delete_filter).plan_files() diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 309c0c6a4d..80f2e9260e 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -27,7 +27,7 @@ from pyiceberg.manifest import ManifestEntryStatus from pyiceberg.schema import Schema from pyiceberg.table.snapshots import Operation, Summary -from pyiceberg.types import IntegerType, NestedField +from pyiceberg.types import FloatType, IntegerType, NestedField def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None: @@ -492,3 +492,72 @@ def test_delete_overwrite_table_with_null(session_catalog: RestCatalog) -> None: ] assert tbl.scan().to_arrow()["ints"].to_pylist() == [3, 4, 1, None] + + +@pytest.mark.canada +def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None: + arrow_schema = pa.schema([pa.field("floats", pa.float32())]) + + # Create Arrow Table with NaN values + data = [pa.array([1.0, float("nan"), 2.0], type=pa.float32())] + arrow_tbl = pa.Table.from_arrays( + data, + schema=arrow_schema, + ) + + iceberg_schema = Schema(NestedField(1, "floats", FloatType())) + + tbl_identifier = "default.test_delete_overwrite_with_nan" + + try: + session_catalog.drop_table(tbl_identifier) + except NoSuchTableError: + pass + + tbl = session_catalog.create_table(tbl_identifier, iceberg_schema) + tbl.append(arrow_tbl) + + assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND] + + arrow_tbl_overwrite = pa.Table.from_pylist( + [ + {"floats": 3.0}, + {"floats": 4.0}, + ], + schema=arrow_schema, + ) + """ + We want to test the expression_to_reverted_pyarrow function can generate a correct complimentary filter + for selecting records to remain in the new overwritten file. + Compared with test_delete_overwrite_table_with_null which tests rows with null cells, + nan testing is faced with a more tricky issue: + A filter of (field == value) will not include cells of nan but col != val will. + (Interestingly, neither == or != will include null) + + This means if we set the test case as floats == 2.0 (equal predicate as in test_delete_overwrite_table_with_null), + test will pass even without the logic under test + in _CollectNullNaNUnmentionedTermsFromExpression (a helper of expression_to_reverted_pyarrow + to handle revert of iceberg expression of is_null/not_null/is_nan/not_nan). + Instead, we test the filter of !=, so that the revert is == which exposes the issue. + """ + tbl.overwrite(arrow_tbl_overwrite, "floats != 2.0") # Should rewrite one file + + assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [ + Operation.APPEND, + Operation.OVERWRITE, + Operation.APPEND, + ] + + result = tbl.scan().to_arrow()["floats"].to_pylist() + + from math import isnan + + assert any(isnan(e) for e in result) + assert 2.0 in result + assert 3.0 in result + assert 4.0 in result + + +@pytest.mark.german +def test_nan() -> None: + print("what is sue", float("nan") == float("nan")) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 11aef2f61f..27d63ca517 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -33,7 +33,7 @@ _ConvertToArrowSchema, _ConvertToIceberg, _ConvertToIcebergWithoutIDs, - _get_is_valid_or_not_bound_refs, + _get_null_nan_refs, _HasIds, _pyarrow_schema_ensure_large_types, pyarrow_to_schema, @@ -615,7 +615,7 @@ def test_collect_null_unmentioned_terms() -> None: bound_expr = Or(And(bound_eq_str_field, bound_larger_than_long_field), bound_is_null_bool_field) - categorized_terms = _get_is_valid_or_not_bound_refs(bound_expr) + categorized_terms = _get_null_nan_refs(bound_expr) assert {"field_long", "field_str"} == {f.field.name for f in categorized_terms[0]} assert { "field_bool", @@ -641,6 +641,6 @@ def test_collect_null_unmentioned_terms_with_multiple_predicates_on_the_same_ter bound_eq_str_field, ) - categorized_terms = _get_is_valid_or_not_bound_refs(bound_expr) + categorized_terms = _get_null_nan_refs(bound_expr) assert {"field_str"} == set({f.field.name for f in categorized_terms[0]}) assert {"field_long"} == set({f.field.name for f in categorized_terms[1]}) From 81b0c077a143dfe968fa122bf8a4853b61310923 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Tue, 23 Jul 2024 05:20:30 +0000 Subject: [PATCH 04/12] naming as sung suggested --- pyiceberg/io/pyarrow.py | 4 ++-- pyiceberg/table/__init__.py | 4 ++-- tests/integration/test_deletes.py | 4 ++-- tests/io/test_pyarrow.py | 6 +++--- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 25ca1a56ed..0601ca8fa1 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -771,10 +771,10 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) -def expression_to_reverted_pyarrow(expr: BooleanExpression) -> pc.Expression: +def _expression_to_complimentary_pyarrow(expr: BooleanExpression) -> pc.Expression: """Complimentary filter conversion function of expression_to_pyarrow. - Could not use expression_to_pyarrow(Not(expr)) to achieve this effect because ~ in pc.Expression does not handle null. + Could not use expression_to_pyarrow(Not(expr)) to achieve this complimentary effect because ~ in pc.Expression does not handle null. """ null_unmentioned_bound_refs: set[BoundReference[Any]] = _get_null_nan_refs(expr)[0] nan_unmentioned_bound_refs: set[BoundReference[Any]] = _get_null_nan_refs(expr)[1] diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 87e29cc42f..8ae6f5ed77 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -578,8 +578,8 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti """ from pyiceberg.io.pyarrow import ( _dataframe_to_data_files, + _expression_to_complimentary_pyarrow, expression_to_pyarrow, - expression_to_reverted_pyarrow, project_table, ) @@ -600,7 +600,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti bound_delete_filter = bind(self._table.schema(), delete_filter, case_sensitive=True) print(f"{bound_delete_filter=}") print(f"{expression_to_pyarrow(Not(bound_delete_filter))=}") - preserve_row_filter = expression_to_reverted_pyarrow(bound_delete_filter) + preserve_row_filter = _expression_to_complimentary_pyarrow(bound_delete_filter) print(f"{preserve_row_filter=}") files = self._scan(row_filter=delete_filter).plan_files() diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 80f2e9260e..c0046c5da0 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -527,7 +527,7 @@ def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None: schema=arrow_schema, ) """ - We want to test the expression_to_reverted_pyarrow function can generate a correct complimentary filter + We want to test the _expression_to_complimentary_pyarrow function can generate a correct complimentary filter for selecting records to remain in the new overwritten file. Compared with test_delete_overwrite_table_with_null which tests rows with null cells, nan testing is faced with a more tricky issue: @@ -536,7 +536,7 @@ def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None: This means if we set the test case as floats == 2.0 (equal predicate as in test_delete_overwrite_table_with_null), test will pass even without the logic under test - in _CollectNullNaNUnmentionedTermsFromExpression (a helper of expression_to_reverted_pyarrow + in _CollectNullNaNUnmentionedTermsFromExpression (a helper of _expression_to_complimentary_pyarrow to handle revert of iceberg expression of is_null/not_null/is_nan/not_nan). Instead, we test the filter of !=, so that the revert is == which exposes the issue. """ diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 4145577f6a..c6af8108ae 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -63,12 +63,12 @@ _check_pyarrow_schema_compatible, _ConvertToArrowSchema, _determine_partitions, + _expression_to_complimentary_pyarrow, _primitive_to_physical, _read_deletes, _to_requested_schema, bin_pack_arrow_table, expression_to_pyarrow, - expression_to_reverted_pyarrow, project_table, schema_to_pyarrow, ) @@ -725,7 +725,7 @@ def test_always_false_to_pyarrow(bound_reference: BoundReference[str]) -> None: assert repr(expression_to_pyarrow(AlwaysFalse())) == "" -def test_expression_to_reverted_pyarrow() -> None: +def test__expression_to_complimentary_pyarrow() -> None: bound_reference_str = BoundReference( field=NestedField(1, "field_str", StringType(), required=False), accessor=Accessor(position=0, inner=None) ) @@ -739,7 +739,7 @@ def test_expression_to_reverted_pyarrow() -> None: bound_is_null_long_field = BoundIsNull(bound_reference_long) bound_expr = Or(And(bound_eq_str_field, bound_larger_than_long_field), bound_is_null_long_field) - result = expression_to_reverted_pyarrow(bound_expr) + result = _expression_to_complimentary_pyarrow(bound_expr) assert ( repr(result) == """ 100)) or is_null(field_long, {nan_is_null=false}))) or is_null(field_str, {nan_is_null=false}))>""" From 1e062e2886ebf643251ab6d3c46a788d3c41afe6 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Tue, 23 Jul 2024 18:37:18 +0000 Subject: [PATCH 05/12] one more test to fix; one more comment to address --- pyiceberg/io/pyarrow.py | 51 ++++++----- pyiceberg/table/__init__.py | 4 +- tests/integration/test_deletes.py | 11 +-- tests/io/test_pyarrow.py | 24 +----- tests/io/test_pyarrow_visitor.py | 135 ++++++++++++++++++++++-------- 5 files changed, 137 insertions(+), 88 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 0601ca8fa1..053781b2d7 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -635,17 +635,20 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p class _CollectNullNaNUnmentionedTermsFromExpression(BoundBooleanExpressionVisitor[Any]): - def __init__(self) -> None: - # BoundTerms which have either is_null or is_not_null appearing at least once in the boolean expr. - self.is_null_or_not_bound_terms: set[BoundTerm[Any]] = set() - # The remaining BoundTerms appearing in the boolean expr. - self.null_unmentioned_bound_terms: set[BoundTerm[Any]] = set() - - # BoundTerms which have either is_nan or is_not_nan appearing at least once in the boolean expr. - self.is_nan_or_not_bound_terms: set[BoundTerm[Any]] = set() - # The remaining BoundTerms appearing in the boolean expr. - self.nan_unmentioned_bound_terms: set[BoundTerm[Any]] = set() + # BoundTerms which have either is_null or is_not_null appearing at least once in the boolean expr. + is_null_or_not_bound_terms: set[BoundTerm[Any]] + # The remaining BoundTerms appearing in the boolean expr. + null_unmentioned_bound_terms: set[BoundTerm[Any]] + # BoundTerms which have either is_nan or is_not_nan appearing at least once in the boolean expr. + is_nan_or_not_bound_terms: set[BoundTerm[Any]] + # The remaining BoundTerms appearing in the boolean expr. + nan_unmentioned_bound_terms: set[BoundTerm[Any]] + def __init__(self) -> None: + self.is_null_or_not_bound_terms = set() + self.null_unmentioned_bound_terms = set() + self.is_nan_or_not_bound_terms = set() + self.nan_unmentioned_bound_terms = set() super().__init__() def _handle_explicit_is_null_or_not(self, term: BoundTerm[Any]) -> None: @@ -671,6 +674,7 @@ def _handle_nan_unmentioned(self, term: BoundTerm[Any]) -> None: self.nan_unmentioned_bound_terms.add(term) def visit_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> None: + print("testing isnan trace: in predicate visit called.") self._handle_null_unmentioned(term) self._handle_nan_unmentioned(term) @@ -679,22 +683,27 @@ def visit_not_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> No self._handle_nan_unmentioned(term) def visit_is_nan(self, term: BoundTerm[Any]) -> None: + print("testing isnan trace: is nan visit called.") self._handle_null_unmentioned(term) self._handle_explicit_is_nan_or_not(term) def visit_not_nan(self, term: BoundTerm[Any]) -> None: + print("testing isnan trace: not nan visit called.") self._handle_null_unmentioned(term) self._handle_explicit_is_nan_or_not(term) def visit_is_null(self, term: BoundTerm[Any]) -> None: + print("testing isnan trace: is null visit called.") self._handle_explicit_is_null_or_not(term) self._handle_nan_unmentioned(term) def visit_not_null(self, term: BoundTerm[Any]) -> None: + print("testing isnan trace: not null visit called.") self._handle_explicit_is_null_or_not(term) self._handle_nan_unmentioned(term) def visit_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: + print("testing isnan trace: equal visit called.") self._handle_null_unmentioned(term) self._handle_nan_unmentioned(term) @@ -703,10 +712,12 @@ def visit_not_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: self._handle_nan_unmentioned(term) def visit_greater_than_or_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: + print("testing isnan trace: >= visit called.") self._handle_null_unmentioned(term) self._handle_nan_unmentioned(term) def visit_greater_than(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: + print("testing isnan trace: > equal visit called.") self._handle_null_unmentioned(term) self._handle_nan_unmentioned(term) @@ -771,21 +782,21 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) -def _expression_to_complimentary_pyarrow(expr: BooleanExpression) -> pc.Expression: - """Complimentary filter conversion function of expression_to_pyarrow. +def _expression_to_complementary_pyarrow(expr: BooleanExpression) -> pc.Expression: + """Complementary filter conversion function of expression_to_pyarrow. Could not use expression_to_pyarrow(Not(expr)) to achieve this complimentary effect because ~ in pc.Expression does not handle null. """ - null_unmentioned_bound_refs: set[BoundReference[Any]] = _get_null_nan_refs(expr)[0] - nan_unmentioned_bound_refs: set[BoundReference[Any]] = _get_null_nan_refs(expr)[1] - preserver_expr: BooleanExpression = Not(expr) - + categorized_refs = _get_null_nan_refs(expr) + null_unmentioned_bound_refs: set[BoundReference[Any]] = categorized_refs[0] + nan_unmentioned_bound_refs: set[BoundReference[Any]] = categorized_refs[1] + preserve_expr: BooleanExpression = Not(expr) + print("check the order:", [f.field.name for f in null_unmentioned_bound_refs]) for term in null_unmentioned_bound_refs: - preserver_expr = Or(preserver_expr, BoundIsNull(term=term)) + preserve_expr = Or(preserve_expr, BoundIsNull(term=term)) for term in nan_unmentioned_bound_refs: - preserver_expr = Or(preserver_expr, BoundIsNaN(term=term)) - - return expression_to_pyarrow(preserver_expr) + preserve_expr = Or(preserve_expr, BoundIsNaN(term=term)) + return expression_to_pyarrow(preserve_expr) @lru_cache diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 8ae6f5ed77..cfc8bb322b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -578,7 +578,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti """ from pyiceberg.io.pyarrow import ( _dataframe_to_data_files, - _expression_to_complimentary_pyarrow, + _expression_to_complementary_pyarrow, expression_to_pyarrow, project_table, ) @@ -600,7 +600,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti bound_delete_filter = bind(self._table.schema(), delete_filter, case_sensitive=True) print(f"{bound_delete_filter=}") print(f"{expression_to_pyarrow(Not(bound_delete_filter))=}") - preserve_row_filter = _expression_to_complimentary_pyarrow(bound_delete_filter) + preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter) print(f"{preserve_row_filter=}") files = self._scan(row_filter=delete_filter).plan_files() diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index c0046c5da0..562b790a3c 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -494,7 +494,7 @@ def test_delete_overwrite_table_with_null(session_catalog: RestCatalog) -> None: assert tbl.scan().to_arrow()["ints"].to_pylist() == [3, 4, 1, None] -@pytest.mark.canada +@pytest.mark.integration def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None: arrow_schema = pa.schema([pa.field("floats", pa.float32())]) @@ -527,7 +527,7 @@ def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None: schema=arrow_schema, ) """ - We want to test the _expression_to_complimentary_pyarrow function can generate a correct complimentary filter + We want to test the _expression_to_complementary_pyarrow function can generate a correct complimentary filter for selecting records to remain in the new overwritten file. Compared with test_delete_overwrite_table_with_null which tests rows with null cells, nan testing is faced with a more tricky issue: @@ -536,7 +536,7 @@ def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None: This means if we set the test case as floats == 2.0 (equal predicate as in test_delete_overwrite_table_with_null), test will pass even without the logic under test - in _CollectNullNaNUnmentionedTermsFromExpression (a helper of _expression_to_complimentary_pyarrow + in _CollectNullNaNUnmentionedTermsFromExpression (a helper of _expression_to_complementary_pyarrow to handle revert of iceberg expression of is_null/not_null/is_nan/not_nan). Instead, we test the filter of !=, so that the revert is == which exposes the issue. """ @@ -556,8 +556,3 @@ def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None: assert 2.0 in result assert 3.0 in result assert 4.0 in result - - -@pytest.mark.german -def test_nan() -> None: - print("what is sue", float("nan") == float("nan")) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index c6af8108ae..60c6e4b67a 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -63,7 +63,6 @@ _check_pyarrow_schema_compatible, _ConvertToArrowSchema, _determine_partitions, - _expression_to_complimentary_pyarrow, _primitive_to_physical, _read_deletes, _to_requested_schema, @@ -74,7 +73,7 @@ ) from pyiceberg.manifest import DataFile, DataFileContent, FileFormat from pyiceberg.partitioning import PartitionField, PartitionSpec -from pyiceberg.schema import Accessor, Schema, make_compatible_name, visit +from pyiceberg.schema import Schema, make_compatible_name, visit from pyiceberg.table import FileScanTask, TableProperties from pyiceberg.table.metadata import TableMetadataV2 from pyiceberg.transforms import IdentityTransform @@ -725,27 +724,6 @@ def test_always_false_to_pyarrow(bound_reference: BoundReference[str]) -> None: assert repr(expression_to_pyarrow(AlwaysFalse())) == "" -def test__expression_to_complimentary_pyarrow() -> None: - bound_reference_str = BoundReference( - field=NestedField(1, "field_str", StringType(), required=False), accessor=Accessor(position=0, inner=None) - ) - bound_eq_str_field = BoundEqualTo(term=bound_reference_str, literal=literal("hello")) - - bound_reference_long = BoundReference( - field=NestedField(1, "field_long", LongType(), required=False), accessor=Accessor(position=1, inner=None) - ) - bound_larger_than_long_field = BoundGreaterThan(term=bound_reference_long, literal=literal(100)) # type: ignore - - bound_is_null_long_field = BoundIsNull(bound_reference_long) - - bound_expr = Or(And(bound_eq_str_field, bound_larger_than_long_field), bound_is_null_long_field) - result = _expression_to_complimentary_pyarrow(bound_expr) - assert ( - repr(result) - == """ 100)) or is_null(field_long, {nan_is_null=false}))) or is_null(field_str, {nan_is_null=false}))>""" - ) - - @pytest.fixture def schema_int() -> Schema: return Schema(NestedField(1, "id", IntegerType(), required=False)) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 27d63ca517..87e630a12a 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -16,6 +16,7 @@ # under the License. # pylint: disable=protected-access,unused-argument,redefined-outer-name import re +from typing import Any import pyarrow as pa import pytest @@ -24,8 +25,10 @@ And, BoundEqualTo, BoundGreaterThan, + BoundIsNaN, BoundIsNull, BoundReference, + Not, Or, ) from pyiceberg.expressions.literals import literal @@ -33,6 +36,7 @@ _ConvertToArrowSchema, _ConvertToIceberg, _ConvertToIcebergWithoutIDs, + _expression_to_complementary_pyarrow, _get_null_nan_refs, _HasIds, _pyarrow_schema_ensure_large_types, @@ -593,54 +597,115 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa @pytest.fixture -def bound_reference_long() -> BoundReference[int]: - return BoundReference(field=NestedField(1, "field", LongType(), required=False), accessor=Accessor(position=0, inner=None)) +def bound_reference_str() -> BoundReference[Any]: + return BoundReference( + field=NestedField(1, "field_str_unmentioned", StringType(), required=False), accessor=Accessor(position=0, inner=None) + ) -def test_collect_null_unmentioned_terms() -> None: - bound_reference_str = BoundReference( - field=NestedField(1, "field_str", StringType(), required=False), accessor=Accessor(position=0, inner=None) +@pytest.fixture +def bound_reference_float() -> BoundReference[Any]: + return BoundReference( + field=NestedField(2, "field_float_mentioned_nan", FloatType(), required=False), accessor=Accessor(position=1, inner=None) ) - bound_eq_str_field = BoundEqualTo(term=bound_reference_str, literal=literal("hello")) - bound_reference_long = BoundReference( - field=NestedField(2, "field_long", LongType(), required=False), accessor=Accessor(position=1, inner=None) - ) - bound_larger_than_long_field = BoundGreaterThan(term=bound_reference_long, literal=literal(100)) # type: ignore - bound_reference_bool = BoundReference( - field=NestedField(3, "field_bool", BooleanType(), required=False), accessor=Accessor(position=2, inner=None) +@pytest.fixture +def bound_reference_double() -> BoundReference[Any]: + return BoundReference( + field=NestedField(3, "field_double_mentioned_null", DoubleType(), required=False), + accessor=Accessor(position=2, inner=None), ) - bound_is_null_bool_field = BoundIsNull(bound_reference_bool) - bound_expr = Or(And(bound_eq_str_field, bound_larger_than_long_field), bound_is_null_bool_field) - categorized_terms = _get_null_nan_refs(bound_expr) - assert {"field_long", "field_str"} == {f.field.name for f in categorized_terms[0]} - assert { - "field_bool", - } == {f.field.name for f in categorized_terms[1]} +@pytest.fixture +def bound_eq_str_field(bound_reference_str: BoundReference[Any]) -> BoundEqualTo[Any]: + return BoundEqualTo(term=bound_reference_str, literal=literal("hello")) -def test_collect_null_unmentioned_terms_with_multiple_predicates_on_the_same_term() -> None: - """Test a single term appears multiple places in the expression tree""" - bound_reference_str = BoundReference( - field=NestedField(1, "field_str", StringType(), required=False), accessor=Accessor(position=0, inner=None) - ) - bound_eq_str_field = BoundEqualTo(term=bound_reference_str, literal=literal("hello")) +@pytest.fixture +def bound_greater_than_float_field(bound_reference_float: BoundReference[Any]) -> BoundGreaterThan[Any]: + return BoundGreaterThan(term=bound_reference_float, literal=literal(100)) - bound_reference_long = BoundReference( - field=NestedField(1, "field_long", LongType(), required=False), accessor=Accessor(position=1, inner=None) - ) - bound_larger_than_long_field = BoundGreaterThan(term=bound_reference_long, literal=literal(100)) # type: ignore - bound_is_null_long_field = BoundIsNull(bound_reference_long) +@pytest.fixture +def bound_is_nan_float_field(bound_reference_float: BoundReference[Any]) -> BoundIsNaN[Any]: + return BoundIsNaN(bound_reference_float) + + +@pytest.fixture +def bound_eq_double_field(bound_reference_double: BoundReference[Any]) -> BoundEqualTo[Any]: + return BoundEqualTo(term=bound_reference_double, literal=literal(False)) + - bound_expr = Or( - And(Or(And(bound_eq_str_field, bound_larger_than_long_field), bound_is_null_long_field), bound_larger_than_long_field), - bound_eq_str_field, +@pytest.fixture +def bound_is_null_double_field(bound_reference_double: BoundReference[Any]) -> BoundIsNull[Any]: + return BoundIsNull(bound_reference_double) + + +@pytest.mark.german +def test_collect_null_nan_unmentioned_terms( + bound_eq_str_field: BoundEqualTo[Any], bound_is_nan_float_field: BoundIsNaN[Any], bound_is_null_double_field: BoundIsNull[Any] +) -> None: + bound_expr = And( + Or(And(bound_eq_str_field, bound_is_nan_float_field), bound_is_null_double_field), Not(bound_is_nan_float_field) ) + categorized_terms = _get_null_nan_refs(bound_expr) + assert {f.field.name for f in categorized_terms[0]} == {"field_float_mentioned_nan", "field_str_unmentioned"} + assert {f.field.name for f in categorized_terms[1]} == {"field_str_unmentioned", "field_double_mentioned_null"} + assert {f.field.name for f in categorized_terms[2]} == { + "field_double_mentioned_null", + } + assert {f.field.name for f in categorized_terms[3]} == {"field_float_mentioned_nan"} + + +@pytest.mark.german +def test_collect_null_nan_unmentioned_terms_with_multiple_predicates_on_the_same_term( + bound_eq_str_field: BoundEqualTo[Any], + bound_greater_than_float_field: BoundGreaterThan[Any], + bound_is_nan_float_field: BoundIsNaN[Any], + bound_eq_double_field: BoundEqualTo[Any], + bound_is_null_double_field: BoundIsNull[Any], +) -> None: + """Test a single term appears multiple places in the expression tree""" + bound_expr = And( + Or( + And(bound_eq_str_field, bound_greater_than_float_field), + And(bound_is_nan_float_field, bound_eq_double_field), + bound_greater_than_float_field, + ), + Not(bound_is_null_double_field), + ) categorized_terms = _get_null_nan_refs(bound_expr) - assert {"field_str"} == set({f.field.name for f in categorized_terms[0]}) - assert {"field_long"} == set({f.field.name for f in categorized_terms[1]}) + assert {f.field.name for f in categorized_terms[0]} == {"field_float_mentioned_nan", "field_str_unmentioned"} + assert {f.field.name for f in categorized_terms[1]} == {"field_str_unmentioned", "field_double_mentioned_null"} + assert {f.field.name for f in categorized_terms[2]} == { + "field_double_mentioned_null", + } + assert {f.field.name for f in categorized_terms[3]} == {"field_float_mentioned_nan"} + + +@pytest.mark.china +def test__expression_to_complementary_pyarrow( + bound_eq_str_field: BoundEqualTo[Any], + bound_greater_than_float_field: BoundGreaterThan[Any], + bound_is_nan_float_field: BoundIsNaN[Any], + bound_eq_double_field: BoundEqualTo[Any], + bound_is_null_double_field: BoundIsNull[Any], +) -> None: + bound_expr = And( + Or( + And(bound_eq_str_field, bound_greater_than_float_field), + And(bound_is_nan_float_field, bound_eq_double_field), + bound_greater_than_float_field, + ), + Not(bound_is_null_double_field), + ) + result = _expression_to_complementary_pyarrow(bound_expr) + # Notice an isNan predicate on a str column is automatically converted to always false and removed from Or. + print("this is the result", repr(result)) + assert ( + repr(result) + == """ 100)) or (is_nan(field_float_mentioned_nan) and (field_double_mentioned_null == 0))) or (field_float_mentioned_nan > 100)) and invert(is_null(field_double_mentioned_null, {nan_is_null=false})))) or is_null(field_float_mentioned_nan, {nan_is_null=false})) or is_null(field_str_unmentioned, {nan_is_null=false})) or is_nan(field_double_mentioned_null))>""" + ) From a7282d5607c3f875cede934e42a4ec8799f58540 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Tue, 23 Jul 2024 19:30:14 +0000 Subject: [PATCH 06/12] fix a test --- pyiceberg/io/pyarrow.py | 20 +++++++------------- pyiceberg/table/__init__.py | 5 ----- tests/io/test_pyarrow_visitor.py | 6 +----- 3 files changed, 8 insertions(+), 23 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 053781b2d7..1685b384dc 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -674,7 +674,6 @@ def _handle_nan_unmentioned(self, term: BoundTerm[Any]) -> None: self.nan_unmentioned_bound_terms.add(term) def visit_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> None: - print("testing isnan trace: in predicate visit called.") self._handle_null_unmentioned(term) self._handle_nan_unmentioned(term) @@ -683,27 +682,22 @@ def visit_not_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> No self._handle_nan_unmentioned(term) def visit_is_nan(self, term: BoundTerm[Any]) -> None: - print("testing isnan trace: is nan visit called.") self._handle_null_unmentioned(term) self._handle_explicit_is_nan_or_not(term) def visit_not_nan(self, term: BoundTerm[Any]) -> None: - print("testing isnan trace: not nan visit called.") self._handle_null_unmentioned(term) self._handle_explicit_is_nan_or_not(term) def visit_is_null(self, term: BoundTerm[Any]) -> None: - print("testing isnan trace: is null visit called.") self._handle_explicit_is_null_or_not(term) self._handle_nan_unmentioned(term) def visit_not_null(self, term: BoundTerm[Any]) -> None: - print("testing isnan trace: not null visit called.") self._handle_explicit_is_null_or_not(term) self._handle_nan_unmentioned(term) def visit_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: - print("testing isnan trace: equal visit called.") self._handle_null_unmentioned(term) self._handle_nan_unmentioned(term) @@ -712,12 +706,10 @@ def visit_not_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: self._handle_nan_unmentioned(term) def visit_greater_than_or_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: - print("testing isnan trace: >= visit called.") self._handle_null_unmentioned(term) self._handle_nan_unmentioned(term) def visit_greater_than(self, term: BoundTerm[Any], literal: Literal[Any]) -> None: - print("testing isnan trace: > equal visit called.") self._handle_null_unmentioned(term) self._handle_nan_unmentioned(term) @@ -756,7 +748,7 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> N def _get_null_nan_refs( expr: BooleanExpression, ) -> tuple[Set[BoundReference[Any]], Set[BoundReference[Any]], Set[BoundReference[Any]], Set[BoundReference[Any]]]: - """Collect the bound terms categorized by having at least one is_null or is_not_null in the expr and the remaining.""" + """Collect the bound references categorized by having at least one is_null or is_not_null in the expr and the remaining.""" collector = _CollectNullNaNUnmentionedTermsFromExpression() boolean_expression_visit(expr, collector) @@ -785,13 +777,15 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: def _expression_to_complementary_pyarrow(expr: BooleanExpression) -> pc.Expression: """Complementary filter conversion function of expression_to_pyarrow. - Could not use expression_to_pyarrow(Not(expr)) to achieve this complimentary effect because ~ in pc.Expression does not handle null. + Could not use expression_to_pyarrow(Not(expr)) to achieve this complementary effect because ~ in pyarrow.compute.Expression does not handle null. """ categorized_refs = _get_null_nan_refs(expr) - null_unmentioned_bound_refs: set[BoundReference[Any]] = categorized_refs[0] - nan_unmentioned_bound_refs: set[BoundReference[Any]] = categorized_refs[1] + + # Convert the set of references to a sorted list so that layout of the expression to build is deterministic. + null_unmentioned_bound_refs: List[BoundReference[Any]] = sorted(categorized_refs[0], key=lambda ref: ref.field.name) + nan_unmentioned_bound_refs: List[BoundReference[Any]] = sorted(categorized_refs[1], key=lambda ref: ref.field.name) + preserve_expr: BooleanExpression = Not(expr) - print("check the order:", [f.field.name for f in null_unmentioned_bound_refs]) for term in null_unmentioned_bound_refs: preserve_expr = Or(preserve_expr, BoundIsNull(term=term)) for term in nan_unmentioned_bound_refs: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index cfc8bb322b..79af476c91 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -58,7 +58,6 @@ And, BooleanExpression, EqualTo, - Not, Or, Reference, ) @@ -579,7 +578,6 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti from pyiceberg.io.pyarrow import ( _dataframe_to_data_files, _expression_to_complementary_pyarrow, - expression_to_pyarrow, project_table, ) @@ -598,10 +596,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti # Check if there are any files that require an actual rewrite of a data file if delete_snapshot.rewrites_needed is True: bound_delete_filter = bind(self._table.schema(), delete_filter, case_sensitive=True) - print(f"{bound_delete_filter=}") - print(f"{expression_to_pyarrow(Not(bound_delete_filter))=}") preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter) - print(f"{preserve_row_filter=}") files = self._scan(row_filter=delete_filter).plan_files() diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 87e630a12a..e1727f2606 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -643,7 +643,6 @@ def bound_is_null_double_field(bound_reference_double: BoundReference[Any]) -> B return BoundIsNull(bound_reference_double) -@pytest.mark.german def test_collect_null_nan_unmentioned_terms( bound_eq_str_field: BoundEqualTo[Any], bound_is_nan_float_field: BoundIsNaN[Any], bound_is_null_double_field: BoundIsNull[Any] ) -> None: @@ -660,7 +659,6 @@ def test_collect_null_nan_unmentioned_terms( assert {f.field.name for f in categorized_terms[3]} == {"field_float_mentioned_nan"} -@pytest.mark.german def test_collect_null_nan_unmentioned_terms_with_multiple_predicates_on_the_same_term( bound_eq_str_field: BoundEqualTo[Any], bound_greater_than_float_field: BoundGreaterThan[Any], @@ -686,7 +684,6 @@ def test_collect_null_nan_unmentioned_terms_with_multiple_predicates_on_the_same assert {f.field.name for f in categorized_terms[3]} == {"field_float_mentioned_nan"} -@pytest.mark.china def test__expression_to_complementary_pyarrow( bound_eq_str_field: BoundEqualTo[Any], bound_greater_than_float_field: BoundGreaterThan[Any], @@ -703,8 +700,7 @@ def test__expression_to_complementary_pyarrow( Not(bound_is_null_double_field), ) result = _expression_to_complementary_pyarrow(bound_expr) - # Notice an isNan predicate on a str column is automatically converted to always false and removed from Or. - print("this is the result", repr(result)) + # Notice an isNan predicate on a str column is automatically converted to always false and removed from Or and thus will not appear in the pc.expr. assert ( repr(result) == """ 100)) or (is_nan(field_float_mentioned_nan) and (field_double_mentioned_null == 0))) or (field_float_mentioned_nan > 100)) and invert(is_null(field_double_mentioned_null, {nan_is_null=false})))) or is_null(field_float_mentioned_nan, {nan_is_null=false})) or is_null(field_str_unmentioned, {nan_is_null=false})) or is_nan(field_double_mentioned_null))>""" From 1813a18c32c4821b90b8ff1d0916159a6ef8dccf Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Tue, 23 Jul 2024 20:26:26 +0000 Subject: [PATCH 07/12] refactor code organization --- pyiceberg/io/pyarrow.py | 54 +++++++++++++++---------------- tests/integration/test_deletes.py | 2 +- tests/io/test_pyarrow_visitor.py | 37 ++++++++++++++------- 3 files changed, 53 insertions(+), 40 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 1685b384dc..87dfe3b7ba 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -634,7 +634,7 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p return left_result | right_result -class _CollectNullNaNUnmentionedTermsFromExpression(BoundBooleanExpressionVisitor[Any]): +class _NullNaNUnmentionedTermsCollector(BoundBooleanExpressionVisitor[Any]): # BoundTerms which have either is_null or is_not_null appearing at least once in the boolean expr. is_null_or_not_bound_terms: set[BoundTerm[Any]] # The remaining BoundTerms appearing in the boolean expr. @@ -744,13 +744,25 @@ def visit_and(self, left_result: pc.Expression, right_result: pc.Expression) -> def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> None: return + def collect( + self, + expr: BooleanExpression, + ) -> None: + """Collect the bound references categorized by having at least one is_null or is_not_null in the expr and the remaining.""" + boolean_expression_visit(expr, self) + -def _get_null_nan_refs( - expr: BooleanExpression, -) -> tuple[Set[BoundReference[Any]], Set[BoundReference[Any]], Set[BoundReference[Any]], Set[BoundReference[Any]]]: - """Collect the bound references categorized by having at least one is_null or is_not_null in the expr and the remaining.""" - collector = _CollectNullNaNUnmentionedTermsFromExpression() - boolean_expression_visit(expr, collector) +def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: + return boolean_expression_visit(expr, _ConvertToArrowExpression()) + + +def _expression_to_complementary_pyarrow(expr: BooleanExpression) -> pc.Expression: + """Complementary filter conversion function of expression_to_pyarrow. + + Could not use expression_to_pyarrow(Not(expr)) to achieve this complementary effect because ~ in pyarrow.compute.Expression does not handle null. + """ + collector = _NullNaNUnmentionedTermsCollector() + collector.collect(expr) def _downcast_term_to_reference(bound_terms: Set[BoundTerm[Any]]) -> Set[BoundReference[Any]]: """Handle mypy check for BoundTerm -> BoundReference.""" @@ -763,32 +775,20 @@ def _downcast_term_to_reference(bound_terms: Set[BoundTerm[Any]]) -> Set[BoundRe return bound_refs null_unmentioned_bound_refs: Set[BoundReference[Any]] = _downcast_term_to_reference(collector.null_unmentioned_bound_terms) - is_null_or_not_bound_refs: Set[BoundReference[Any]] = _downcast_term_to_reference(collector.is_null_or_not_bound_terms) nan_unmentioned_bound_refs: Set[BoundReference[Any]] = _downcast_term_to_reference(collector.nan_unmentioned_bound_terms) - is_nan_or_not_bound_refs: Set[BoundReference[Any]] = _downcast_term_to_reference(collector.is_nan_or_not_bound_terms) - - return null_unmentioned_bound_refs, nan_unmentioned_bound_refs, is_null_or_not_bound_refs, is_nan_or_not_bound_refs - - -def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: - return boolean_expression_visit(expr, _ConvertToArrowExpression()) - - -def _expression_to_complementary_pyarrow(expr: BooleanExpression) -> pc.Expression: - """Complementary filter conversion function of expression_to_pyarrow. - - Could not use expression_to_pyarrow(Not(expr)) to achieve this complementary effect because ~ in pyarrow.compute.Expression does not handle null. - """ - categorized_refs = _get_null_nan_refs(expr) # Convert the set of references to a sorted list so that layout of the expression to build is deterministic. - null_unmentioned_bound_refs: List[BoundReference[Any]] = sorted(categorized_refs[0], key=lambda ref: ref.field.name) - nan_unmentioned_bound_refs: List[BoundReference[Any]] = sorted(categorized_refs[1], key=lambda ref: ref.field.name) + null_unmentioned_bound_refs_sorted: List[BoundReference[Any]] = sorted( + null_unmentioned_bound_refs, key=lambda ref: ref.field.name + ) + nan_unmentioned_bound_refs_sorted: List[BoundReference[Any]] = sorted( + nan_unmentioned_bound_refs, key=lambda ref: ref.field.name + ) preserve_expr: BooleanExpression = Not(expr) - for term in null_unmentioned_bound_refs: + for term in null_unmentioned_bound_refs_sorted: preserve_expr = Or(preserve_expr, BoundIsNull(term=term)) - for term in nan_unmentioned_bound_refs: + for term in nan_unmentioned_bound_refs_sorted: preserve_expr = Or(preserve_expr, BoundIsNaN(term=term)) return expression_to_pyarrow(preserve_expr) diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 562b790a3c..49de265ac8 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -536,7 +536,7 @@ def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None: This means if we set the test case as floats == 2.0 (equal predicate as in test_delete_overwrite_table_with_null), test will pass even without the logic under test - in _CollectNullNaNUnmentionedTermsFromExpression (a helper of _expression_to_complementary_pyarrow + in _NullNaNUnmentionedTermsCollector (a helper of _expression_to_complementary_pyarrow to handle revert of iceberg expression of is_null/not_null/is_nan/not_nan). Instead, we test the filter of !=, so that the revert is == which exposes the issue. """ diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index e1727f2606..2896df5c95 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -37,8 +37,8 @@ _ConvertToIceberg, _ConvertToIcebergWithoutIDs, _expression_to_complementary_pyarrow, - _get_null_nan_refs, _HasIds, + _NullNaNUnmentionedTermsCollector, _pyarrow_schema_ensure_large_types, pyarrow_to_schema, schema_to_pyarrow, @@ -649,14 +649,20 @@ def test_collect_null_nan_unmentioned_terms( bound_expr = And( Or(And(bound_eq_str_field, bound_is_nan_float_field), bound_is_null_double_field), Not(bound_is_nan_float_field) ) - categorized_terms = _get_null_nan_refs(bound_expr) - - assert {f.field.name for f in categorized_terms[0]} == {"field_float_mentioned_nan", "field_str_unmentioned"} - assert {f.field.name for f in categorized_terms[1]} == {"field_str_unmentioned", "field_double_mentioned_null"} - assert {f.field.name for f in categorized_terms[2]} == { + collector = _NullNaNUnmentionedTermsCollector() + collector.collect(bound_expr) + assert {f.field.name for f in collector.null_unmentioned_bound_terms} == { # type: ignore + "field_float_mentioned_nan", + "field_str_unmentioned", + } + assert {f.field.name for f in collector.nan_unmentioned_bound_terms} == { # type: ignore + "field_str_unmentioned", + "field_double_mentioned_null", + } + assert {f.field.name for f in collector.is_null_or_not_bound_terms} == { # type: ignore "field_double_mentioned_null", } - assert {f.field.name for f in categorized_terms[3]} == {"field_float_mentioned_nan"} + assert {f.field.name for f in collector.is_nan_or_not_bound_terms} == {"field_float_mentioned_nan"} # type: ignore def test_collect_null_nan_unmentioned_terms_with_multiple_predicates_on_the_same_term( @@ -675,13 +681,20 @@ def test_collect_null_nan_unmentioned_terms_with_multiple_predicates_on_the_same ), Not(bound_is_null_double_field), ) - categorized_terms = _get_null_nan_refs(bound_expr) - assert {f.field.name for f in categorized_terms[0]} == {"field_float_mentioned_nan", "field_str_unmentioned"} - assert {f.field.name for f in categorized_terms[1]} == {"field_str_unmentioned", "field_double_mentioned_null"} - assert {f.field.name for f in categorized_terms[2]} == { + collector = _NullNaNUnmentionedTermsCollector() + collector.collect(bound_expr) + assert {f.field.name for f in collector.null_unmentioned_bound_terms} == { # type: ignore + "field_float_mentioned_nan", + "field_str_unmentioned", + } + assert {f.field.name for f in collector.nan_unmentioned_bound_terms} == { # type: ignore + "field_str_unmentioned", + "field_double_mentioned_null", + } + assert {f.field.name for f in collector.is_null_or_not_bound_terms} == { # type: ignore "field_double_mentioned_null", } - assert {f.field.name for f in categorized_terms[3]} == {"field_float_mentioned_nan"} + assert {f.field.name for f in collector.is_nan_or_not_bound_terms} == {"field_float_mentioned_nan"} # type: ignore def test__expression_to_complementary_pyarrow( From 80dbcc6fa4798f7d8983e802834e95ab5fccef9b Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Tue, 23 Jul 2024 23:06:23 +0000 Subject: [PATCH 08/12] fix mouthful naming --- tests/io/test_pyarrow_visitor.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 2896df5c95..6930bc4e93 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -599,21 +599,21 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa @pytest.fixture def bound_reference_str() -> BoundReference[Any]: return BoundReference( - field=NestedField(1, "field_str_unmentioned", StringType(), required=False), accessor=Accessor(position=0, inner=None) + field=NestedField(1, "string_field", StringType(), required=False), accessor=Accessor(position=0, inner=None) ) @pytest.fixture def bound_reference_float() -> BoundReference[Any]: return BoundReference( - field=NestedField(2, "field_float_mentioned_nan", FloatType(), required=False), accessor=Accessor(position=1, inner=None) + field=NestedField(2, "float_field", FloatType(), required=False), accessor=Accessor(position=1, inner=None) ) @pytest.fixture def bound_reference_double() -> BoundReference[Any]: return BoundReference( - field=NestedField(3, "field_double_mentioned_null", DoubleType(), required=False), + field=NestedField(3, "double_field", DoubleType(), required=False), accessor=Accessor(position=2, inner=None), ) @@ -652,17 +652,17 @@ def test_collect_null_nan_unmentioned_terms( collector = _NullNaNUnmentionedTermsCollector() collector.collect(bound_expr) assert {f.field.name for f in collector.null_unmentioned_bound_terms} == { # type: ignore - "field_float_mentioned_nan", - "field_str_unmentioned", + "float_field", + "string_field", } assert {f.field.name for f in collector.nan_unmentioned_bound_terms} == { # type: ignore - "field_str_unmentioned", - "field_double_mentioned_null", + "string_field", + "double_field", } assert {f.field.name for f in collector.is_null_or_not_bound_terms} == { # type: ignore - "field_double_mentioned_null", + "double_field", } - assert {f.field.name for f in collector.is_nan_or_not_bound_terms} == {"field_float_mentioned_nan"} # type: ignore + assert {f.field.name for f in collector.is_nan_or_not_bound_terms} == {"float_field"} # type: ignore def test_collect_null_nan_unmentioned_terms_with_multiple_predicates_on_the_same_term( @@ -684,17 +684,17 @@ def test_collect_null_nan_unmentioned_terms_with_multiple_predicates_on_the_same collector = _NullNaNUnmentionedTermsCollector() collector.collect(bound_expr) assert {f.field.name for f in collector.null_unmentioned_bound_terms} == { # type: ignore - "field_float_mentioned_nan", - "field_str_unmentioned", + "float_field", + "string_field", } assert {f.field.name for f in collector.nan_unmentioned_bound_terms} == { # type: ignore - "field_str_unmentioned", - "field_double_mentioned_null", + "string_field", + "double_field", } assert {f.field.name for f in collector.is_null_or_not_bound_terms} == { # type: ignore - "field_double_mentioned_null", + "double_field", } - assert {f.field.name for f in collector.is_nan_or_not_bound_terms} == {"field_float_mentioned_nan"} # type: ignore + assert {f.field.name for f in collector.is_nan_or_not_bound_terms} == {"float_field"} # type: ignore def test__expression_to_complementary_pyarrow( @@ -716,5 +716,5 @@ def test__expression_to_complementary_pyarrow( # Notice an isNan predicate on a str column is automatically converted to always false and removed from Or and thus will not appear in the pc.expr. assert ( repr(result) - == """ 100)) or (is_nan(field_float_mentioned_nan) and (field_double_mentioned_null == 0))) or (field_float_mentioned_nan > 100)) and invert(is_null(field_double_mentioned_null, {nan_is_null=false})))) or is_null(field_float_mentioned_nan, {nan_is_null=false})) or is_null(field_str_unmentioned, {nan_is_null=false})) or is_nan(field_double_mentioned_null))>""" + == """ 100)) or (is_nan(float_field) and (double_field == 0))) or (float_field > 100)) and invert(is_null(double_field, {nan_is_null=false})))) or is_null(float_field, {nan_is_null=false})) or is_null(string_field, {nan_is_null=false})) or is_nan(double_field))>""" ) From b8aa4d888aec8ef88941fd48651fc9835ce73248 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Wed, 24 Jul 2024 14:28:44 +0000 Subject: [PATCH 09/12] restore usage of BoundTerm --- pyiceberg/io/pyarrow.py | 29 ++++++++--------------------- tests/io/test_pyarrow_visitor.py | 18 +++++++++--------- 2 files changed, 17 insertions(+), 30 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 87dfe3b7ba..a3032b443c 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -73,7 +73,7 @@ from pyiceberg.conversions import to_bytes from pyiceberg.exceptions import ResolveError -from pyiceberg.expressions import AlwaysTrue, BooleanExpression, BoundIsNaN, BoundIsNull, BoundReference, BoundTerm, Not, Or +from pyiceberg.expressions import AlwaysTrue, BooleanExpression, BoundIsNaN, BoundIsNull, BoundTerm, Not, Or from pyiceberg.expressions.literals import Literal from pyiceberg.expressions.visitors import ( BoundBooleanExpressionVisitor, @@ -764,31 +764,18 @@ def _expression_to_complementary_pyarrow(expr: BooleanExpression) -> pc.Expressi collector = _NullNaNUnmentionedTermsCollector() collector.collect(expr) - def _downcast_term_to_reference(bound_terms: Set[BoundTerm[Any]]) -> Set[BoundReference[Any]]: - """Handle mypy check for BoundTerm -> BoundReference.""" - bound_refs: Set[BoundReference[Any]] = set() - for t in bound_terms: - if not isinstance(t, BoundReference): - raise ValueError("Collected Bound Term that is not reference.") - else: - bound_refs.add(t) - return bound_refs - - null_unmentioned_bound_refs: Set[BoundReference[Any]] = _downcast_term_to_reference(collector.null_unmentioned_bound_terms) - nan_unmentioned_bound_refs: Set[BoundReference[Any]] = _downcast_term_to_reference(collector.nan_unmentioned_bound_terms) - - # Convert the set of references to a sorted list so that layout of the expression to build is deterministic. - null_unmentioned_bound_refs_sorted: List[BoundReference[Any]] = sorted( - null_unmentioned_bound_refs, key=lambda ref: ref.field.name + # Convert the set of terms to a sorted list so that layout of the expression to build is deterministic. + null_unmentioned_bound_terms: List[BoundTerm[Any]] = sorted( + collector.null_unmentioned_bound_terms, key=lambda term: term.ref().field.name ) - nan_unmentioned_bound_refs_sorted: List[BoundReference[Any]] = sorted( - nan_unmentioned_bound_refs, key=lambda ref: ref.field.name + nan_unmentioned_bound_terms: List[BoundTerm[Any]] = sorted( + collector.nan_unmentioned_bound_terms, key=lambda term: term.ref().field.name ) preserve_expr: BooleanExpression = Not(expr) - for term in null_unmentioned_bound_refs_sorted: + for term in null_unmentioned_bound_terms: preserve_expr = Or(preserve_expr, BoundIsNull(term=term)) - for term in nan_unmentioned_bound_refs_sorted: + for term in nan_unmentioned_bound_terms: preserve_expr = Or(preserve_expr, BoundIsNaN(term=term)) return expression_to_pyarrow(preserve_expr) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 6930bc4e93..f0a2a45816 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -651,18 +651,18 @@ def test_collect_null_nan_unmentioned_terms( ) collector = _NullNaNUnmentionedTermsCollector() collector.collect(bound_expr) - assert {f.field.name for f in collector.null_unmentioned_bound_terms} == { # type: ignore + assert {t.ref().field.name for t in collector.null_unmentioned_bound_terms} == { "float_field", "string_field", } - assert {f.field.name for f in collector.nan_unmentioned_bound_terms} == { # type: ignore + assert {t.ref().field.name for t in collector.nan_unmentioned_bound_terms} == { "string_field", "double_field", } - assert {f.field.name for f in collector.is_null_or_not_bound_terms} == { # type: ignore + assert {t.ref().field.name for t in collector.is_null_or_not_bound_terms} == { "double_field", } - assert {f.field.name for f in collector.is_nan_or_not_bound_terms} == {"float_field"} # type: ignore + assert {t.ref().field.name for t in collector.is_nan_or_not_bound_terms} == {"float_field"} def test_collect_null_nan_unmentioned_terms_with_multiple_predicates_on_the_same_term( @@ -683,21 +683,21 @@ def test_collect_null_nan_unmentioned_terms_with_multiple_predicates_on_the_same ) collector = _NullNaNUnmentionedTermsCollector() collector.collect(bound_expr) - assert {f.field.name for f in collector.null_unmentioned_bound_terms} == { # type: ignore + assert {t.ref().field.name for t in collector.null_unmentioned_bound_terms} == { "float_field", "string_field", } - assert {f.field.name for f in collector.nan_unmentioned_bound_terms} == { # type: ignore + assert {t.ref().field.name for t in collector.nan_unmentioned_bound_terms} == { "string_field", "double_field", } - assert {f.field.name for f in collector.is_null_or_not_bound_terms} == { # type: ignore + assert {t.ref().field.name for t in collector.is_null_or_not_bound_terms} == { "double_field", } - assert {f.field.name for f in collector.is_nan_or_not_bound_terms} == {"float_field"} # type: ignore + assert {t.ref().field.name for t in collector.is_nan_or_not_bound_terms} == {"float_field"} -def test__expression_to_complementary_pyarrow( +def test_expression_to_complementary_pyarrow( bound_eq_str_field: BoundEqualTo[Any], bound_greater_than_float_field: BoundGreaterThan[Any], bound_is_nan_float_field: BoundIsNaN[Any], From 007f2162f61f1731025d24cf39a7cfc887caa7f5 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Wed, 24 Jul 2024 19:30:13 +0000 Subject: [PATCH 10/12] small fixes for comments --- pyiceberg/io/pyarrow.py | 10 +++++----- tests/integration/test_deletes.py | 3 --- tests/io/test_pyarrow.py | 1 + 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index a3032b443c..4fa663ae86 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -634,7 +634,7 @@ def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> p return left_result | right_result -class _NullNaNUnmentionedTermsCollector(BoundBooleanExpressionVisitor[Any]): +class _NullNaNUnmentionedTermsCollector(BoundBooleanExpressionVisitor[None]): # BoundTerms which have either is_null or is_not_null appearing at least once in the boolean expr. is_null_or_not_bound_terms: set[BoundTerm[Any]] # The remaining BoundTerms appearing in the boolean expr. @@ -645,11 +645,11 @@ class _NullNaNUnmentionedTermsCollector(BoundBooleanExpressionVisitor[Any]): nan_unmentioned_bound_terms: set[BoundTerm[Any]] def __init__(self) -> None: + super().__init__() self.is_null_or_not_bound_terms = set() self.null_unmentioned_bound_terms = set() self.is_nan_or_not_bound_terms = set() self.nan_unmentioned_bound_terms = set() - super().__init__() def _handle_explicit_is_null_or_not(self, term: BoundTerm[Any]) -> None: """Handle the predicate case where either is_null or is_not_null is included.""" @@ -735,13 +735,13 @@ def visit_true(self) -> None: def visit_false(self) -> None: return - def visit_not(self, child_result: pc.Expression) -> None: + def visit_not(self, child_result: None) -> None: return - def visit_and(self, left_result: pc.Expression, right_result: pc.Expression) -> None: + def visit_and(self, left_result: None, right_result: None) -> None: return - def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> None: + def visit_or(self, left_result: None, right_result: None) -> None: return def collect( diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 49de265ac8..c474de296c 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -105,7 +105,6 @@ def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCat assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11, 10], "number": [30, 30]} -@pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_rewrite_partitioned_table_with_null(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: identifier = "default.table_partitioned_delete" @@ -454,7 +453,6 @@ def test_delete_truncate(session_catalog: RestCatalog) -> None: assert entries[0].status == ManifestEntryStatus.DELETED -@pytest.mark.integration def test_delete_overwrite_table_with_null(session_catalog: RestCatalog) -> None: arrow_schema = pa.schema([pa.field("ints", pa.int32())]) arrow_tbl = pa.Table.from_pylist( @@ -494,7 +492,6 @@ def test_delete_overwrite_table_with_null(session_catalog: RestCatalog) -> None: assert tbl.scan().to_arrow()["ints"].to_pylist() == [3, 4, 1, None] -@pytest.mark.integration def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None: arrow_schema = pa.schema([pa.field("floats", pa.float32())]) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 60c6e4b67a..82b35341b9 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=protected-access,unused-argument,redefined-outer-name + import os import tempfile import uuid From 6a9e58a2a2b4b4b5d702a1e471f488d055ee2169 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Wed, 24 Jul 2024 20:48:06 +0000 Subject: [PATCH 11/12] small fix for typing --- pyiceberg/io/pyarrow.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 4fa663ae86..4a53a367fe 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -131,7 +131,7 @@ from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping from pyiceberg.transforms import TruncateTransform -from pyiceberg.typedef import EMPTY_DICT, Properties, Record +from pyiceberg.typedef import EMPTY_DICT, L, Properties, Record from pyiceberg.types import ( BinaryType, BooleanType, @@ -572,11 +572,11 @@ def _convert_scalar(value: Any, iceberg_type: IcebergType) -> pa.scalar: class _ConvertToArrowExpression(BoundBooleanExpressionVisitor[pc.Expression]): - def visit_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> pc.Expression: + def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> pc.Expression: pyarrow_literals = pa.array(literals, type=schema_to_pyarrow(term.ref().field.field_type)) return pc.field(term.ref().field.name).isin(pyarrow_literals) - def visit_not_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> pc.Expression: + def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> pc.Expression: pyarrow_literals = pa.array(literals, type=schema_to_pyarrow(term.ref().field.field_type)) return ~pc.field(term.ref().field.name).isin(pyarrow_literals) @@ -673,11 +673,11 @@ def _handle_nan_unmentioned(self, term: BoundTerm[Any]) -> None: if term not in self.is_nan_or_not_bound_terms: self.nan_unmentioned_bound_terms.add(term) - def visit_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> None: + def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> None: self._handle_null_unmentioned(term) self._handle_nan_unmentioned(term) - def visit_not_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> None: + def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> None: self._handle_null_unmentioned(term) self._handle_nan_unmentioned(term) From ec746477f0803de5eba39b01a3f361a3ec40efd3 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Wed, 24 Jul 2024 23:11:32 +0000 Subject: [PATCH 12/12] fix typing according to pr comment --- pyiceberg/io/pyarrow.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 4a53a367fe..f3b85eb499 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -131,7 +131,7 @@ from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping from pyiceberg.transforms import TruncateTransform -from pyiceberg.typedef import EMPTY_DICT, L, Properties, Record +from pyiceberg.typedef import EMPTY_DICT, Properties, Record from pyiceberg.types import ( BinaryType, BooleanType, @@ -572,11 +572,11 @@ def _convert_scalar(value: Any, iceberg_type: IcebergType) -> pa.scalar: class _ConvertToArrowExpression(BoundBooleanExpressionVisitor[pc.Expression]): - def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> pc.Expression: + def visit_in(self, term: BoundTerm[Any], literals: Set[Any]) -> pc.Expression: pyarrow_literals = pa.array(literals, type=schema_to_pyarrow(term.ref().field.field_type)) return pc.field(term.ref().field.name).isin(pyarrow_literals) - def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> pc.Expression: + def visit_not_in(self, term: BoundTerm[Any], literals: Set[Any]) -> pc.Expression: pyarrow_literals = pa.array(literals, type=schema_to_pyarrow(term.ref().field.field_type)) return ~pc.field(term.ref().field.name).isin(pyarrow_literals) @@ -673,11 +673,11 @@ def _handle_nan_unmentioned(self, term: BoundTerm[Any]) -> None: if term not in self.is_nan_or_not_bound_terms: self.nan_unmentioned_bound_terms.add(term) - def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> None: + def visit_in(self, term: BoundTerm[Any], literals: Set[Any]) -> None: self._handle_null_unmentioned(term) self._handle_nan_unmentioned(term) - def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> None: + def visit_not_in(self, term: BoundTerm[Any], literals: Set[Any]) -> None: self._handle_null_unmentioned(term) self._handle_nan_unmentioned(term)