diff --git a/Makefile b/Makefile
index 35051be9c1..697a1a2fbb 100644
--- a/Makefile
+++ b/Makefile
@@ -40,10 +40,10 @@ test-integration:
 	docker compose -f dev/docker-compose-integration.yml kill
 	docker compose -f dev/docker-compose-integration.yml rm -f
 	docker compose -f dev/docker-compose-integration.yml up -d
-	sleep 10
+	sleep 5
 	docker compose -f dev/docker-compose-integration.yml cp ./dev/provision.py spark-iceberg:/opt/spark/provision.py
 	docker compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
-	poetry run pytest tests/ -v -m integration ${PYTEST_ARGS}
+	poetry run pytest tests/ -v -m integration ${PYTEST_ARGS}
 
 test-integration-rebuild:
 	docker compose -f dev/docker-compose-integration.yml kill
diff --git a/pyiceberg/expressions/literals.py b/pyiceberg/expressions/literals.py
index d9f66ae24a..d1c170d0dd 100644
--- a/pyiceberg/expressions/literals.py
+++ b/pyiceberg/expressions/literals.py
@@ -311,6 +311,10 @@ def _(self, _: TimeType) -> Literal[int]:
     def _(self, _: TimestampType) -> Literal[int]:
         return TimestampLiteral(self.value)
 
+    @to.register(TimestamptzType)
+    def _(self, _: TimestamptzType) -> Literal[int]:
+        return TimestampLiteral(self.value)
+
     @to.register(DecimalType)
     def _(self, type_var: DecimalType) -> Literal[Decimal]:
         unscaled = Decimal(self.value)
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index e0214d1bde..31780d25cd 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -55,6 +55,7 @@
     And,
     BooleanExpression,
     EqualTo,
+    IsNull,
     Not,
     Or,
     Reference,
@@ -439,6 +440,72 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
             for data_file in data_files:
                 update_snapshot.append_data_file(data_file)
 
+    def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression:
+        partition_spec = self.table_metadata.spec()
+        schema = self.table_metadata.schema()
+        partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
+
+        expr: BooleanExpression = AlwaysFalse()
+        for partition_record in delete_partitions:
+            match_partition_expression: BooleanExpression = AlwaysTrue()
+
+            for pos in range(len(partition_fields)):
+                predicate = (
+                    EqualTo(Reference(partition_fields[pos]), partition_record[pos])
+                    if partition_record[pos] is not None
+                    else IsNull(Reference(partition_fields[pos]))
+                )
+                match_partition_expression = And(match_partition_expression, predicate)
+            expr = Or(expr, match_partition_expression)
+        return expr
+
+    def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand for dynamically overwriting the table with a PyArrow table within this transaction.
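+
+        Partitions touched by the incoming data are detected from the written data files; those partitions are deleted first and the new data files are then appended in the same transaction.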
+
+        Args:
+            df: The Arrow dataframe that will be used to overwrite the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+        """
+
+        try:
+            import pyarrow as pa
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
+
+        if not isinstance(df, pa.Table):
+            raise ValueError(f"Expected PyArrow table, got: {df}")
+
+        _check_schema_compatible(self._table.schema(), other_schema=df.schema)
+
+        # cast if the two schemas are compatible but not equal
+        table_arrow_schema = self._table.schema().as_arrow()
+        if table_arrow_schema != df.schema:
+            df = df.cast(table_arrow_schema)
+
+        # If dataframe does not have data, there is no need to overwrite
+        if df.shape[0] == 0:
+            return
+
+        append_snapshot_commit_uuid = uuid.uuid4()
+        data_files: List[DataFile] = list(
+            _dataframe_to_data_files(
+                table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
+            )
+        )
+        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
+            delete_partitions = [data_file.partition for data_file in data_files]
+            delete_filter = self._build_partition_predicate(
+                spec_id=self.table_metadata.spec().spec_id, delete_partitions=delete_partitions
+            )
+            delete_snapshot.delete_by_predicate(delete_filter)
+
+        with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append(
+            append_snapshot_commit_uuid
+        ) as append_snapshot:
+            for data_file in data_files:
+                append_snapshot.append_data_file(data_file)
+
     def overwrite(
         self,
         df: pa.Table,
@@ -1436,6 +1503,17 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
         with self.transaction() as tx:
             tx.append(df=df, snapshot_properties=snapshot_properties)
 
+    def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """Shorthand for dynamically overwriting the table with a PyArrow table.
+
+        Old partitions are automatically detected and replaced with the data files created for the input arrow table.
+        Args:
+            df: The Arrow dataframe that will be used to overwrite the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+        """
+        with self.transaction() as tx:
+            tx.dynamic_overwrite(df=df, snapshot_properties=snapshot_properties)
+
     def overwrite(
         self,
         df: pa.Table,
@@ -3265,9 +3343,13 @@ def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Di
         self._io = io
         self._snapshot_properties = snapshot_properties
 
-    def fast_append(self) -> FastAppendFiles:
+    def fast_append(self, commit_uuid: Optional[uuid.UUID] = None) -> FastAppendFiles:
         return FastAppendFiles(
-            operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
+            operation=Operation.APPEND,
+            transaction=self._transaction,
+            io=self._io,
+            snapshot_properties=self._snapshot_properties,
+            commit_uuid=commit_uuid,
         )
 
     def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py
index fb49a50563..abbbeeef9d 100644
--- a/tests/integration/test_writes/test_partitioned_writes.py
+++ b/tests/integration/test_writes/test_partitioned_writes.py
@@ -16,6 +16,7 @@
 # under the License.
 # pylint:disable=redefined-outer-name
+
 import pyarrow as pa
 import pytest
 from pyspark.sql import SparkSession
@@ -176,6 +177,73 @@ def test_query_filter_appended_null_partitioned(
     assert len(rows) == 6
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize(
+    "part_col",
+    [
+        "int",
+        "bool",
+        "string",
+        "string_long",
+        "long",
+        "float",
+        "double",
+        "date",
+        "timestamp",
+        "binary",
+        "timestamptz",
+    ],
+)
+@pytest.mark.parametrize(
+    "format_version",
+    [1, 2],
+)
+def test_query_filter_dynamic_overwrite_null_partitioned(
+    session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table, part_col: str, format_version: int
+) -> None:
+    # Given
+    identifier = f"default.arrow_table_v{format_version}_appended_with_null_partitioned_on_col_{part_col}"
+    nested_field = TABLE_SCHEMA.find_field(part_col)
+    partition_spec = PartitionSpec(
+        PartitionField(source_id=nested_field.field_id, field_id=1001, transform=IdentityTransform(), name=part_col)
+    )
+
+    # When
+    tbl = _create_table(
+        session_catalog=session_catalog,
+        identifier=identifier,
+        properties={"format-version": str(format_version)},
+        data=[],
+        partition_spec=partition_spec,
+    )
+    # Append arrow_table_1 with rows [A, B, C] and then arrow_table_2 with rows [A, B, C, A, B, C]
+    tbl.append(arrow_table_with_null)
+    tbl.append(pa.concat_tables([arrow_table_with_null, arrow_table_with_null]))
+    # Then
+    assert tbl.format_version == format_version, f"Expected v{format_version}, got: v{tbl.format_version}"
+    df = spark.table(identifier)
+    for col in arrow_table_with_null.column_names:
+        df = spark.table(identifier)
+        assert df.where(f"{col} is not null").count() == 6, f"Expected 6 non-null rows for {col}"
+        assert df.where(f"{col} is null").count() == 3, f"Expected 3 null rows for {col}"
+    # expecting 6 files: first append with [A], [B], [C], second append with [A, A], [B, B], [C, C]
+    rows = spark.sql(f"select partition from {identifier}.files").collect()
+    assert len(rows) == 6
+
+    tbl.dynamic_overwrite(arrow_table_with_null)
+    # tbl.dynamic_overwrite(arrow_table_with_null.slice(0, 2))
+    # Then
+    assert tbl.format_version == format_version, f"Expected v{format_version}, got: v{tbl.format_version}"
+    df = spark.table(identifier)
+    for col in arrow_table_with_null.column_names:
+        df = spark.table(identifier)
+        assert df.where(f"{col} is not null").count() == 2, f"Expected 2 non-null rows for {col}"
+        assert df.where(f"{col} is null").count() == 1, f"Expected 1 null row for {col}"
+    # expecting 3 files: the dynamic overwrite replaces the 6 old files with one new file per partition [A], [B], [C]
+    rows = spark.sql(f"select partition from {identifier}.files").collect()
+    assert len(rows) == 3
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize(
     "part_col", ["int", "bool", "string", "string_long", "long", "float", "double", "date", "timestamptz", "timestamp", "binary"]
 )
@@ -234,50 +302,149 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro
     tbl.append(arrow_table_with_null)
     tbl.append(arrow_table_with_null)
+    tbl.dynamic_overwrite(arrow_table_with_null)
+    tbl.append(arrow_table_with_null)
+    tbl.dynamic_overwrite(arrow_table_with_null.slice(0, 2))
 
     rows = spark.sql(
         f"""
-        SELECT operation, summary
+        SELECT *
         FROM {identifier}.snapshots
         ORDER BY committed_at ASC
     """
     ).collect()
 
     operations = [row.operation for row in rows]
-    assert operations == ["append", "append"]
-
+    assert operations == ["append", "append", "delete", "append", "append", "delete", "append"]
     summaries = [row.summary for row in rows]
-    assert summaries[0] == {
-        "changed-partition-count": "3",
"added-data-files": "3", - "added-files-size": "15029", - "added-records": "3", - "total-data-files": "3", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": "15029", - "total-position-deletes": "0", - "total-records": "3", - } - - assert summaries[1] == { - "changed-partition-count": "3", - "added-data-files": "3", - "added-files-size": "15029", - "added-records": "3", - "total-data-files": "6", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": "30058", - "total-position-deletes": "0", - "total-records": "6", - } + assert summaries == [ + { + "changed-partition-count": "3", + "added-data-files": "3", + "total-equality-deletes": "0", + "added-records": "3", + "total-position-deletes": "0", + "added-files-size": "15029", + "total-delete-files": "0", + "total-files-size": "15029", + "total-data-files": "3", + "total-records": "3", + }, + { + "changed-partition-count": "3", + "added-data-files": "3", + "total-equality-deletes": "0", + "added-records": "3", + "total-position-deletes": "0", + "added-files-size": "15029", + "total-delete-files": "0", + "total-files-size": "30058", + "total-data-files": "6", + "total-records": "6", + }, + { + "removed-files-size": "30058", + "changed-partition-count": "3", + "total-equality-deletes": "0", + "deleted-data-files": "6", + "total-position-deletes": "0", + "total-delete-files": "0", + "deleted-records": "6", + "total-files-size": "0", + "total-data-files": "0", + "total-records": "0", + }, + { + "changed-partition-count": "3", + "added-data-files": "3", + "total-equality-deletes": "0", + "added-records": "3", + "total-position-deletes": "0", + "added-files-size": "15029", + "total-delete-files": "0", + "total-files-size": "15029", + "total-data-files": "3", + "total-records": "3", + }, + { + "changed-partition-count": "3", + "added-data-files": "3", + "total-equality-deletes": "0", + "added-records": "3", + "total-position-deletes": "0", + "added-files-size": "15029", + "total-delete-files": "0", + "total-files-size": "30058", + "total-data-files": "6", + "total-records": "6", + }, + { + "removed-files-size": "19268", + "changed-partition-count": "2", + "total-equality-deletes": "0", + "deleted-data-files": "4", + "total-position-deletes": "0", + "total-delete-files": "0", + "deleted-records": "4", + "total-files-size": "10790", + "total-data-files": "2", + "total-records": "2", + }, + { + "changed-partition-count": "2", + "added-data-files": "2", + "total-equality-deletes": "0", + "added-records": "2", + "total-position-deletes": "0", + "added-files-size": "9634", + "total-delete-files": "0", + "total-files-size": "20424", + "total-data-files": "4", + "total-records": "4", + }, + ] @pytest.mark.integration def test_data_files_with_table_partitioned_with_null( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table ) -> None: + # Append : First append has manifestlist file linking to 1 manifest file. + # ML1 = [M1] + # + # Append : Second append's manifestlist links to 2 manifest files. + # ML2 = [M1, M2] + # + # Dynamic Overwrite: Dynamic overwrite on all partitions of the table delete all data and append new data + # it has 2 snapshots of delete and append and thus 2 snapshots + # the first snapshot generates M3 with 6 delete data entries collected from M1 and M2. + # ML3 = [M3] + # + # The second snapshot generates M4 with 3 appended data entries and since M3 (previous manifests) only has delte entries it does not lint to it. 
+    #          ML4 = [M4]
+
+    # Append : Append generates M5 with the new data entries and links to all previous manifests, which here is just M4.
+    #          ML5 = [M5, M4]
+
+    # Dynamic Overwrite: A dynamic overwrite on a subset of the table's partitions deletes only the matching data and appends the new data.
+    #          It also commits 2 snapshots: a delete followed by an append.
+    #          The first snapshot generates M6 with 4 delete entries collected from M1 and M2,
+    #          then it generates M7 with the remaining existing entries from M1 and M8 with those from M2.
+    #          ML6 = [M6, M7, M8]
+    #
+    #          The second snapshot generates M9 with 3 appended data entries and also looks at the manifests in ML6 (the previous manifests);
+    #          it ignores M6 since it only has delete entries, but it links to M7 and M8.
+    #          ML7 = [M9, M7, M8]
+
+    # tldr:
+    # APPEND             ML1 = [M1]
+    # APPEND             ML2 = [M1, M2]
+    # DYNAMIC_OVERWRITE  ML3 = [M3]
+    #                    ML4 = [M4]
+    # APPEND             ML5 = [M5, M4]
+    # DYNAMIC_OVERWRITE  ML6 = [M6, M7, M8]
+    #                    ML7 = [M9, M7, M8]
+
     identifier = "default.arrow_data_files"
 
     try:
@@ -287,28 +454,28 @@ def test_data_files_with_table_partitioned_with_null(
     tbl = session_catalog.create_table(
         identifier=identifier,
         schema=TABLE_SCHEMA,
-        partition_spec=PartitionSpec(PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="int")),
+        partition_spec=PartitionSpec(
+            PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="bool"),
+            PartitionField(source_id=4, field_id=1002, transform=IdentityTransform(), name="int"),
+        ),
         properties={"format-version": "1"},
     )
 
     tbl.append(arrow_table_with_null)
     tbl.append(arrow_table_with_null)
-
-    # added_data_files_count, existing_data_files_count, deleted_data_files_count
+    tbl.dynamic_overwrite(arrow_table_with_null)
+    tbl.append(arrow_table_with_null)
+    tbl.dynamic_overwrite(arrow_table_with_null.slice(0, 2))
     rows = spark.sql(
         f"""
-        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        SELECT *
        FROM {identifier}.all_manifests
     """
     ).collect()
 
-    assert [row.added_data_files_count for row in rows] == [3, 3, 3]
-    assert [row.existing_data_files_count for row in rows] == [
-        0,
-        0,
-        0,
-    ]
-    assert [row.deleted_data_files_count for row in rows] == [0, 0, 0]
+    assert [row.added_data_files_count for row in rows] == [3, 3, 3, 0, 3, 3, 3, 0, 0, 0, 2, 0, 0]
+    assert [row.existing_data_files_count for row in rows] == [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1]
+    assert [row.deleted_data_files_count for row in rows] == [0, 0, 0, 6, 0, 0, 0, 4, 0, 0, 0, 0, 0]
 
 
 @pytest.mark.integration